bpo-33649: Backport asyncio docs from 'master' to 3.7 (GH-9377)
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index ca7daab..c543aa6 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -2,85 +2,120 @@
.. _asyncio-streams:
-+++++++++++++++++++++++++++++
-Streams (coroutine based API)
-+++++++++++++++++++++++++++++
+=======
+Streams
+=======
-**Source code:** :source:`Lib/asyncio/streams.py`
+Streams are high-level async/await-ready primitives to work with
+network connections. Streams allow sending and receiving data without
+using callbacks or low-level protocols and transports.
-Stream functions
-================
+.. _asyncio_example_stream:
-.. note::
+Here is an example of a TCP echo client written using asyncio
+streams::
- The top-level functions in this module are meant as convenience wrappers
- only; there's really nothing special there, and if they don't do
- exactly what you want, feel free to copy their code.
+ import asyncio
+
+ async def tcp_echo_client(message):
+ reader, writer = await asyncio.open_connection(
+ '127.0.0.1', 8888)
+
+ print(f'Send: {message!r}')
+ writer.write(message.encode())
+
+ data = await reader.read(100)
+ print(f'Received: {data.decode()!r}')
+
+ print('Close the connection')
+ writer.close()
+ await writer.wait_closed()
+
+ asyncio.run(tcp_echo_client('Hello World!'))
-.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
+See also the `Examples`_ section below.
- A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader,
- writer) pair.
- The reader returned is a :class:`StreamReader` instance; the writer is
- a :class:`StreamWriter` instance.
+.. rubric:: Stream Functions
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- returned :class:`StreamReader` instance.
+The following top-level asyncio functions can be used to create
+and work with streams:
+
+
+.. coroutinefunction:: open_connection(host=None, port=None, \*, \
+ loop=None, limit=None, ssl=None, family=0, \
+ proto=0, flags=0, sock=None, local_addr=None, \
+ server_hostname=None, ssl_handshake_timeout=None)
+
+ Establish a network connection and return a pair of
+ ``(reader, writer)`` objects.
+
+ The returned *reader* and *writer* objects are instances of
+ :class:`StreamReader` and :class:`StreamWriter` classes.
+
+ The *loop* argument is optional and can always be determined
+ automatically when this function is awaited from a coroutine.
+
+ *limit* determines the buffer size limit used by the
+ returned :class:`StreamReader` instance. By default the *limit*
+ is set to 64 KiB.
The rest of the arguments are passed directly to
- :meth:`AbstractEventLoop.create_connection`.
-
- This function is a :ref:`coroutine <coroutine>`.
+ :meth:`loop.create_connection`.
.. versionadded:: 3.7
The *ssl_handshake_timeout* parameter.
-.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
+.. coroutinefunction:: start_server(client_connected_cb, host=None, \
+ port=None, \*, loop=None, limit=None, \
+ family=socket.AF_UNSPEC, \
+ flags=socket.AI_PASSIVE, sock=None, \
+ backlog=100, ssl=None, reuse_address=None, \
+ reuse_port=None, ssl_handshake_timeout=None, \
+ start_serving=True)
- Start a socket server, with a callback for each client connected. The return
- value is the same as :meth:`~AbstractEventLoop.create_server()`.
+ Start a socket server.
The *client_connected_cb* callback is called whenever a new client
- connection is established. It receives a reader/writer pair as two
- arguments, the first is a :class:`StreamReader` instance,
- and the second is a :class:`StreamWriter` instance.
+ connection is established. It receives a ``(reader, writer)`` pair
+ as two arguments, instances of the :class:`StreamReader` and
+ :class:`StreamWriter` classes.
- *client_connected_cb* accepts a plain callable or a
+ *client_connected_cb* can be a plain callable or a
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
- it will be automatically converted into a :class:`Task`.
+ it will be automatically scheduled as a :class:`Task`.
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- :class:`StreamReader` instance passed to *client_connected_cb*.
+ The *loop* argument is optional and can always be determined
+ automatically when this function is awaited from a coroutine.
+
+ *limit* determines the buffer size limit used by the
+ :class:`StreamReader` instance passed to *client_connected_cb*.
+ By default the *limit* is set to 64 KiB.
The rest of the arguments are passed directly to
- :meth:`~AbstractEventLoop.create_server()`.
-
- This function is a :ref:`coroutine <coroutine>`.
+ :meth:`loop.create_server`.
.. versionadded:: 3.7
The *ssl_handshake_timeout* and *start_serving* parameters.
-.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
- A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning
- a (reader, writer) pair.
+.. rubric:: Unix Sockets
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- returned :class:`StreamReader` instance.
+.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
+ limit=None, ssl=None, sock=None, \
+ server_hostname=None, ssl_handshake_timeout=None)
- The rest of the arguments are passed directly to
- :meth:`~AbstractEventLoop.create_unix_connection()`.
+ Establish a Unix socket connection and return a pair of
+ ``(reader, writer)``.
- This function is a :ref:`coroutine <coroutine>`.
+ Similar to :func:`open_connection` but operates on Unix sockets.
- Availability: UNIX.
+ See also the documentation of :meth:`loop.create_unix_connection`.
+
+ Availability: Unix.
.. versionadded:: 3.7
@@ -90,29 +125,19 @@
The *path* parameter can now be a :term:`path-like object`
-.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
- Start a UNIX Domain Socket server, with a callback for each client connected.
+.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
+ \*, loop=None, limit=None, sock=None, \
+ backlog=100, ssl=None, ssl_handshake_timeout=None, \
+ start_serving=True)
- The *client_connected_cb* callback is called whenever a new client
- connection is established. It receives a reader/writer pair as two
- arguments, the first is a :class:`StreamReader` instance,
- and the second is a :class:`StreamWriter` instance.
+ Start a Unix socket server.
- *client_connected_cb* accepts a plain callable or a
- :ref:`coroutine function <coroutine>`; if it is a coroutine function,
- it will be automatically converted into a :class:`Task`.
+ Similar to :func:`start_server` but works with Unix sockets.
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- :class:`StreamReader` instance passed to *client_connected_cb*.
+ See also the documentation of :meth:`loop.create_unix_server`.
- The rest of the arguments are passed directly to
- :meth:`~AbstractEventLoop.create_unix_server()`.
-
- This function is a :ref:`coroutine <coroutine>`.
-
- Availability: UNIX.
+ Availability: Unix.
.. versionadded:: 3.7
@@ -123,229 +148,156 @@
The *path* parameter can now be a :term:`path-like object`.
+---------
+
+
StreamReader
============
-.. class:: StreamReader(limit=_DEFAULT_LIMIT, loop=None)
+.. class:: StreamReader
- This class is :ref:`not thread safe <asyncio-multithreading>`.
+ Represents a reader object that provides APIs to read data
+ from the IO stream.
- The *limit* argument's default value is set to _DEFAULT_LIMIT which is 2**16 (64 KiB)
-
- .. method:: exception()
-
- Get the exception.
-
- .. method:: feed_eof()
-
- Acknowledge the EOF.
-
- .. method:: feed_data(data)
-
- Feed *data* bytes in the internal buffer. Any operations waiting
- for the data will be resumed.
-
- .. method:: set_exception(exc)
-
- Set the exception.
-
- .. method:: set_transport(transport)
-
- Set the transport.
+ It is not recommended to instantiate *StreamReader* objects
+ directly; use :func:`open_connection` and :func:`start_server`
+ instead.
.. coroutinemethod:: read(n=-1)
Read up to *n* bytes. If *n* is not provided, or set to ``-1``,
read until EOF and return all read bytes.
- If the EOF was received and the internal buffer is empty,
+ If EOF was received and the internal buffer is empty,
return an empty ``bytes`` object.
- This method is a :ref:`coroutine <coroutine>`.
-
.. coroutinemethod:: readline()
- Read one line, where "line" is a sequence of bytes ending with ``\n``.
+ Read one line, where "line" is a sequence of bytes
+ ending with ``\n``.
- If EOF is received, and ``\n`` was not found, the method will
- return the partial read bytes.
+ If EOF is received and ``\n`` was not found, the method
+ returns partially read data.
- If the EOF was received and the internal buffer is empty,
+ If EOF is received and the internal buffer is empty,
return an empty ``bytes`` object.
- This method is a :ref:`coroutine <coroutine>`.
-
.. coroutinemethod:: readexactly(n)
- Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of
- the stream is reached before *n* can be read, the
- :attr:`IncompleteReadError.partial` attribute of the exception contains
- the partial read bytes.
+ Read exactly *n* bytes.
- This method is a :ref:`coroutine <coroutine>`.
+ Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
+ can be read. Use the :attr:`IncompleteReadError.partial`
+ attribute to get the partially read data.
.. coroutinemethod:: readuntil(separator=b'\\n')
- Read data from the stream until ``separator`` is found.
+ Read data from the stream until *separator* is found.
On success, the data and separator will be removed from the
internal buffer (consumed). Returned data will include the
separator at the end.
- Configured stream limit is used to check result. Limit sets the
- maximal length of data that can be returned, not counting the
- separator.
+ If the amount of data read exceeds the configured stream limit, a
+ :exc:`LimitOverrunError` exception is raised, and the data
+ is left in the internal buffer and can be read again.
- If an EOF occurs and the complete separator is still not found,
- an :exc:`IncompleteReadError` exception will be
- raised, and the internal buffer will be reset. The
- :attr:`IncompleteReadError.partial` attribute may contain the
- separator partially.
-
- If the data cannot be read because of over limit, a
- :exc:`LimitOverrunError` exception will be raised, and the data
- will be left in the internal buffer, so it can be read again.
+ If EOF is reached before the complete separator is found,
+ an :exc:`IncompleteReadError` exception is raised, and the internal
+ buffer is reset. The :attr:`IncompleteReadError.partial` attribute
+ may contain a portion of the separator.
.. versionadded:: 3.5.2
.. method:: at_eof()
- Return ``True`` if the buffer is empty and :meth:`feed_eof` was called.
+ Return ``True`` if the buffer is empty and :meth:`feed_eof`
+ was called.
StreamWriter
============
-.. class:: StreamWriter(transport, protocol, reader, loop)
+.. class:: StreamWriter
- Wraps a Transport.
+ Represents a writer object that provides APIs to write data
+ to the IO stream.
- This exposes :meth:`write`, :meth:`writelines`, :meth:`can_write_eof()`,
- :meth:`write_eof`, :meth:`get_extra_info` and :meth:`close`. It adds
- :meth:`drain` which returns an optional :class:`Future` on which you can
- wait for flow control. It also adds a transport attribute which references
- the :class:`Transport` directly.
-
- This class is :ref:`not thread safe <asyncio-multithreading>`.
-
- .. attribute:: transport
-
- Transport.
+ It is not recommended to instantiate *StreamWriter* objects
+ directly; use :func:`open_connection` and :func:`start_server`
+ instead.
.. method:: can_write_eof()
- Return :const:`True` if the transport supports :meth:`write_eof`,
- :const:`False` if not. See :meth:`WriteTransport.can_write_eof`.
+ Return ``True`` if the underlying transport supports
+ the :meth:`write_eof` method, ``False`` otherwise.
+
+ .. method:: write_eof()
+
+ Close the write end of the stream after the buffered write
+ data is flushed.
+
+ .. attribute:: transport
+
+ Return the underlying asyncio transport.
+
+ .. method:: get_extra_info(name, default=None)
+
+ Access optional transport information; see
+ :meth:`BaseTransport.get_extra_info` for details.
+
+ .. method:: write(data)
+
+ Write *data* to the stream.
+
+ This method is not subject to flow control. Calls to ``write()`` should
+ be followed by :meth:`drain`.
+
+ .. method:: writelines(data)
+
+ Write a list (or any iterable) of bytes to the stream.
+
+ This method is not subject to flow control. Calls to ``writelines()``
+ should be followed by :meth:`drain`.
+
+ .. coroutinemethod:: drain()
+
+ Wait until it is appropriate to resume writing to the stream.
+ Example::
+
+ writer.write(data)
+ await writer.drain()
+
+ This is a flow control method that interacts with the underlying
+ IO write buffer. When the size of the buffer reaches
+ the high watermark, *drain()* blocks until the size of the
+ buffer is drained down to the low watermark and writing can
+ be resumed. When there is nothing to wait for, :meth:`drain`
+ returns immediately.
.. method:: close()
- Close the transport: see :meth:`BaseTransport.close`.
+ Close the stream.
.. method:: is_closing()
- Return ``True`` if the writer is closing or is closed.
+ Return ``True`` if the stream is closed or in the process of
+ being closed.
.. versionadded:: 3.7
.. coroutinemethod:: wait_closed()
- Wait until the writer is closed.
+ Wait until the stream is closed.
- Should be called after :meth:`close` to wait until the underlying
- connection (and the associated transport/protocol pair) is closed.
+ Should be called after :meth:`close` to wait until the underlying
+ connection is closed.
.. versionadded:: 3.7
- .. coroutinemethod:: drain()
- Let the write buffer of the underlying transport a chance to be flushed.
-
- The intended use is to write::
-
- w.write(data)
- await w.drain()
-
- When the size of the transport buffer reaches the high-water limit (the
- protocol is paused), block until the size of the buffer is drained down
- to the low-water limit and the protocol is resumed. When there is nothing
- to wait for, the yield-from continues immediately.
-
- Yielding from :meth:`drain` gives the opportunity for the loop to
- schedule the write operation and flush the buffer. It should especially
- be used when a possibly large amount of data is written to the transport,
- and the coroutine does not yield-from between calls to :meth:`write`.
-
- This method is a :ref:`coroutine <coroutine>`.
-
- .. method:: get_extra_info(name, default=None)
-
- Return optional transport information: see
- :meth:`BaseTransport.get_extra_info`.
-
- .. method:: write(data)
-
- Write some *data* bytes to the transport: see
- :meth:`WriteTransport.write`.
-
- .. method:: writelines(data)
-
- Write a list (or any iterable) of data bytes to the transport:
- see :meth:`WriteTransport.writelines`.
-
- .. method:: write_eof()
-
- Close the write end of the transport after flushing buffered data:
- see :meth:`WriteTransport.write_eof`.
-
-
-StreamReaderProtocol
-====================
-
-.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)
-
- Trivial helper class to adapt between :class:`Protocol` and
- :class:`StreamReader`. Subclass of :class:`Protocol`.
-
- *stream_reader* is a :class:`StreamReader` instance, *client_connected_cb*
- is an optional function called with (stream_reader, stream_writer) when a
- connection is made, *loop* is the event loop instance to use.
-
- (This is a helper class instead of making :class:`StreamReader` itself a
- :class:`Protocol` subclass, because the :class:`StreamReader` has other
- potential uses, and to prevent the user of the :class:`StreamReader` from
- accidentally calling inappropriate methods of the protocol.)
-
-
-IncompleteReadError
-===================
-
-.. exception:: IncompleteReadError
-
- Incomplete read error, subclass of :exc:`EOFError`.
-
- .. attribute:: expected
-
- Total number of expected bytes (:class:`int`).
-
- .. attribute:: partial
-
- Read bytes string before the end of stream was reached (:class:`bytes`).
-
-
-LimitOverrunError
-=================
-
-.. exception:: LimitOverrunError
-
- Reached the buffer limit while looking for a separator.
-
- .. attribute:: consumed
-
- Total number of to be consumed bytes.
-
-
-Stream examples
-===============
+Examples
+========
.. _asyncio-tcp-echo-client-streams:
@@ -356,28 +308,26 @@
import asyncio
- async def tcp_echo_client(message, loop):
- reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
- loop=loop)
+ async def tcp_echo_client(message):
+ reader, writer = await asyncio.open_connection(
+ '127.0.0.1', 8888)
- print('Send: %r' % message)
+ print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
- print('Received: %r' % data.decode())
+ print(f'Received: {data.decode()!r}')
- print('Close the socket')
+ print('Close the connection')
writer.close()
- message = 'Hello World!'
- loop = asyncio.get_event_loop()
- loop.run_until_complete(tcp_echo_client(message, loop))
- loop.close()
+ asyncio.run(tcp_echo_client('Hello World!'))
+
.. seealso::
- The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>`
- example uses the :meth:`AbstractEventLoop.create_connection` method.
+ The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
+ example uses the low-level :meth:`loop.create_connection` method.
.. _asyncio-tcp-echo-server-streams:
@@ -393,35 +343,33 @@
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
- print("Received %r from %r" % (message, addr))
- print("Send: %r" % message)
+ print(f"Received {message!r} from {addr!r}")
+
+ print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
- print("Close the client socket")
+ print("Close the connection")
writer.close()
- loop = asyncio.get_event_loop()
- coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
- server = loop.run_until_complete(coro)
+ async def main():
+ server = await asyncio.start_server(
+ handle_echo, '127.0.0.1', 8888)
- # Serve requests until Ctrl+C is pressed
- print('Serving on {}'.format(server.sockets[0].getsockname()))
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
+ addr = server.sockets[0].getsockname()
+ print(f'Serving on {addr}')
- # Close the server
- server.close()
- loop.run_until_complete(server.wait_closed())
- loop.close()
+ async with server:
+ await server.serve_forever()
+
+ asyncio.run(main())
+
.. seealso::
- The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>`
- example uses the :meth:`AbstractEventLoop.create_server` method.
+ The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
+ example uses the :meth:`loop.create_server` method.
Get HTTP headers
@@ -436,30 +384,34 @@
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
- connect = asyncio.open_connection(url.hostname, 443, ssl=True)
+ reader, writer = await asyncio.open_connection(
+ url.hostname, 443, ssl=True)
else:
- connect = asyncio.open_connection(url.hostname, 80)
- reader, writer = await connect
- query = ('HEAD {path} HTTP/1.0\r\n'
- 'Host: {hostname}\r\n'
- '\r\n').format(path=url.path or '/', hostname=url.hostname)
+ reader, writer = await asyncio.open_connection(
+ url.hostname, 80)
+
+ query = (
+ f"HEAD {url.path or '/'} HTTP/1.0\r\n"
+ f"Host: {url.hostname}\r\n"
+ f"\r\n"
+ )
+
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
+
line = line.decode('latin1').rstrip()
if line:
- print('HTTP header> %s' % line)
+ print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
- loop = asyncio.get_event_loop()
- task = asyncio.ensure_future(print_http_headers(url))
- loop.run_until_complete(task)
- loop.close()
+ asyncio.run(print_http_headers(url))
+
Usage::
@@ -469,7 +421,8 @@
python example.py https://example.com/path/page.html
-.. _asyncio-register-socket-streams:
+
+.. _asyncio_example_create_connection-streams:
Register an open socket to wait for data using streams
------------------------------------------------------
@@ -478,14 +431,18 @@
:func:`open_connection` function::
import asyncio
- from socket import socketpair
+ import socket
- async def wait_for_data(loop):
- # Create a pair of connected sockets
- rsock, wsock = socketpair()
+ async def wait_for_data():
+ # Get a reference to the current event loop because
+ # we want to access low-level APIs.
+ loop = asyncio.get_running_loop()
- # Register the open socket to wait for data
- reader, writer = await asyncio.open_connection(sock=rsock, loop=loop)
+ # Create a pair of connected sockets.
+ rsock, wsock = socket.socketpair()
+
+ # Register the open socket to wait for data.
+ reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
@@ -500,17 +457,14 @@
# Close the second socket
wsock.close()
- loop = asyncio.get_event_loop()
- loop.run_until_complete(wait_for_data(loop))
- loop.close()
+ asyncio.run(wait_for_data())
.. seealso::
The :ref:`register an open socket to wait for data using a protocol
- <asyncio-register-socket>` example uses a low-level protocol created by the
- :meth:`AbstractEventLoop.create_connection` method.
+ <asyncio_example_create_connection>` example uses a low-level protocol and
+ the :meth:`loop.create_connection` method.
The :ref:`watch a file descriptor for read events
- <asyncio-watch-read-event>` example uses the low-level
- :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
- socket.
+ <asyncio_example_watch_fd>` example uses the low-level
+ :meth:`loop.add_reader` method to watch a file descriptor.
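
Reviewer note, not part of the patch: the reworded ``readuntil()``, ``readexactly()`` and ``at_eof()`` descriptions above can be checked without a real network peer. The following is a minimal sketch, assuming a ``socket.socketpair()`` stands in for the remote end, as in the "register an open socket" example. The ``demo`` coroutine and the payload bytes are illustrative names and values, not taken from the patch.

```python
import asyncio
import socket


async def demo():
    # A connected socket pair stands in for a real network peer
    # (an assumption made purely for illustration).
    rsock, wsock = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=rsock)

    # The "peer" sends one complete line plus three stray bytes,
    # then closes its end, which the reader sees as EOF.
    wsock.sendall(b'hello\nabc')
    wsock.close()

    # readuntil() returns the data including the separator.
    line = await reader.readuntil(b'\n')

    # Only three bytes remain before EOF, so asking for five raises
    # IncompleteReadError; the bytes read so far are in .partial.
    try:
        await reader.readexactly(5)
    except asyncio.IncompleteReadError as exc:
        partial, expected = exc.partial, exc.expected

    # The buffer is now empty and EOF was received.
    eof = reader.at_eof()

    writer.close()
    await writer.wait_closed()
    return line, partial, expected, eof


result = asyncio.run(demo())
print(result)
```

Using ``sock=`` keeps the sketch independent of any listening port; the same calls should behave identically on a reader obtained from ``open_connection(host, port)``.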