.. currentmodule:: asyncio

.. _asyncio-streams:

=======
Streams
=======

Streams are high-level async/await-ready primitives to work with
network connections.  Streams allow sending and receiving data without
using callbacks or low-level protocols and transports.

.. _asyncio_example_stream:

Here is an example of a TCP echo client written using asyncio
streams::

    import asyncio

    async def tcp_echo_client(message):
        async with asyncio.connect('127.0.0.1', 8888) as stream:
            print(f'Send: {message!r}')
            await stream.write(message.encode())

            data = await stream.read(100)
            print(f'Received: {data.decode()!r}')

    asyncio.run(tcp_echo_client('Hello World!'))


See also the `Examples`_ section below.


.. rubric:: Stream Functions

The following top-level asyncio functions can be used to create
and work with streams:


.. coroutinefunction:: connect(host=None, port=None, \*, \
                               limit=2**16, ssl=None, family=0, \
                               proto=0, flags=0, sock=None, local_addr=None, \
                               server_hostname=None, ssl_handshake_timeout=None, \
                               happy_eyeballs_delay=None, interleave=None)

   Connect to a TCP socket at the *host*:*port* address and return a
   :class:`Stream` object of mode :attr:`StreamMode.READWRITE`.

   *limit* determines the buffer size limit used by the returned :class:`Stream`
   instance.  By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to :meth:`loop.create_connection`.

   The function can be used with ``await`` to get a connected stream::

       stream = await asyncio.connect('127.0.0.1', 8888)

   The function can also be used as an async context manager::

       async with asyncio.connect('127.0.0.1', 8888) as stream:
           ...

   .. versionadded:: 3.8

.. coroutinefunction:: open_connection(host=None, port=None, \*, \
                          loop=None, limit=None, ssl=None, family=0, \
                          proto=0, flags=0, sock=None, local_addr=None, \
                          server_hostname=None, ssl_handshake_timeout=None)

   Establish a network connection and return a pair of
   ``(reader, writer)`` objects.

   The returned *reader* and *writer* objects are instances of the
   :class:`StreamReader` and :class:`StreamWriter` classes.

   The *loop* argument is optional and can always be determined
   automatically when this function is awaited from a coroutine.

   *limit* determines the buffer size limit used by the
   returned :class:`StreamReader` instance.  By default the *limit*
   is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_connection`.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* parameter.

   .. deprecated-removed:: 3.8 3.10

      ``open_connection()`` is deprecated in favor of :func:`connect`.

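As a minimal sketch of the ``(reader, writer)`` pair described above, the
following example hands an already connected socket to ``open_connection()``
through the *sock* argument, so no server is needed. The socket pair from
``socket.socketpair()`` and the helper name ``read_from_socketpair`` are
assumptions made purely for illustration.

```python
import asyncio
import socket


async def read_from_socketpair(payload: bytes) -> bytes:
    # Create two connected sockets: rsock is handed to asyncio,
    # wsock plays the role of the remote peer (illustrative setup).
    rsock, wsock = socket.socketpair()

    # host and port stay None because the socket is already connected.
    reader, writer = await asyncio.open_connection(sock=rsock)
    assert isinstance(reader, asyncio.StreamReader)
    assert isinstance(writer, asyncio.StreamWriter)

    # Simulate the peer sending data, then read it through the stream.
    wsock.send(payload)
    data = await reader.read(100)

    # Close the stream (which closes rsock), then the peer socket.
    writer.close()
    await writer.wait_closed()
    wsock.close()
    return data


received = asyncio.run(read_from_socketpair(b'abc'))
print(f'Received: {received!r}')
```

Passing *sock* skips name resolution and connection setup, which keeps the
sketch independent of any listening service.
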
.. coroutinefunction:: start_server(client_connected_cb, host=None, \
                          port=None, \*, loop=None, limit=2**16, \
                          family=socket.AF_UNSPEC, \
                          flags=socket.AI_PASSIVE, sock=None, \
                          backlog=100, ssl=None, reuse_address=None, \
                          reuse_port=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a socket server.

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a ``(reader, writer)`` pair
   as two arguments, instances of the :class:`StreamReader` and
   :class:`StreamWriter` classes.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   The *loop* argument is optional and can always be determined
   automatically when this function is awaited from a coroutine.

   *limit* determines the buffer size limit used by each
   :class:`StreamReader` instance passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_server`.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* and *start_serving* parameters.

   .. deprecated-removed:: 3.8 3.10

      ``start_server()`` is deprecated in favor of :class:`StreamServer`.

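The callback contract above can be sketched as a single round trip. This is
an illustration under stated assumptions, not a reference implementation: it
binds to port ``0`` so the operating system picks a free port, uses a
newline-terminated message, and the names ``handle_echo`` and ``echo_once``
are invented for the example.

```python
import asyncio


async def handle_echo(reader, writer):
    # Called once per client with a (reader, writer) pair.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def echo_once(message: bytes) -> bytes:
    # Port 0 asks the OS for any free port (an assumption of this sketch).
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        writer.write(message)
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


reply = asyncio.run(echo_once(b'Hello World!\n'))
print(f'Echoed: {reply!r}')
```

Because ``handle_echo`` is a coroutine function, the server schedules it as a
task for each connection, as described above.
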
.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)

   Takes a :term:`file-like object <file object>` *pipe* and returns a
   :class:`Stream` object of mode :attr:`StreamMode.READ` whose API is
   similar to that of :class:`StreamReader`.  It can also be used as an
   async context manager.

   *limit* determines the buffer size limit used by the returned :class:`Stream`
   instance.  By default the *limit* is set to 64 KiB.

   .. versionadded:: 3.8

.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)

   Takes a :term:`file-like object <file object>` *pipe* and returns a
   :class:`Stream` object of mode :attr:`StreamMode.WRITE` whose API is
   similar to that of :class:`StreamWriter`.  It can also be used as an
   async context manager.

   *limit* determines the buffer size limit used by the returned :class:`Stream`
   instance.  By default the *limit* is set to 64 KiB.

   .. versionadded:: 3.8

.. rubric:: Unix Sockets

.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
                           sock=None, server_hostname=None, \
                           ssl_handshake_timeout=None)

   Establish a connection to the Unix socket at *path* and return an
   awaitable :class:`Stream` object of mode :attr:`StreamMode.READWRITE`
   that can be used as a reader and a writer.

   *limit* determines the buffer size limit used by the returned :class:`Stream`
   instance.  By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.

   The function can be used with ``await`` to get a connected stream::

       stream = await asyncio.connect_unix('/tmp/example.sock')

   The function can also be used as an async context manager::

       async with asyncio.connect_unix('/tmp/example.sock') as stream:
           ...

   .. availability:: Unix.

   .. versionadded:: 3.8

.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
                        limit=None, ssl=None, sock=None, \
                        server_hostname=None, ssl_handshake_timeout=None)

   Establish a Unix socket connection and return a pair of
   ``(reader, writer)``.

   Similar to :func:`open_connection` but operates on Unix sockets.

   See also the documentation of :meth:`loop.create_unix_connection`.

   .. availability:: Unix.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* parameter.

   .. versionchanged:: 3.7

      The *path* parameter can now be a :term:`path-like object`.

   .. deprecated-removed:: 3.8 3.10

      ``open_unix_connection()`` is deprecated in favor of :func:`connect_unix`.


.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                          \*, loop=None, limit=None, sock=None, \
                          backlog=100, ssl=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a Unix socket server.

   Similar to :func:`start_server` but works with Unix sockets.

   See also the documentation of :meth:`loop.create_unix_server`.

   .. availability:: Unix.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* and *start_serving* parameters.

   .. versionchanged:: 3.7

      The *path* parameter can now be a :term:`path-like object`.

   .. deprecated-removed:: 3.8 3.10

      ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.


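The Unix socket variants can be exercised the same way as their TCP
counterparts. The sketch below is Unix-only and rests on a few assumptions:
the socket file lives in a temporary directory created for the example, the
message is newline-terminated, and ``handle_echo`` and ``unix_echo_once`` are
hypothetical helper names.

```python
import asyncio
import os
import tempfile


async def handle_echo(reader, writer):
    # Echo one line back to the client, then close the connection.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def unix_echo_once(message: bytes) -> bytes:
    # A throwaway directory keeps the socket path short and unique.
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'echo.sock')
        server = await asyncio.start_unix_server(handle_echo, path)

        async with server:
            reader, writer = await asyncio.open_unix_connection(path)
            writer.write(message)
            await writer.drain()
            reply = await reader.readline()
            writer.close()
            await writer.wait_closed()
    return reply


unix_reply = asyncio.run(unix_echo_once(b'ping\n'))
print(f'Echoed: {unix_reply!r}')
```

Only the address form changes compared with the TCP version: a filesystem
*path* replaces the *host* and *port* pair.
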
---------

StreamServer
============

.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
                        limit=2**16, family=socket.AF_UNSPEC, \
                        flags=socket.AI_PASSIVE, sock=None, backlog=100, \
                        ssl=None, reuse_address=None, reuse_port=None, \
                        ssl_handshake_timeout=None, shutdown_timeout=60)

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a :class:`Stream` object of
   mode :attr:`StreamMode.READWRITE`.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   *limit* determines the buffer size limit used by each
   :class:`Stream` instance passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_server`.

   .. coroutinemethod:: start_serving()

      Bind to the given host and port and start the server.

   .. coroutinemethod:: serve_forever()

      Start accepting connections until the coroutine is cancelled.
      Cancelling the ``serve_forever`` task closes the server.

      This method can be called if the server is already accepting
      connections.  Only one ``serve_forever`` task can exist per
      :class:`StreamServer` object.

   .. method:: is_serving()

      Return ``True`` if the server is bound and currently serving.

   .. method:: bind()

      Bind the server to the given *host* and *port*.  This method is
      automatically called during ``__aenter__`` when :class:`StreamServer` is
      used as an async context manager.

   .. method:: is_bound()

      Return ``True`` if the server is bound.

   .. coroutinemethod:: abort()

      Close the connection and cancel all pending tasks.

   .. coroutinemethod:: close()

      Close the connection.  This method is automatically called during
      ``__aexit__`` when :class:`StreamServer` is used as an async context
      manager.

   .. attribute:: sockets

      A tuple of socket objects the server is bound to.

   .. versionadded:: 3.8


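The serving lifecycle described above (start serving, serve until cancelled,
closed on cancellation) can be sketched with the ``Server`` object returned by
:func:`start_server`, which offers ``start_serving()``, ``is_serving()`` and
``serve_forever()`` methods as well. Using that object instead of
:class:`StreamServer` is an assumption of this sketch, as are the helper names
``handle`` and ``lifecycle`` and the short sleep that lets the task start.

```python
import asyncio


async def handle(reader, writer):
    # No client connects in this sketch; close immediately if one does.
    writer.close()
    await writer.wait_closed()


async def lifecycle():
    states = []

    # Create the server without accepting connections yet.
    server = await asyncio.start_server(
        handle, '127.0.0.1', 0, start_serving=False)
    states.append(server.is_serving())

    await server.start_serving()
    states.append(server.is_serving())

    # serve_forever() may be called while the server is already serving.
    task = asyncio.create_task(server.serve_forever())
    await asyncio.sleep(0.01)  # give the task a chance to start

    # Cancelling the serve_forever task closes the server.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    states.append(server.is_serving())
    return states


states = asyncio.run(lifecycle())
print(states)
```

The three recorded states correspond to: created but not serving, serving,
and closed after cancellation.
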
UnixStreamServer
================

.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
                            limit=2**16, sock=None, backlog=100, \
                            ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a :class:`Stream` object of
   mode :attr:`StreamMode.READWRITE`.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   *limit* determines the buffer size limit used by each
   :class:`Stream` instance passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_unix_server`.

   .. coroutinemethod:: start_serving()

      Bind to the given *path* and start the server.

   .. method:: is_serving()

      Return ``True`` if the server is bound and currently serving.

   .. method:: bind()

      Bind the server to the given *path*.  This method is
      automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
      used as an async context manager.

   .. method:: is_bound()

      Return ``True`` if the server is bound.

   .. coroutinemethod:: abort()

      Close the connection and cancel all pending tasks.

   .. coroutinemethod:: close()

      Close the connection.  This method is automatically called during
      ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
      manager.

   .. attribute:: sockets

      A tuple of socket objects the server is bound to.

   .. availability:: Unix.

   .. versionadded:: 3.8

Stream
======

.. class:: Stream

   Represents a Stream object that provides APIs to read and write data
   to the IO stream.  It includes the API provided by :class:`StreamReader`
   and :class:`StreamWriter`.

   Do not instantiate *Stream* objects directly; use APIs such as
   :func:`connect` and :class:`StreamServer` instead.

   .. versionadded:: 3.8


StreamMode
==========

.. class:: StreamMode

   A subclass of :class:`enum.Flag` that defines a set of values that can be
   used to determine the ``mode`` of :class:`Stream` objects.

   .. data:: READ

      The stream object is readable and provides the API of :class:`StreamReader`.

   .. data:: WRITE

      The stream object is writable and provides the API of :class:`StreamWriter`.

   .. data:: READWRITE

      The stream object is readable and writable and provides the API of both
      :class:`StreamReader` and :class:`StreamWriter`.

   .. versionadded:: 3.8


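To illustrate how an :class:`enum.Flag` based mode can be tested, here is a
small sketch built on a stand-in class. ``ExampleMode`` is hypothetical and is
not the asyncio definition; its member values and the ``describe`` helper are
assumptions chosen only to show the flag arithmetic.

```python
import enum


class ExampleMode(enum.Flag):
    # Hypothetical stand-in for StreamMode; the values are assumptions.
    READ = enum.auto()
    WRITE = enum.auto()
    READWRITE = READ | WRITE


def describe(mode: ExampleMode) -> str:
    # A flag is "set" when the bitwise AND with it is non-empty.
    readable = bool(mode & ExampleMode.READ)
    writable = bool(mode & ExampleMode.WRITE)
    return f'readable={readable}, writable={writable}'


print(describe(ExampleMode.READ))
print(describe(ExampleMode.READWRITE))
```

With a combined member such as ``READWRITE``, code that only needs reading can
check the ``READ`` bit without caring whether the stream is also writable.
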
StreamReader
============

.. class:: StreamReader

   Represents a reader object that provides APIs to read data
   from the IO stream.

   It is not recommended to instantiate *StreamReader* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
      read until EOF and return all read bytes.

      If EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes
      ending with ``\n``.

      If EOF is received and ``\n`` was not found, the method
      returns partially read data.

      If EOF is received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes.

      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
      bytes can be read.  Use the :attr:`IncompleteReadError.partial`
      attribute to get the partially read data.

   .. coroutinemethod:: readuntil(separator=b'\\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed).  Returned data will include the
      separator at the end.

      If the amount of data read exceeds the configured stream limit, a
      :exc:`LimitOverrunError` exception is raised, and the data
      is left in the internal buffer and can be read again.

      If EOF is reached before the complete separator is found,
      an :exc:`IncompleteReadError` exception is raised, and the internal
      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
      may contain a portion of the separator.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof`
      was called.


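The EOF and error behaviour of the reader methods above can be observed
without a network connection. In the sketch below the reader is created and
filled by hand with ``feed_data()`` and ``feed_eof()``; in real programs the
transport machinery makes these calls, so doing it directly is an assumption
made to keep the example self-contained. The ``reader_demo`` name and the
``limit=4`` value are likewise illustrative.

```python
import asyncio


async def reader_demo():
    results = {}

    # Line-oriented reads: the last line has no trailing newline.
    reader = asyncio.StreamReader()
    reader.feed_data(b'first line\nsecond')
    reader.feed_eof()
    results['line'] = await reader.readline()
    results['partial_line'] = await reader.readline()
    results['after_eof'] = await reader.readline()
    results['at_eof'] = reader.at_eof()

    # readexactly() raises when EOF arrives before n bytes are available.
    reader = asyncio.StreamReader()
    reader.feed_data(b'abc')
    reader.feed_eof()
    try:
        await reader.readexactly(5)
    except asyncio.IncompleteReadError as exc:
        results['partial'] = exc.partial
        results['expected'] = exc.expected

    # readuntil() raises LimitOverrunError when no separator is found
    # within the limit; the data stays in the buffer for a later read.
    reader = asyncio.StreamReader(limit=4)
    reader.feed_data(b'0123456789')
    try:
        await reader.readuntil(b'\n')
    except asyncio.LimitOverrunError:
        results['overrun'] = True
    results['still_buffered'] = await reader.read(100)

    return results


results = asyncio.run(reader_demo())
for name, value in results.items():
    print(f'{name}: {value!r}')
```

Catching :exc:`IncompleteReadError` and inspecting ``partial`` is the usual
way to tell a truncated message apart from a clean end of stream.
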
StreamWriter
============

.. class:: StreamWriter

   Represents a writer object that provides APIs to write data
   to the IO stream.

   It is not recommended to instantiate *StreamWriter* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. method:: write(data)

      The method attempts to write the *data* to the underlying socket immediately.
      If that fails, the data is queued in an internal write buffer until it can be
      sent.

      Starting with Python 3.8, it is possible to directly await on the ``write()``
      method::

         await stream.write(data)

      The ``await`` pauses the current coroutine until the data is written to the
      socket.

      The equivalent code for Python <= 3.7 is::

         stream.write(data)
         await stream.drain()

      .. versionchanged:: 3.8
         Support ``await stream.write(...)`` syntax.

   .. method:: writelines(data)

      The method writes a list (or any iterable) of bytes to the underlying socket
      immediately.
      If that fails, the data is queued in an internal write buffer until it can be
      sent.

      Starting with Python 3.8, it is possible to directly await on the
      ``writelines()`` method::

         await stream.writelines(lines)

      The ``await`` pauses the current coroutine until the data is written to the
      socket.

      The equivalent code for Python <= 3.7 is::

         stream.writelines(lines)
         await stream.drain()

      .. versionchanged:: 3.8
         Support ``await stream.writelines()`` syntax.

   .. method:: close()

      The method closes the stream and the underlying socket.

      Starting with Python 3.8, it is possible to directly await on the ``close()``
      method::

         await stream.close()

      The ``await`` pauses the current coroutine until the stream and the underlying
      socket are closed (and SSL shutdown is performed for a secure connection).

      The equivalent code for Python <= 3.7 is::

         stream.close()
         await stream.wait_closed()

      .. versionchanged:: 3.8
         Support ``await stream.close()`` syntax.

   .. method:: can_write_eof()

      Return ``True`` if the underlying transport supports
      the :meth:`write_eof` method, ``False`` otherwise.

   .. method:: write_eof()

      Close the write end of the stream after the buffered write
      data is flushed.

   .. attribute:: transport

      Return the underlying asyncio transport.

   .. method:: get_extra_info(name, default=None)

      Access optional transport information; see
      :meth:`BaseTransport.get_extra_info` for details.

   .. coroutinemethod:: drain()

      Wait until it is appropriate to resume writing to the stream.
      Example::

          writer.write(data)
          await writer.drain()

      This is a flow control method that interacts with the underlying
      IO write buffer.  When the size of the buffer reaches
      the high watermark, :meth:`drain` suspends the caller until the
      buffer is drained down to the low watermark and writing can
      be resumed.  When there is nothing to wait for, :meth:`drain`
      returns immediately.

   .. method:: is_closing()

      Return ``True`` if the stream is closed or in the process of
      being closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: wait_closed()

      Wait until the stream is closed.

      Should be called after :meth:`close` to wait until the underlying
      connection is closed.

      .. versionadded:: 3.7


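The write, drain and close sequence described above can be sketched as a
sender that pushes many chunks while respecting flow control. This is an
illustration only: the chunk count and size, the byte-counting server, and the
names ``send_chunks`` and ``count_bytes`` are assumptions, and the
``write()`` followed by ``drain()`` form is used because it works on a plain
:class:`StreamWriter`.

```python
import asyncio


async def send_chunks(n_chunks: int = 50, chunk_size: int = 4096) -> int:
    received = 0
    done = asyncio.Event()

    async def count_bytes(reader, writer):
        # Server side: count everything until the client signals EOF.
        nonlocal received
        while chunk := await reader.read(65536):
            received += len(chunk)
        writer.close()
        await writer.wait_closed()
        done.set()

    server = await asyncio.start_server(count_bytes, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        for _ in range(n_chunks):
            writer.write(b'x' * chunk_size)
            # Pause here whenever the write buffer is above the
            # high watermark; otherwise this returns immediately.
            await writer.drain()

        # Half-close the connection so the server sees EOF.
        if writer.can_write_eof():
            writer.write_eof()
        await done.wait()

        writer.close()
        await writer.wait_closed()
    return received


total = asyncio.run(send_chunks())
print(f'Server received {total} bytes')
```

Awaiting ``drain()`` after each ``write()`` keeps the internal buffer bounded
even when the peer reads more slowly than the sender writes.
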
Examples
========

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.connect` function::

    import asyncio

    async def tcp_echo_client(message):
        async with asyncio.connect('127.0.0.1', 8888) as stream:
            print(f'Send: {message!r}')
            await stream.write(message.encode())

            data = await stream.read(100)
            print(f'Received: {data.decode()!r}')

    asyncio.run(tcp_echo_client('Hello World!'))


.. seealso::

   The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
   example uses the low-level :meth:`loop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :class:`asyncio.StreamServer` class::

    import asyncio

    async def handle_echo(stream):
        data = await stream.read(100)
        message = data.decode()
        addr = stream.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        await stream.write(data)

        print("Close the connection")
        await stream.close()

    async def main():
        async with asyncio.StreamServer(
                handle_echo, '127.0.0.1', 8888) as server:
            addr = server.sockets[0].getsockname()
            print(f'Serving on {addr}')
            await server.serve_forever()

    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
   example uses the :meth:`loop.create_server` method.


Get HTTP headers
----------------

Simple example querying HTTP headers of the URL passed on the command line::

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            stream = await asyncio.connect(url.hostname, 443, ssl=True)
        else:
            stream = await asyncio.connect(url.hostname, 80)

        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )

        await stream.write(query.encode('latin-1'))
        while (line := await stream.readline()):
            line = line.decode('latin-1').rstrip()
            if line:
                print(f'HTTP header> {line}')

        # Ignore the body, close the socket
        await stream.close()

    url = sys.argv[1]
    asyncio.run(print_http_headers(url))


Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html


.. _asyncio_example_create_connection-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`asyncio.connect` function::

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        async with asyncio.connect(sock=rsock) as stream:
            # Simulate the reception of data from the network
            loop.call_soon(wsock.send, 'abc'.encode())

            # Wait for data
            data = await stream.read(100)

            # Got data, we are done: close the socket
            print("Received:", data.decode())

        # Close the second socket
        wsock.close()

    asyncio.run(wait_for_data())

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio_example_create_connection>` example uses a low-level protocol and
   the :meth:`loop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to watch a file descriptor.