bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482)

See https://bugs.python.org/issue38242 for more details.
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index feebd22..471e6e9 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -18,12 +18,19 @@
     import asyncio
 
     async def tcp_echo_client(message):
-        async with asyncio.connect('127.0.0.1', 8888) as stream:
-            print(f'Send: {message!r}')
-            await stream.write(message.encode())
+        reader, writer = await asyncio.open_connection(
+            '127.0.0.1', 8888)
 
-            data = await stream.read(100)
-            print(f'Received: {data.decode()!r}')
+        print(f'Send: {message!r}')
+        writer.write(message.encode())
+        await writer.drain()
+
+        data = await reader.read(100)
+        print(f'Received: {data.decode()!r}')
+
+        print('Close the connection')
+        writer.close()
+        await writer.wait_closed()
 
     asyncio.run(tcp_echo_client('Hello World!'))
 
@@ -37,31 +44,6 @@
 and work with streams:
 
 
-.. coroutinefunction:: connect(host=None, port=None, \*, \
-                               limit=2**16, ssl=None, family=0, \
-                               proto=0, flags=0, sock=None, local_addr=None, \
-                               server_hostname=None, ssl_handshake_timeout=None, \
-                               happy_eyeballs_delay=None, interleave=None)
-
-   Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
-   object of mode :attr:`StreamMode.READWRITE`.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the *limit* is set to 64 KiB.
-
-   The rest of the arguments are passed directly to :meth:`loop.create_connection`.
-
-   The function can be used with ``await`` to get a connected stream::
-
-       stream = await asyncio.connect('127.0.0.1', 8888)
-
-   The function can also be used as an async context manager::
-
-       async with asyncio.connect('127.0.0.1', 8888) as stream:
-           ...
-
-   .. versionadded:: 3.8
-
 .. coroutinefunction:: open_connection(host=None, port=None, \*, \
                           loop=None, limit=None, ssl=None, family=0, \
                           proto=0, flags=0, sock=None, local_addr=None, \
@@ -87,12 +69,8 @@
 
       The *ssl_handshake_timeout* parameter.
 
-   .. deprecated-removed:: 3.8 3.10
-
-      `open_connection()` is deprecated in favor of :func:`connect`.
-
 .. coroutinefunction:: start_server(client_connected_cb, host=None, \
-                          port=None, \*, loop=None, limit=2**16, \
+                          port=None, \*, loop=None, limit=None, \
                           family=socket.AF_UNSPEC, \
                           flags=socket.AI_PASSIVE, sock=None, \
                           backlog=100, ssl=None, reuse_address=None, \
@@ -124,60 +102,9 @@
 
       The *ssl_handshake_timeout* and *start_serving* parameters.
 
-   .. deprecated-removed:: 3.8 3.10
-
-      `start_server()` is deprecated if favor of :class:`StreamServer`
-
-.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
-
-   Takes a :term:`file-like object <file object>` *pipe* to return a
-   :class:`Stream` object of the mode :attr:`StreamMode.READ` that has
-   similar API of :class:`StreamReader`. It can also be used as an async context manager.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the limit is set to 64 KiB.
-
-   .. versionadded:: 3.8
-
-.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
-
-   Takes a :term:`file-like object <file object>` *pipe* to return a
-   :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
-   similar API of :class:`StreamWriter`. It can also be used as an async context manager.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the limit is set to 64 KiB.
-
-   .. versionadded:: 3.8
 
 .. rubric:: Unix Sockets
 
-.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
-                           sock=None, server_hostname=None, \
-                           ssl_handshake_timeout=None)
-
-   Establish a Unix socket connection to socket with *path* address and
-   return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
-   that can be used as a reader and a writer.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the *limit* is set to 64 KiB.
-
-   The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
-
-   The function can be used with ``await`` to get a connected stream::
-
-       stream = await asyncio.connect_unix('/tmp/example.sock')
-
-   The function can also be used as an async context manager::
-
-       async with asyncio.connect_unix('/tmp/example.sock') as stream:
-           ...
-
-   .. availability:: Unix.
-
-   .. versionadded:: 3.8
-
 .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
                         limit=None, ssl=None, sock=None, \
                         server_hostname=None, ssl_handshake_timeout=None)
