Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module

* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
  for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
  subprocess.Popen class:

  - pid, returncode, stdin, stdout and stderr attributes
  - communicate(), wait(), send_signal(), terminate() and kill() methods

* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
  and unix_events, to not be confused with the symbols with the same name of
  subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size
  of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
  the write buffer size is greater than the high water mark (64 KB by default)
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
index eb22c38..3df2f80 100644
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -24,6 +24,7 @@
 from .protocols import *
 from .queues import *
 from .streams import *
+from .subprocess import *
 from .tasks import *
 from .transports import *
 
@@ -39,5 +40,6 @@
            protocols.__all__ +
            queues.__all__ +
            streams.__all__ +
+           subprocess.__all__ +
            tasks.__all__ +
            transports.__all__)
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index b7cdbce..b78f816 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -6,11 +6,6 @@
 from . import transports
 
 
-STDIN = 0
-STDOUT = 1
-STDERR = 2
-
-
 class BaseSubprocessTransport(transports.SubprocessTransport):
 
     def __init__(self, loop, protocol, args, shell,
@@ -22,11 +17,11 @@
 
         self._pipes = {}
         if stdin == subprocess.PIPE:
-            self._pipes[STDIN] = None
+            self._pipes[0] = None
         if stdout == subprocess.PIPE:
-            self._pipes[STDOUT] = None
+            self._pipes[1] = None
         if stderr == subprocess.PIPE:
-            self._pipes[STDERR] = None
+            self._pipes[2] = None
         self._pending_calls = collections.deque()
         self._finished = False
         self._returncode = None
@@ -76,19 +71,19 @@
         loop = self._loop
         if proc.stdin is not None:
             _, pipe = yield from loop.connect_write_pipe(
-                lambda: WriteSubprocessPipeProto(self, STDIN),
+                lambda: WriteSubprocessPipeProto(self, 0),
                 proc.stdin)
-            self._pipes[STDIN] = pipe
+            self._pipes[0] = pipe
         if proc.stdout is not None:
             _, pipe = yield from loop.connect_read_pipe(
-                lambda: ReadSubprocessPipeProto(self, STDOUT),
+                lambda: ReadSubprocessPipeProto(self, 1),
                 proc.stdout)
-            self._pipes[STDOUT] = pipe
+            self._pipes[1] = pipe
         if proc.stderr is not None:
             _, pipe = yield from loop.connect_read_pipe(
-                lambda: ReadSubprocessPipeProto(self, STDERR),
+                lambda: ReadSubprocessPipeProto(self, 2),
                 proc.stderr)
-            self._pipes[STDERR] = pipe
+            self._pipes[2] = pipe
 
         assert self._pending_calls is not None
 
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index b6b3be2..fb67155 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -29,6 +29,7 @@
         self._buffer = None  # None or bytearray.
         self._read_fut = None
         self._write_fut = None
+        self._pending_write = 0
         self._conn_lost = 0
         self._closing = False  # Set when close() called.
         self._eof_written = False
@@ -68,6 +69,7 @@
         if self._read_fut:
             self._read_fut.cancel()
         self._write_fut = self._read_fut = None
+        self._pending_write = 0
         self._buffer = None
         self._loop.call_soon(self._call_connection_lost, exc)
 
@@ -128,11 +130,10 @@
         self._low_water = low
 
     def get_write_buffer_size(self):
-        # NOTE: This doesn't take into account data already passed to
-        # send() even if send() hasn't finished yet.
-        if not self._buffer:
-            return 0
-        return len(self._buffer)
+        size = self._pending_write
+        if self._buffer is not None:
+            size += len(self._buffer)
+        return size
 
 
 class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
@@ -206,7 +207,7 @@
 
 
 class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
-                                  transports.WriteTransport):
+                                      transports.WriteTransport):
     """Transport for write pipes."""
 
     def write(self, data):
@@ -252,6 +253,7 @@
         try:
             assert f is self._write_fut
             self._write_fut = None
+            self._pending_write = 0
             if f:
                 f.result()
             if data is None:
@@ -262,15 +264,21 @@
                     self._loop.call_soon(self._call_connection_lost, None)
                 if self._eof_written:
                     self._sock.shutdown(socket.SHUT_WR)
+                # Now that we've reduced the buffer size, tell the
+                # protocol to resume writing if it was paused.  Note that
+                # we do this last since the callback is called immediately
+                # and it may add more data to the buffer (even causing the
+                # protocol to be paused again).
+                self._maybe_resume_protocol()
             else:
                 self._write_fut = self._loop._proactor.send(self._sock, data)
