bpo-36801: Fix waiting in StreamWriter.drain for closing SSL transport (GH-13098)



https://bugs.python.org/issue36801
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index c9b1f32..79adf02 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -199,6 +199,9 @@
         self._drain_waiter = waiter
         await waiter
 
+    def _get_close_waiter(self, stream):
+        raise NotImplementedError
+
 
 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
     """Helper class to adapt between Protocol and StreamReader.
@@ -315,6 +318,9 @@
             return False
         return True
 
+    def _get_close_waiter(self, stream):
+        return self._closed
+
     def __del__(self):
         # Prevent reports about unhandled exceptions.
         # Better than self._closed._log_traceback = False hack
@@ -376,7 +382,7 @@
         return self._transport.is_closing()
 
     async def wait_closed(self):
-        await self._protocol._closed
+        await self._protocol._get_close_waiter(self)
 
     def get_extra_info(self, name, default=None):
         return self._transport.get_extra_info(name, default)
@@ -394,13 +400,12 @@
             if exc is not None:
                 raise exc
         if self._transport.is_closing():
-            # Yield to the event loop so connection_lost() may be
-            # called.  Without this, _drain_helper() would return
-            # immediately, and code that calls
-            #     write(...); await drain()
-            # in a loop would never call connection_lost(), so it
-            # would not see an error when the socket is closed.
-            await sleep(0, loop=self._loop)
+            # Wait for protocol.connection_lost() call
+            # Raise connection closing error if any,
+            # ConnectionResetError otherwise
+            fut = self._protocol._get_close_waiter(self)
+            await fut
+            raise ConnectionResetError('Connection lost')
         await self._protocol._drain_helper()
 
     async def aclose(self):
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index fa58e1e..d34b611 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -26,6 +26,7 @@
         self._transport = None
         self._process_exited = False
         self._pipe_fds = []
+        self._stdin_closed = self._loop.create_future()
 
     def __repr__(self):
         info = [self.__class__.__name__]
@@ -80,6 +81,10 @@
             if pipe is not None:
                 pipe.close()
             self.connection_lost(exc)
+            if exc is None:
+                self._stdin_closed.set_result(None)
+            else:
+                self._stdin_closed.set_exception(exc)
             return
         if fd == 1:
             reader = self.stdout
@@ -106,6 +111,10 @@
             self._transport.close()
             self._transport = None
 
+    def _get_close_waiter(self, stream):
+        if stream is self.stdin:
+            return self._stdin_closed
+
 
 class Process:
     def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index c1cc9d7..905141c 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -109,6 +109,29 @@
 
             self._basetest_open_connection_no_loop_ssl(conn_fut)
 
+    @unittest.skipIf(ssl is None, 'No ssl module')
+    def test_drain_on_closed_writer_ssl(self):
+
+        async def inner(httpd):
+            reader, writer = await asyncio.open_connection(
+                *httpd.address,
+                ssl=test_utils.dummy_ssl_context())
+
+            messages = []
+            self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
+            writer.write(b'GET / HTTP/1.0\r\n\r\n')
+            data = await reader.read()
+            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+
+            writer.close()
+            with self.assertRaises(ConnectionResetError):
+                await writer.drain()
+
+            self.assertEqual(messages, [])
+
+        with test_utils.run_test_server(use_ssl=True) as httpd:
+            self.loop.run_until_complete(inner(httpd))
+
     def _basetest_open_connection_error(self, open_connection_fut):
         messages = []
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
diff --git a/Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst b/Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst
new file mode 100644
index 0000000..43e51fe
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst
@@ -0,0 +1 @@
+Properly handle SSL connection closing in asyncio StreamWriter.drain() call.