@@ -199,10 +126,6 @@
 
       The *path* parameter can now be a :term:`path-like object`
 
-   .. deprecated-removed:: 3.8 3.10
-
-      ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
-
 
 .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                           \*, loop=None, limit=None, sock=None, \
@@ -225,349 +148,6 @@
 
       The *path* parameter can now be a :term:`path-like object`.
 
-   .. deprecated-removed:: 3.8 3.10
-
-      ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
-
-
----------
-
-StreamServer
-============
-
-.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
-                        limit=2**16, family=socket.AF_UNSPEC, \
-                        flags=socket.AI_PASSIVE, sock=None, backlog=100, \
-                        ssl=None, reuse_address=None, reuse_port=None, \
-                        ssl_handshake_timeout=None, shutdown_timeout=60)
-
-   The *client_connected_cb* callback is called whenever a new client
-   connection is established.  It receives a :class:`Stream` object of the
-   mode :attr:`StreamMode.READWRITE`.
-
-   *client_connected_cb* can be a plain callable or a
-   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
-   it will be automatically scheduled as a :class:`Task`.
-
-   *limit* determines the buffer size limit used by the
-   returned :class:`Stream` instance.  By default the *limit*
-   is set to 64 KiB.
-
-   The rest of the arguments are passed directly to
-   :meth:`loop.create_server`.
-
-   .. coroutinemethod:: start_serving()
-
-      Binds to the given host and port to start the server.
-
-   .. coroutinemethod:: serve_forever()
-
-      Start accepting connections until the coroutine is cancelled.
-      Cancellation of ``serve_forever`` task causes the server
-      to be closed.
-
-      This method can be called if the server is already accepting
-      connections.  Only one ``serve_forever`` task can exist per
-      one *Server* object.
-
-   .. method:: is_serving()
-
-      Returns ``True`` if the server is bound and currently serving.
-
-   .. method:: bind()
-
-      Bind the server to the given *host* and *port*. This method is
-      automatically called during ``__aenter__`` when :class:`StreamServer` is
-      used as an async context manager.
-
-   .. method:: is_bound()
-
-      Return ``True`` if the server is bound.
-
-   .. coroutinemethod:: abort()
-
-      Closes the connection and cancels all pending tasks.
-
-   .. coroutinemethod:: close()
-
-      Closes the connection. This method is automatically called during
-      ``__aexit__`` when :class:`StreamServer` is used as an async context
-      manager.
-
-   .. attribute:: sockets
-
-      Returns a tuple of socket objects the server is bound to.
-
-   .. versionadded:: 3.8
-
-
-UnixStreamServer
-================
-
-.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
-                            limit=2**16, sock=None, backlog=100, \
-                            ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
-
-   The *client_connected_cb* callback is called whenever a new client
-   connection is established.  It receives a :class:`Stream` object of the
-   mode :attr:`StreamMode.READWRITE`.
-
-   *client_connected_cb* can be a plain callable or a
-   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
-   it will be automatically scheduled as a :class:`Task`.
-
-   *limit* determines the buffer size limit used by the
-   returned :class:`Stream` instance.  By default the *limit*
-   is set to 64 KiB.
-
-   The rest of the arguments are passed directly to
-   :meth:`loop.create_unix_server`.
-
-   .. coroutinemethod:: start_serving()
-
-      Binds to the given host and port to start the server.
-
-   .. method:: is_serving()
-
-      Returns ``True`` if the server is bound and currently serving.
-
-   .. method:: bind()
-
-      Bind the server to the given *host* and *port*. This method is
-      automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
-      used as an async context manager.
-
-   .. method:: is_bound()
-
-      Return ``True`` if the server is bound.
-
-   .. coroutinemethod:: abort()
-
-      Closes the connection and cancels all pending tasks.
-
-   .. coroutinemethod:: close()
-
-      Closes the connection. This method is automatically called during
-      ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
-      manager.
-
-   .. attribute:: sockets
-
-      Returns a tuple of socket objects the server is bound to.
-
-   .. availability:: Unix.
-
-   .. versionadded:: 3.8
-
-Stream
-======
-
-.. class:: Stream
-
-   Represents a Stream object that provides APIs to read and write data
-   to the IO stream . It includes the API provided by :class:`StreamReader`
-   and :class:`StreamWriter`. It can also be used as :term:`asynchronous iterator`
-   where :meth:`readline` is used. It raises :exc:`StopAsyncIteration` when
-   :meth:`readline` returns empty data.
-
-   Do not instantiate *Stream* objects directly; use API like :func:`connect`
-   and :class:`StreamServer` instead.
-
-   .. versionadded:: 3.8
-
-   .. attribute:: mode
-
-      Returns the mode of the stream which is a :class:`StreamMode` value. It could
-      be one of the below:
-
-      * :attr:`StreamMode.READ` - Connection can receive data.
-      * :attr:`StreamMode.WRITE` - Connection can send data.
-      * :attr:`StreamMode.READWRITE` - Connection can send and receive data.
-
-   .. coroutinemethod:: abort()
-
-      Aborts the connection immediately, without waiting for the send buffer to drain.
-
-   .. method:: at_eof()
-
-      Return ``True`` if the buffer is empty.
-
-   .. method:: can_write_eof()
-
-      Return *True* if the underlying transport supports
-      the :meth:`write_eof` method, *False* otherwise.
-
-   .. method:: close()
-
-      The method closes the stream and the underlying socket.
-
-      It is possible to directly await on the `close()` method::
-
-         await stream.close()
-
-      The ``await`` pauses the current coroutine until the stream and the underlying
-      socket are closed (and SSL shutdown is performed for a secure connection).
-
-   .. coroutinemethod:: drain()
-
-      Wait until it is appropriate to resume writing to the stream.
-      Example::
-
-          stream.write(data)
-          await stream.drain()
-
-      This is a flow control method that interacts with the underlying
-      IO write buffer.  When the size of the buffer reaches
-      the high watermark, *drain()* blocks until the size of the
-      buffer is drained down to the low watermark and writing can
-      be resumed.  When there is nothing to wait for, the :meth:`drain`
-      returns immediately.
-
-      .. deprecated:: 3.8
-
-      It is recommended to directly await on the `write()` method instead::
-
-         await stream.write(data)
-
-   .. method:: get_extra_info(name, default=None)
-
-      Access optional transport information; see
-      :meth:`BaseTransport.get_extra_info` for details.
-
-   .. method:: is_closing()
-
-      Return ``True`` if the stream is closed or in the process of
-      being closed.
-
-   .. coroutinemethod:: read(n=-1)
-
-      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
-      read until EOF and return all read bytes.
-
-      If EOF was received and the internal buffer is empty,
-      return an empty ``bytes`` object.
-
-   .. coroutinemethod:: readexactly(n)
-
-      Read exactly *n* bytes.
-
-      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
-      can be read.  Use the :attr:`IncompleteReadError.partial`
-      attribute to get the partially read data.
-
-   .. coroutinemethod:: readline()
-
-      Read one line, where "line" is a sequence of bytes
-      ending with ``\n``.
-
-      If EOF is received and ``\n`` was not found, the method
-      returns partially read data.
-
-      If EOF is received and the internal buffer is empty,
-      return an empty ``bytes`` object.
-
-   .. coroutinemethod:: readuntil(separator=b'\\n')
-
-      Read data from the stream until *separator* is found.
-
-      On success, the data and separator will be removed from the
-      internal buffer (consumed). Returned data will include the
-      separator at the end.
-
-      If the amount of data read exceeds the configured stream limit, a
-      :exc:`LimitOverrunError` exception is raised, and the data
-      is left in the internal buffer and can be read again.
-
-      If EOF is reached before the complete separator is found,
-      an :exc:`IncompleteReadError` exception is raised, and the internal
-      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
-      may contain a portion of the separator.
-
-   .. coroutinemethod:: sendfile(file, offset=0, count=None, *, fallback=True)
-
-      Sends a *file* over the stream using an optimized syscall if available.
-
-      For other parameters meaning please see :meth:`AbstractEventloop.sendfile`.
-
-   .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \
-                                  ssl_handshake_timeout=None)
-
-      Upgrades the existing transport-based connection to TLS.
-
-      For other parameters meaning please see :meth:`AbstractEventloop.start_tls`.
-
-   .. coroutinemethod:: wait_closed()
-
-      Wait until the stream is closed.
-
-      Should be called after :meth:`close` to wait until the underlying
-      connection is closed.
-
-   .. coroutinemethod:: write(data)
-
-      Write *data* to the underlying socket; wait until the data is sent, e.g.::
-
-         await stream.write(data)
-
-   .. method:: write(data)
-
-      The method attempts to write the *data* to the underlying socket immediately.
-      If that fails, the data is queued in an internal write buffer until it can be
-      sent. :meth:`drain` can be used to flush the underlying buffer once writing is
-      available::
-
-         stream.write(data)
-         await stream.drain()
-
-      .. deprecated:: 3.8
-
-      It is recommended to directly await on the `write()` method instead::
-
-          await stream.write(data)
-
-   .. method:: writelines(data)
-
-      The method writes a list (or any iterable) of bytes to the underlying socket
-      immediately.
-      If that fails, the data is queued in an internal write buffer until it can be
-      sent.
-
-      It is possible to directly await on the `writelines()` method::
-
-         await stream.writelines(lines)
-
-      The ``await`` pauses the current coroutine until the data is written to the
-      socket.
-
-   .. method:: write_eof()
-
-      Close the write end of the stream after the buffered write
-      data is flushed.
-
-
-StreamMode
-==========
-
-.. class:: StreamMode
-
-   A subclass of :class:`enum.Flag` that defines a set of values that can be
-   used to determine the ``mode`` of :class:`Stream` objects.
-
-   .. data:: READ
-
-   The stream object is readable and provides the API of :class:`StreamReader`.
-
-   .. data:: WRITE
-
-   The stream object is writeable and provides the API of :class:`StreamWriter`.
-
-   .. data:: READWRITE
-
-   The stream object is readable and writeable and provides the API of both
-   :class:`StreamReader` and :class:`StreamWriter`.
-
-  .. versionadded:: 3.8
-
 
 StreamReader
 ============
