bpo-44011: New asyncio ssl implementation (#17975)

diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index f789635..e54ee30 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -273,7 +273,7 @@ async def restore(self):
 class Server(events.AbstractServer):
 
     def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
-                 ssl_handshake_timeout):
+                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
         self._loop = loop
         self._sockets = sockets
         self._active_count = 0
@@ -282,6 +282,7 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
         self._backlog = backlog
         self._ssl_context = ssl_context
         self._ssl_handshake_timeout = ssl_handshake_timeout
+        self._ssl_shutdown_timeout = ssl_shutdown_timeout
         self._serving = False
         self._serving_forever_fut = None
 
@@ -313,7 +314,8 @@ def _start_serving(self):
             sock.listen(self._backlog)
             self._loop._start_serving(
                 self._protocol_factory, sock, self._ssl_context,
-                self, self._backlog, self._ssl_handshake_timeout)
+                self, self._backlog, self._ssl_handshake_timeout,
+                self._ssl_shutdown_timeout)
 
     def get_loop(self):
         return self._loop
@@ -467,6 +469,7 @@ def _make_ssl_transport(
             *, server_side=False, server_hostname=None,
             extra=None, server=None,
             ssl_handshake_timeout=None,
+            ssl_shutdown_timeout=None,
             call_connection_made=True):
         """Create SSL transport."""
         raise NotImplementedError
@@ -969,6 +972,7 @@ async def create_connection(
             proto=0, flags=0, sock=None,
             local_addr=None, server_hostname=None,
             ssl_handshake_timeout=None,
+            ssl_shutdown_timeout=None,
             happy_eyeballs_delay=None, interleave=None):
         """Connect to a TCP server.
 
@@ -1004,6 +1008,10 @@ async def create_connection(
             raise ValueError(
                 'ssl_handshake_timeout is only meaningful with ssl')
 
+        if ssl_shutdown_timeout is not None and not ssl:
+            raise ValueError(
+                'ssl_shutdown_timeout is only meaningful with ssl')
+
         if happy_eyeballs_delay is not None and interleave is None:
             # If using happy eyeballs, default to interleave addresses by family
             interleave = 1
@@ -1079,7 +1087,8 @@ async def create_connection(
 
         transport, protocol = await self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname,
-            ssl_handshake_timeout=ssl_handshake_timeout)
+            ssl_handshake_timeout=ssl_handshake_timeout,
+            ssl_shutdown_timeout=ssl_shutdown_timeout)
         if self._debug:
             # Get the socket from the transport because SSL transport closes
             # the old socket and creates a new SSL socket
@@ -1091,7 +1100,8 @@ async def create_connection(
     async def _create_connection_transport(
             self, sock, protocol_factory, ssl,
             server_hostname, server_side=False,
-            ssl_handshake_timeout=None):
+            ssl_handshake_timeout=None,
+            ssl_shutdown_timeout=None):
 
         sock.setblocking(False)
 
@@ -1102,7 +1112,8 @@ async def _create_connection_transport(
             transport = self._make_ssl_transport(
                 sock, protocol, sslcontext, waiter,
                 server_side=server_side, server_hostname=server_hostname,
-                ssl_handshake_timeout=ssl_handshake_timeout)
+                ssl_handshake_timeout=ssl_handshake_timeout,
+                ssl_shutdown_timeout=ssl_shutdown_timeout)
         else:
             transport = self._make_socket_transport(sock, protocol, waiter)
 
@@ -1193,7 +1204,8 @@ async def _sendfile_fallback(self, transp, file, offset, count):
     async def start_tls(self, transport, protocol, sslcontext, *,
                         server_side=False,
                         server_hostname=None,
-                        ssl_handshake_timeout=None):
+                        ssl_handshake_timeout=None,
+                        ssl_shutdown_timeout=None):
         """Upgrade transport to TLS.
 
         Return a new transport that *protocol* should start using
@@ -1216,6 +1228,7 @@ async def start_tls(self, transport, protocol, sslcontext, *,
             self, protocol, sslcontext, waiter,
             server_side, server_hostname,
             ssl_handshake_timeout=ssl_handshake_timeout,
+            ssl_shutdown_timeout=ssl_shutdown_timeout,
             call_connection_made=False)
 
         # Pause early so that "ssl_protocol.data_received()" doesn't
@@ -1414,6 +1427,7 @@ async def create_server(
             reuse_address=None,
             reuse_port=None,
             ssl_handshake_timeout=None,
+            ssl_shutdown_timeout=None,
             start_serving=True):
         """Create a TCP server.
 
@@ -1437,6 +1451,10 @@ async def create_server(
             raise ValueError(
                 'ssl_handshake_timeout is only meaningful with ssl')
 
+        if ssl_shutdown_timeout is not None and ssl is None:
+            raise ValueError(
+                'ssl_shutdown_timeout is only meaningful with ssl')
+
         if host is not None or port is not None:
             if sock is not None:
                 raise ValueError(
@@ -1509,7 +1527,8 @@ async def create_server(
             sock.setblocking(False)
 
         server = Server(self, sockets, protocol_factory,
-                        ssl, backlog, ssl_handshake_timeout)
+                        ssl, backlog, ssl_handshake_timeout,
+                        ssl_shutdown_timeout)
         if start_serving:
             server._start_serving()
             # Skip one loop iteration so that all 'loop.add_reader'
@@ -1523,7 +1542,8 @@ async def create_server(
     async def connect_accepted_socket(
             self, protocol_factory, sock,
             *, ssl=None,
-            ssl_handshake_timeout=None):
+            ssl_handshake_timeout=None,
+            ssl_shutdown_timeout=None):
         if sock.type != socket.SOCK_STREAM:
             raise ValueError(f'A Stream Socket was expected, got {sock!r}')
 
@@ -1531,9 +1551,14 @@ async def connect_accepted_socket(
             raise ValueError(
                 'ssl_handshake_timeout is only meaningful with ssl')
 
+        if ssl_shutdown_timeout is not None and not ssl:
+            raise ValueError(
+                'ssl_shutdown_timeout is only meaningful with ssl')
+
         transport, protocol = await self._create_connection_transport(
             sock, protocol_factory, ssl, '', server_side=True,
-            ssl_handshake_timeout=ssl_handshake_timeout)
+            ssl_handshake_timeout=ssl_handshake_timeout,
+            ssl_shutdown_timeout=ssl_shutdown_timeout)
         if self._debug:
             # Get the socket from the transport because SSL transport closes
             # the old socket and creates a new SSL socket