-                self._write_fut.add_done_callback(self._loop_writing)
-            # Now that we've reduced the buffer size, tell the
-            # protocol to resume writing if it was paused.  Note that
-            # we do this last since the callback is called immediately
-            # and it may add more data to the buffer (even causing the
-            # protocol to be paused again).
-            self._maybe_resume_protocol()
+                if not self._write_fut.done():
+                    assert self._pending_write == 0
+                    self._pending_write = len(data)
+                    self._write_fut.add_done_callback(self._loop_writing)
+                    self._maybe_pause_protocol()
+                else:
+                    self._write_fut.add_done_callback(self._loop_writing)
         except ConnectionResetError as exc:
             self._force_close(exc)
         except OSError as exc:
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
new file mode 100644
index 0000000..4312d44
--- /dev/null
+++ b/Lib/asyncio/subprocess.py
@@ -0,0 +1,197 @@
+__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
+
+import collections
+import subprocess
+
+from . import events
+from . import futures
+from . import protocols
+from . import streams
+from . import tasks
+
+
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+DEVNULL = subprocess.DEVNULL
+
+
+class SubprocessStreamProtocol(streams.FlowControlMixin,
+                               protocols.SubprocessProtocol):
+    """Like StreamReaderProtocol, but for a subprocess."""
+
+    def __init__(self, limit, loop):
+        super().__init__(loop=loop)
+        self._limit = limit
+        self.stdin = self.stdout = self.stderr = None
+        self.waiter = futures.Future(loop=loop)
+        self._waiters = collections.deque()
+        self._transport = None
+
+    def connection_made(self, transport):
+        self._transport = transport
+        if transport.get_pipe_transport(1):
+            self.stdout = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+        if transport.get_pipe_transport(2):
+            self.stderr = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+        stdin = transport.get_pipe_transport(0)
+        if stdin is not None:
+            self.stdin = streams.StreamWriter(stdin,
+                                              protocol=self,
+                                              reader=None,
+                                              loop=self._loop)
+        self.waiter.set_result(None)
+
+    def pipe_data_received(self, fd, data):
+        if fd == 1:
+            reader = self.stdout
+        elif fd == 2:
+            reader = self.stderr
+        else:
+            reader = None
+        if reader is not None:
+            reader.feed_data(data)
+
+    def pipe_connection_lost(self, fd, exc):
+        if fd == 0:
+            pipe = self.stdin
+            if pipe is not None:
+                pipe.close()
+            self.connection_lost(exc)
+            return
+        if fd == 1:
+            reader = self.stdout
+        elif fd == 2:
+            reader = self.stderr
+        else:
+            reader = None
+        if reader != None:
+            if exc is None:
+                reader.feed_eof()
+            else:
+                reader.set_exception(exc)
+
+    def process_exited(self):
+        # wake up futures waiting for wait()
+        returncode = self._transport.get_returncode()
+        while self._waiters:
+            waiter = self._waiters.popleft()
+            waiter.set_result(returncode)
+
+
+class Process:
+    def __init__(self, transport, protocol, loop):
+        self._transport = transport
+        self._protocol = protocol
+        self._loop = loop
+        self.stdin = protocol.stdin
+        self.stdout = protocol.stdout
+        self.stderr = protocol.stderr
+        self.pid = transport.get_pid()
+
+    @property
+    def returncode(self):
+        return self._transport.get_returncode()
+
+    @tasks.coroutine
+    def wait(self):
+        """Wait until the process exit and return the process return code."""
+        returncode = self._transport.get_returncode()
+        if returncode is not None:
+            return returncode
+
+        waiter = futures.Future(loop=self._loop)
+        self._protocol._waiters.append(waiter)
+        yield from waiter
+        return waiter.result()
+
+    def get_subprocess(self):
+        return self._transport.get_extra_info('subprocess')
+
+    def _check_alive(self):
+        if self._transport.get_returncode() is not None:
+            raise ProcessLookupError()
+
+    def send_signal(self, signal):
+        self._check_alive()
+        self._transport.send_signal(signal)
+
+    def terminate(self):
+        self._check_alive()
+        self._transport.terminate()
+
+    def kill(self):
+        self._check_alive()
+        self._transport.kill()
+
+    @tasks.coroutine
+    def _feed_stdin(self, input):
+        self.stdin.write(input)
+        yield from self.stdin.drain()
+        self.stdin.close()
+
+    @tasks.coroutine
+    def _noop(self):
+        return None
+
+    @tasks.coroutine
+    def _read_stream(self, fd):
+        transport = self._transport.get_pipe_transport(fd)
+        if fd == 2:
+            stream = self.stderr
+        else:
+            assert fd == 1
+            stream = self.stdout
+        output = yield from stream.read()
+        transport.close()
+        return output
+
+    @tasks.coroutine
+    def communicate(self, input=None):
+        loop = self._transport._loop
+        if input:
+            stdin = self._feed_stdin(input)
+        else:
+            stdin = self._noop()
+        if self.stdout is not None:
+            stdout = self._read_stream(1)
+        else:
+            stdout = self._noop()
+        if self.stderr is not None:
+            stderr = self._read_stream(2)
+        else:
+            stderr = self._noop()
+        stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
+                                                        loop=loop)
+        yield from self.wait()
+        return (stdout, stderr)
+
+
+@tasks.coroutine
+def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
+                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+    if loop is None:
+        loop = events.get_event_loop()
+    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+                                                        loop=loop)
+    transport, protocol = yield from loop.subprocess_shell(
+                                            protocol_factory,
+                                            cmd, stdin=stdin, stdout=stdout,
+                                            stderr=stderr, **kwds)
+    yield from protocol.waiter
+    return Process(transport, protocol, loop)
+
+@tasks.coroutine
+def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
+                           loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+    if loop is None:
+        loop = events.get_event_loop()
+    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+                                                        loop=loop)
+    transport, protocol = yield from loop.subprocess_exec(
+                                            protocol_factory,
+                                            *args, stdin=stdin, stdout=stdout,
+                                            stderr=stderr, **kwds)
+    yield from protocol.waiter
+    return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 98fddde..3ce2db8 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -21,16 +21,11 @@
 from .log import logger
 
 
-__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
+__all__ = ['SelectorEventLoop',
            'AbstractChildWatcher', 'SafeChildWatcher',
            'FastChildWatcher', 'DefaultEventLoopPolicy',
            ]
 
-STDIN = 0
-STDOUT = 1
-STDERR = 2
-
-
 if sys.platform == 'win32':  # pragma: no cover
     raise ImportError('Signals are not really supported on Windows')