@@ -629,7 +209,8 @@
 
    .. method:: at_eof()
 
-      Return ``True`` if the buffer is empty.
+      Return ``True`` if the buffer is empty and :meth:`feed_eof`
+      was called.
 
 
 StreamWriter
@@ -650,22 +231,11 @@
       If that fails, the data is queued in an internal write buffer until it can be
       sent.
 
-      Starting with Python 3.8, it is possible to directly await on the `write()`
-      method::
-
-         await stream.write(data)
-
-      The ``await`` pauses the current coroutine until the data is written to the
-      socket.
-
-      Below is an equivalent code that works with Python <= 3.7::
+      The method should be used along with the :meth:`drain` method::
 
          stream.write(data)
          await stream.drain()
 
-      .. versionchanged:: 3.8
-         Support ``await stream.write(...)`` syntax.
-
    .. method:: writelines(data)
 
       The method writes a list (or any iterable) of bytes to the underlying socket
@@ -673,42 +243,20 @@
       If that fails, the data is queued in an internal write buffer until it can be
       sent.
 
-      Starting with Python 3.8, it is possible to directly await on the `writelines()`
-      method::
-
-         await stream.writelines(lines)
-
-      The ``await`` pauses the current coroutine until the data is written to the
-      socket.
-
-      Below is an equivalent code that works with Python <= 3.7::
+      The method should be used along with the :meth:`drain` method::
 
          stream.writelines(lines)
          await stream.drain()
 
