bpo-33654: Support BufferedProtocol in set_protocol() and start_tls() (GH-7130)


In this commit:

* Support BufferedProtocol in set_protocol() and start_tls()
* Fix proactor to cancel readers reliably
* Update tests to be compatible with OpenSSL 1.1.1
* Clarify BufferedProtocol docs
* Bump TLS test timeouts to 60 seconds; eliminate a possible race in start_serving
* Rewrite test_start_tls_server_1
(cherry picked from commit dbf102271fcc316f353c7e0a283811b661d128f2)

Co-authored-by: Yury Selivanov <yury@magic.io>
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 09eb440..a0243f5 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -157,7 +157,6 @@
     futures._get_loop(fut).stop()
 
 
-
 class _SendfileFallbackProtocol(protocols.Protocol):
     def __init__(self, transp):
         if not isinstance(transp, transports._FlowControlMixin):
@@ -304,6 +303,9 @@
 
     async def start_serving(self):
         self._start_serving()
+        # Skip one loop iteration so that all 'loop.add_reader'
+        # calls go through.
+        await tasks.sleep(0, loop=self._loop)
 
     async def serve_forever(self):
         if self._serving_forever_fut is not None:
@@ -1363,6 +1365,9 @@
                         ssl, backlog, ssl_handshake_timeout)
         if start_serving:
             server._start_serving()
+            # Skip one loop iteration so that all 'loop.add_reader'
+            # calls go through.
+            await tasks.sleep(0, loop=self)
 
         if self._debug:
             logger.info("%r is serving", server)
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 877dfb0..337ed0f 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -30,7 +30,7 @@
         super().__init__(extra, loop)
         self._set_extra(sock)
         self._sock = sock
-        self._protocol = protocol
+        self.set_protocol(protocol)
         self._server = server
         self._buffer = None  # None or bytearray.
         self._read_fut = None
@@ -159,16 +159,26 @@
 
     def __init__(self, loop, sock, protocol, waiter=None,
                  extra=None, server=None):
+        self._loop_reading_cb = None
+        self._paused = True
         super().__init__(loop, sock, protocol, waiter, extra, server)
-        self._paused = False
+
         self._reschedule_on_resume = False
-
-        if protocols._is_buffered_protocol(protocol):
-            self._loop_reading = self._loop_reading__get_buffer
-        else:
-            self._loop_reading = self._loop_reading__data_received
-
         self._loop.call_soon(self._loop_reading)
+        self._paused = False
+
+    def set_protocol(self, protocol):
+        if isinstance(protocol, protocols.BufferedProtocol):
+            self._loop_reading_cb = self._loop_reading__get_buffer
+        else:
+            self._loop_reading_cb = self._loop_reading__data_received
+
+        super().set_protocol(protocol)
+
+        if self.is_reading():
+            # reset reading callback / buffers / self._read_fut
+            self.pause_reading()
+            self.resume_reading()
 
     def is_reading(self):
         return not self._paused and not self._closing
@@ -179,6 +189,13 @@
         self._paused = True
 
         if self._read_fut is not None and not self._read_fut.done():
+            # TODO: This is an ugly hack to cancel the current read future
+            # *and* avoid potential race conditions, as read cancellation
+            # goes through `future.cancel()` and `loop.call_soon()`.
+            # We then use this special attribute in the reader callback to
+            # exit *immediately* without doing any cleanup/rescheduling.
+            self._read_fut.__asyncio_cancelled_on_pause__ = True
+
             self._read_fut.cancel()
             self._read_fut = None
             self._reschedule_on_resume = True
@@ -210,7 +227,14 @@
         if not keep_open:
             self.close()
 
-    def _loop_reading__data_received(self, fut=None):
+    def _loop_reading(self, fut=None):
+        self._loop_reading_cb(fut)
+
+    def _loop_reading__data_received(self, fut):
+        if (fut is not None and
+                getattr(fut, '__asyncio_cancelled_on_pause__', False)):
+            return
+
         if self._paused:
             self._reschedule_on_resume = True
             return
@@ -253,14 +277,18 @@
             if not self._closing:
                 raise
         else:
-            self._read_fut.add_done_callback(self._loop_reading)
+            self._read_fut.add_done_callback(self._loop_reading__data_received)
         finally:
             if data:
                 self._protocol.data_received(data)
             elif data == b'':
                 self._loop_reading__on_eof()
 
-    def _loop_reading__get_buffer(self, fut=None):
+    def _loop_reading__get_buffer(self, fut):
+        if (fut is not None and
+                getattr(fut, '__asyncio_cancelled_on_pause__', False)):
+            return
+
         if self._paused:
             self._reschedule_on_resume = True
             return
@@ -310,7 +338,9 @@
             return
 
         try:
-            buf = self._protocol.get_buffer()
+            buf = self._protocol.get_buffer(-1)
+            if not len(buf):
+                raise RuntimeError('get_buffer() returned an empty buffer')
         except Exception as exc:
             self._fatal_error(
                 exc, 'Fatal error: protocol.get_buffer() call failed.')
@@ -319,7 +349,7 @@
         try:
             # schedule a new read
             self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
-            self._read_fut.add_done_callback(self._loop_reading)
+            self._read_fut.add_done_callback(self._loop_reading__get_buffer)
         except ConnectionAbortedError as exc:
             if not self._closing:
                 self._fatal_error(exc, 'Fatal read error on pipe transport')
diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py
index dc298a8..b8d2e6b 100644
--- a/Lib/asyncio/protocols.py
+++ b/Lib/asyncio/protocols.py
@@ -130,11 +130,15 @@
     * CL: connection_lost()
     """
 
-    def get_buffer(self):
+    def get_buffer(self, sizehint):
         """Called to allocate a new receive buffer.
 
+        *sizehint* is a recommended minimal size for the returned
+        buffer.  When set to -1, the buffer size can be arbitrary.
+
         Must return an object that implements the
         :ref:`buffer protocol <bufferobjects>`.
+        It is an error to return a zero-sized buffer.
         """
 
     def buffer_updated(self, nbytes):
@@ -185,7 +189,3 @@
 
     def process_exited(self):
         """Called when subprocess has exited."""
-
-
-def _is_buffered_protocol(proto):
-    return hasattr(proto, 'get_buffer') and not hasattr(proto, 'data_received')
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5473c70..116c08d 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -597,8 +597,10 @@
                 self._extra['peername'] = None
         self._sock = sock
         self._sock_fd = sock.fileno()
-        self._protocol = protocol
-        self._protocol_connected = True
+
+        self._protocol_connected = False
+        self.set_protocol(protocol)
+
         self._server = server
         self._buffer = self._buffer_factory()
         self._conn_lost = 0  # Set when call to connection_lost scheduled.
@@ -640,6 +642,7 @@
 
     def set_protocol(self, protocol):
         self._protocol = protocol
+        self._protocol_connected = True
 
     def get_protocol(self):
         return self._protocol
@@ -721,11 +724,7 @@
     def __init__(self, loop, sock, protocol, waiter=None,
                  extra=None, server=None):
 
-        if protocols._is_buffered_protocol(protocol):
-            self._read_ready = self._read_ready__get_buffer
-        else:
-            self._read_ready = self._read_ready__data_received
-
+        self._read_ready_cb = None
         super().__init__(loop, sock, protocol, extra, server)
         self._eof = False
         self._paused = False
@@ -745,6 +744,14 @@
             self._loop.call_soon(futures._set_result_unless_cancelled,
                                  waiter, None)
 
+    def set_protocol(self, protocol):
+        if isinstance(protocol, protocols.BufferedProtocol):
+            self._read_ready_cb = self._read_ready__get_buffer
+        else:
+            self._read_ready_cb = self._read_ready__data_received
+
+        super().set_protocol(protocol)
+
     def is_reading(self):
         return not self._paused and not self._closing
 
@@ -764,12 +771,17 @@
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
+    def _read_ready(self):
+        self._read_ready_cb()
+
     def _read_ready__get_buffer(self):
         if self._conn_lost:
             return
 
         try:
-            buf = self._protocol.get_buffer()
+            buf = self._protocol.get_buffer(-1)
+            if not len(buf):
+                raise RuntimeError('get_buffer() returned an empty buffer')
         except Exception as exc:
             self._fatal_error(
                 exc, 'Fatal error: protocol.get_buffer() call failed.')
diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py
index 2bbf134..2bfa45d 100644
--- a/Lib/asyncio/sslproto.py
+++ b/Lib/asyncio/sslproto.py
@@ -441,6 +441,8 @@
         self._waiter = waiter
         self._loop = loop
         self._app_protocol = app_protocol
+        self._app_protocol_is_buffer = \
+            isinstance(app_protocol, protocols.BufferedProtocol)
         self._app_transport = _SSLProtocolTransport(self._loop, self)
         # _SSLPipe instance (None until the connection is made)
         self._sslpipe = None
@@ -522,7 +524,16 @@
 
         for chunk in appdata:
             if chunk:
-                self._app_protocol.data_received(chunk)
+                try:
+                    if self._app_protocol_is_buffer:
+                        _feed_data_to_bufferred_proto(
+                            self._app_protocol, chunk)
+                    else:
+                        self._app_protocol.data_received(chunk)
+                except Exception as ex:
+                    self._fatal_error(
+                        ex, 'application protocol failed to receive SSL data')
+                    return
             else:
                 self._start_shutdown()
                 break
@@ -709,3 +720,22 @@
                 self._transport.abort()
         finally:
             self._finalize()
+
+
+def _feed_data_to_bufferred_proto(proto, data):
+    data_len = len(data)
+    while data_len:
+        buf = proto.get_buffer(data_len)
+        buf_len = len(buf)
+        if not buf_len:
+            raise RuntimeError('get_buffer() returned an empty buffer')
+
+        if buf_len >= data_len:
+            buf[:data_len] = data
+            proto.buffer_updated(data_len)
+            return
+        else:
+            buf[:buf_len] = data[:buf_len]
+            proto.buffer_updated(buf_len)
+            data = data[buf_len:]
+            data_len = len(data)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 6cac137..639300f 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -20,6 +20,7 @@
 from . import events
 from . import futures
 from . import selector_events
+from . import tasks
 from . import transports
 from .log import logger
 
@@ -308,6 +309,9 @@
                                     ssl, backlog, ssl_handshake_timeout)
         if start_serving:
             server._start_serving()
+            # Skip one loop iteration so that all 'loop.add_reader'
+            # calls go through.
+            await tasks.sleep(0, loop=self)
 
         return server