Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module

* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
  for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
  subprocess.Popen class:

  - pid, returncode, stdin, stdout and stderr attributes
  - communicate(), wait(), send_signal(), terminate() and kill() methods

* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
  and unix_events, to not be confused with the symbols with the same name of
  subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size
  of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
  the write buffer size is greater than the high water mark (64 KB by default)
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index b6b3be2..fb67155 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -29,6 +29,7 @@
         self._buffer = None  # None or bytearray.
         self._read_fut = None
         self._write_fut = None
+        self._pending_write = 0
         self._conn_lost = 0
         self._closing = False  # Set when close() called.
         self._eof_written = False
@@ -68,6 +69,7 @@
         if self._read_fut:
             self._read_fut.cancel()
         self._write_fut = self._read_fut = None
+        self._pending_write = 0
         self._buffer = None
         self._loop.call_soon(self._call_connection_lost, exc)
 
@@ -128,11 +130,10 @@
         self._low_water = low
 
     def get_write_buffer_size(self):
-        # NOTE: This doesn't take into account data already passed to
-        # send() even if send() hasn't finished yet.
-        if not self._buffer:
-            return 0
-        return len(self._buffer)
+        size = self._pending_write
+        if self._buffer is not None:
+            size += len(self._buffer)
+        return size
 
 
 class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
@@ -206,7 +207,7 @@
 
 
 class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
-                                  transports.WriteTransport):
+                                      transports.WriteTransport):
     """Transport for write pipes."""
 
     def write(self, data):
@@ -252,6 +253,7 @@
         try:
             assert f is self._write_fut
             self._write_fut = None
+            self._pending_write = 0
             if f:
                 f.result()
             if data is None:
@@ -262,15 +264,21 @@
                     self._loop.call_soon(self._call_connection_lost, None)
                 if self._eof_written:
                     self._sock.shutdown(socket.SHUT_WR)
+                # Now that we've reduced the buffer size, tell the
+                # protocol to resume writing if it was paused.  Note that
+                # we do this last since the callback is called immediately
+                # and it may add more data to the buffer (even causing the
+                # protocol to be paused again).
+                self._maybe_resume_protocol()
             else:
                 self._write_fut = self._loop._proactor.send(self._sock, data)
-                self._write_fut.add_done_callback(self._loop_writing)
-            # Now that we've reduced the buffer size, tell the
-            # protocol to resume writing if it was paused.  Note that
-            # we do this last since the callback is called immediately
-            # and it may add more data to the buffer (even causing the
-            # protocol to be paused again).
-            self._maybe_resume_protocol()
+                if not self._write_fut.done():
+                    assert self._pending_write == 0
+                    self._pending_write = len(data)
+                    self._write_fut.add_done_callback(self._loop_writing)
+                    self._maybe_pause_protocol()
+                else:
+                    self._write_fut.add_done_callback(self._loop_writing)
         except ConnectionResetError as exc:
             self._force_close(exc)
         except OSError as exc: