bpo-34638: Store a weak reference to stream reader to break strong references loop (GH-9201)

Store a weak reference to stream readerfor breaking strong references

It breaks the strong reference loop between reader and protocol and allows to detect and close the socket if the stream is deleted (garbage collected)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 9dab49b..e7fb22e 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -3,6 +3,8 @@
     'open_connection', 'start_server')
 
 import socket
+import sys
+import weakref
 
 if hasattr(socket, 'AF_UNIX'):
     __all__ += ('open_unix_connection', 'start_unix_server')
@@ -10,6 +12,7 @@
 from . import coroutines
 from . import events
 from . import exceptions
+from . import format_helpers
 from . import protocols
 from .log import logger
 from .tasks import sleep
@@ -186,46 +189,106 @@
     call inappropriate methods of the protocol.)
     """
 
+    _source_traceback = None
+
     def __init__(self, stream_reader, client_connected_cb=None, loop=None):
         super().__init__(loop=loop)
-        self._stream_reader = stream_reader
+        if stream_reader is not None:
+            self._stream_reader_wr = weakref.ref(stream_reader,
+                                                 self._on_reader_gc)
+            self._source_traceback = stream_reader._source_traceback
+        else:
+            self._stream_reader_wr = None
+        if client_connected_cb is not None:
+            # This is a stream created by the `create_server()` function.
+            # Keep a strong reference to the reader until a connection
+            # is established.
+            self._strong_reader = stream_reader
+        self._reject_connection = False
         self._stream_writer = None
+        self._transport = None
         self._client_connected_cb = client_connected_cb
         self._over_ssl = False
         self._closed = self._loop.create_future()
 
+    def _on_reader_gc(self, wr):
+        transport = self._transport
+        if transport is not None:
+            # connection_made was called
+            context = {
+                'message': ('An open stream object is being garbage '
+                            'collected; call "stream.close()" explicitly.')
+            }
+            if self._source_traceback:
+                context['source_traceback'] = self._source_traceback
+            self._loop.call_exception_handler(context)
+            transport.abort()
+        else:
+            self._reject_connection = True
+        self._stream_reader_wr = None
+
+    def _untrack_reader(self):
+        self._stream_reader_wr = None
+
+    @property
+    def _stream_reader(self):
+        if self._stream_reader_wr is None:
+            return None
+        return self._stream_reader_wr()
+
     def connection_made(self, transport):
-        self._stream_reader.set_transport(transport)
+        if self._reject_connection:
+            context = {
+                'message': ('An open stream was garbage collected prior to '
+                            'establishing network connection; '
+                            'call "stream.close()" explicitly.')
+            }
+            if self._source_traceback:
+                context['source_traceback'] = self._source_traceback
+            self._loop.call_exception_handler(context)
+            transport.abort()
+            return
+        self._transport = transport
+        reader = self._stream_reader
+        if reader is not None:
+            reader.set_transport(transport)
         self._over_ssl = transport.get_extra_info('sslcontext') is not None
         if self._client_connected_cb is not None:
             self._stream_writer = StreamWriter(transport, self,
-                                               self._stream_reader,
+                                               reader,
                                                self._loop)
-            res = self._client_connected_cb(self._stream_reader,
+            res = self._client_connected_cb(reader,
                                             self._stream_writer)
             if coroutines.iscoroutine(res):
                 self._loop.create_task(res)
+            self._strong_reader = None
 
     def connection_lost(self, exc):
-        if self._stream_reader is not None:
+        reader = self._stream_reader
+        if reader is not None:
             if exc is None:
-                self._stream_reader.feed_eof()
+                reader.feed_eof()
             else:
-                self._stream_reader.set_exception(exc)
+                reader.set_exception(exc)
         if not self._closed.done():
             if exc is None:
                 self._closed.set_result(None)
             else:
                 self._closed.set_exception(exc)
         super().connection_lost(exc)
-        self._stream_reader = None
+        self._stream_reader_wr = None
         self._stream_writer = None
+        self._transport = None
 
     def data_received(self, data):
-        self._stream_reader.feed_data(data)
+        reader = self._stream_reader
+        if reader is not None:
+            reader.feed_data(data)
 
     def eof_received(self):
-        self._stream_reader.feed_eof()
+        reader = self._stream_reader
+        if reader is not None:
+            reader.feed_eof()
         if self._over_ssl:
             # Prevent a warning in SSLProtocol.eof_received:
             # "returning true from eof_received()
@@ -282,6 +345,9 @@
         return self._transport.can_write_eof()
 
     def close(self):
+        # a reader can be garbage collected
+        # after connection closing
+        self._protocol._untrack_reader()
         return self._transport.close()
 
     def is_closing(self):
@@ -318,6 +384,8 @@
 
 class StreamReader:
 
+    _source_traceback = None
+
     def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
         # The line length limit is  a security feature;
         # it also doubles as half the buffer limit.
@@ -336,6 +404,9 @@
         self._exception = None
         self._transport = None
         self._paused = False
+        if self._loop.get_debug():
+            self._source_traceback = format_helpers.extract_stack(
+                sys._getframe(1))
 
     def __repr__(self):
         info = ['StreamReader']
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index 90fc00d..c86de3d 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -36,6 +36,11 @@
             info.append(f'stderr={self.stderr!r}')
         return '<{}>'.format(' '.join(info))
 
+    def _untrack_reader(self):
+        # StreamWriter.close() expects the protocol
+        # to have this method defined.
+        pass
+
     def connection_made(self, transport):
         self._transport = transport