-      .. versionchanged:: 3.8
-         Support ``await stream.writelines()`` syntax.
-
    .. method:: close()
 
       The method closes the stream and the underlying socket.
 
-      Starting with Python 3.8, it is possible to directly await on the `close()`
-      method::
-
-         await stream.close()
-
-      The ``await`` pauses the current coroutine until the stream and the underlying
-      socket are closed (and SSL shutdown is performed for a secure connection).
-
-      Below is an equivalent code that works with Python <= 3.7::
+      The method should be used along with the :meth:`wait_closed` method::
 
          stream.close()
          await stream.wait_closed()
 
-      .. versionchanged:: 3.8
-         Support ``await stream.close()`` syntax.
-
    .. method:: can_write_eof()
 
       Return *True* if the underlying transport supports
@@ -768,17 +316,22 @@
 TCP echo client using streams
 -----------------------------
 
-TCP echo client using the :func:`asyncio.connect` function::
+TCP echo client using the :func:`asyncio.open_connection` function::
 
     import asyncio
 
     async def tcp_echo_client(message):
-        async with asyncio.connect('127.0.0.1', 8888) as stream:
-            print(f'Send: {message!r}')
-            await stream.write(message.encode())
+        reader, writer = await asyncio.open_connection(
+            '127.0.0.1', 8888)
 
-            data = await stream.read(100)
-            print(f'Received: {data.decode()!r}')
+        print(f'Send: {message!r}')
+        writer.write(message.encode())
+
+        data = await reader.read(100)
+        print(f'Received: {data.decode()!r}')
+
+        print('Close the connection')
+        writer.close()
 
     asyncio.run(tcp_echo_client('Hello World!'))
 
@@ -794,28 +347,32 @@
 TCP echo server using streams
 -----------------------------
 
-TCP echo server using the :class:`asyncio.StreamServer` class::
+TCP echo server using the :func:`asyncio.start_server` function::
 
     import asyncio
 
-    async def handle_echo(stream):
-        data = await stream.read(100)
+    async def handle_echo(reader, writer):
+        data = await reader.read(100)
         message = data.decode()
-        addr = stream.get_extra_info('peername')
+        addr = writer.get_extra_info('peername')
 
         print(f"Received {message!r} from {addr!r}")
 
         print(f"Send: {message!r}")
-        await stream.write(data)
+        writer.write(data)
+        await writer.drain()
 
         print("Close the connection")
-        await stream.close()
+        writer.close()
 
     async def main():
-        async with asyncio.StreamServer(
-                handle_echo, '127.0.0.1', 8888) as server:
-            addr = server.sockets[0].getsockname()
-            print(f'Serving on {addr}')
+        server = await asyncio.start_server(
+            handle_echo, '127.0.0.1', 8888)
+
+        addr = server.sockets[0].getsockname()
+        print(f'Serving on {addr}')
+
+        async with server:
             await server.serve_forever()
 
     asyncio.run(main())
@@ -839,9 +396,11 @@
     async def print_http_headers(url):
         url = urllib.parse.urlsplit(url)
         if url.scheme == 'https':
-            stream = await asyncio.connect(url.hostname, 443, ssl=True)
+            reader, writer = await asyncio.open_connection(
+                url.hostname, 443, ssl=True)
         else:
-            stream = await asyncio.connect(url.hostname, 80)
+            reader, writer = await asyncio.open_connection(
+                url.hostname, 80)
 
         query = (
             f"HEAD {url.path or '/'} HTTP/1.0\r\n"
@@ -849,14 +408,18 @@
             f"\r\n"
         )
 
-        stream.write(query.encode('latin-1'))
-        while (line := await stream.readline()):
+        writer.write(query.encode('latin-1'))
+        while True:
+            line = await reader.readline()
+            if not line:
+                break
+
             line = line.decode('latin1').rstrip()
             if line:
                 print(f'HTTP header> {line}')
 
         # Ignore the body, close the socket
-        await stream.close()
+        writer.close()
 
     url = sys.argv[1]
     asyncio.run(print_http_headers(url))
@@ -877,7 +440,7 @@
 ------------------------------------------------------
 
 Coroutine waiting until a socket receives data using the
-:func:`asyncio.connect` function::
+:func:`asyncio.open_connection` function::
 
     import asyncio
     import socket
@@ -891,15 +454,17 @@
         rsock, wsock = socket.socketpair()
 
         # Register the open socket to wait for data.
-        async with asyncio.connect(sock=rsock) as stream:
-            # Simulate the reception of data from the network
-            loop.call_soon(wsock.send, 'abc'.encode())
+        reader, writer = await asyncio.open_connection(sock=rsock)
 
-            # Wait for data
-            data = await stream.read(100)
+        # Simulate the reception of data from the network
+        loop.call_soon(wsock.send, 'abc'.encode())
 
-            # Got data, we are done: close the socket
-            print("Received:", data.decode())
+        # Wait for data
+        data = await reader.read(100)
+
+        # Got data, we are done: close the socket
+        print("Received:", data.decode())
+        writer.close()
 
         # Close the second socket
         wsock.close()