blob: feebd227eb8cad60a704cc3100e74c0c8fff5c0e [file] [log] [blame]
Victor Stinner24f8ebf2014-01-23 11:05:01 +01001.. currentmodule:: asyncio
2
Victor Stinner9592edb2014-02-02 15:03:02 +01003.. _asyncio-streams:
Victor Stinner4b4f9eb2014-01-24 17:33:20 +01004
Yury Selivanov7c7605f2018-09-11 09:54:40 -07005=======
6Streams
7=======
Victor Stinner24f8ebf2014-01-23 11:05:01 +01008
Yury Selivanov7c7605f2018-09-11 09:54:40 -07009Streams are high-level async/await-ready primitives to work with
Yury Selivanov8be876e2018-09-11 17:10:37 -070010network connections. Streams allow sending and receiving data without
Yury Selivanov7c7605f2018-09-11 09:54:40 -070011using callbacks or low-level protocols and transports.
lf627d2c82017-07-25 17:03:51 -060012
Yury Selivanov7372c3b2018-09-14 15:11:24 -070013.. _asyncio_example_stream:
14
Yury Selivanov8be876e2018-09-11 17:10:37 -070015Here is an example of a TCP echo client written using asyncio
Yury Selivanov7c7605f2018-09-11 09:54:40 -070016streams::
Victor Stinner24f8ebf2014-01-23 11:05:01 +010017
Yury Selivanov7c7605f2018-09-11 09:54:40 -070018 import asyncio
Guido van Rossum19ff6972015-10-19 13:18:04 -070019
Yury Selivanov7c7605f2018-09-11 09:54:40 -070020 async def tcp_echo_client(message):
Xtreak6793cce2019-06-24 23:46:58 +053021 async with asyncio.connect('127.0.0.1', 8888) as stream:
22 print(f'Send: {message!r}')
23 await stream.write(message.encode())
Yury Selivanov7c7605f2018-09-11 09:54:40 -070024
Xtreak6793cce2019-06-24 23:46:58 +053025 data = await stream.read(100)
26 print(f'Received: {data.decode()!r}')
Yury Selivanov7c7605f2018-09-11 09:54:40 -070027
28 asyncio.run(tcp_echo_client('Hello World!'))
Guido van Rossum19ff6972015-10-19 13:18:04 -070029
30
Yury Selivanov8be876e2018-09-11 17:10:37 -070031See also the `Examples`_ section below.
32
33
Yury Selivanov7c7605f2018-09-11 09:54:40 -070034.. rubric:: Stream Functions
Victor Stinner24f8ebf2014-01-23 11:05:01 +010035
Yury Selivanov7c7605f2018-09-11 09:54:40 -070036The following top-level asyncio functions can be used to create
37and work with streams:
Victor Stinner24f8ebf2014-01-23 11:05:01 +010038
Victor Stinner24f8ebf2014-01-23 11:05:01 +010039
Xtreak6793cce2019-06-24 23:46:58 +053040.. coroutinefunction:: connect(host=None, port=None, \*, \
41 limit=2**16, ssl=None, family=0, \
42 proto=0, flags=0, sock=None, local_addr=None, \
43 server_hostname=None, ssl_handshake_timeout=None, \
44 happy_eyeballs_delay=None, interleave=None)
45
46 Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
47 object of mode :attr:`StreamMode.READWRITE`.
48
Xtreak6793cce2019-06-24 23:46:58 +053049 *limit* determines the buffer size limit used by the returned :class:`Stream`
50 instance. By default the *limit* is set to 64 KiB.
51
52 The rest of the arguments are passed directly to :meth:`loop.create_connection`.
53
54 The function can be used with ``await`` to get a connected stream::
55
56 stream = await asyncio.connect('127.0.0.1', 8888)
57
58 The function can also be used as an async context manager::
59
60 async with asyncio.connect('127.0.0.1', 8888) as stream:
61 ...
62
63 .. versionadded:: 3.8
64
Yury Selivanov7c7605f2018-09-11 09:54:40 -070065.. coroutinefunction:: open_connection(host=None, port=None, \*, \
66 loop=None, limit=None, ssl=None, family=0, \
67 proto=0, flags=0, sock=None, local_addr=None, \
68 server_hostname=None, ssl_handshake_timeout=None)
69
70 Establish a network connection and return a pair of
Yury Selivanov8be876e2018-09-11 17:10:37 -070071 ``(reader, writer)`` objects.
Yury Selivanov7c7605f2018-09-11 09:54:40 -070072
73 The returned *reader* and *writer* objects are instances of
74 :class:`StreamReader` and :class:`StreamWriter` classes.
75
76 The *loop* argument is optional and can always be determined
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -040077 automatically when this function is awaited from a coroutine.
Yury Selivanov7c7605f2018-09-11 09:54:40 -070078
79 *limit* determines the buffer size limit used by the
Yury Selivanov8be876e2018-09-11 17:10:37 -070080 returned :class:`StreamReader` instance. By default the *limit*
81 is set to 64 KiB.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010082
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -040083 The rest of the arguments are passed directly to
Yury Selivanov7c7605f2018-09-11 09:54:40 -070084 :meth:`loop.create_connection`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010085
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -040086 .. versionadded:: 3.7
87
88 The *ssl_handshake_timeout* parameter.
89
Emmanuel Ariased9f3562019-05-31 17:48:57 -030090 .. deprecated-removed:: 3.8 3.10
91
Xtreak6793cce2019-06-24 23:46:58 +053092 `open_connection()` is deprecated in favor of :func:`connect`.
Emmanuel Ariased9f3562019-05-31 17:48:57 -030093
Yury Selivanov7c7605f2018-09-11 09:54:40 -070094.. coroutinefunction:: start_server(client_connected_cb, host=None, \
Xtreak6793cce2019-06-24 23:46:58 +053095 port=None, \*, loop=None, limit=2**16, \
Yury Selivanov7c7605f2018-09-11 09:54:40 -070096 family=socket.AF_UNSPEC, \
97 flags=socket.AI_PASSIVE, sock=None, \
98 backlog=100, ssl=None, reuse_address=None, \
99 reuse_port=None, ssl_handshake_timeout=None, \
100 start_serving=True)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100101
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700102 Start a socket server.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100103
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400104 The *client_connected_cb* callback is called whenever a new client
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700105 connection is established. It receives a ``(reader, writer)`` pair
106 as two arguments, instances of the :class:`StreamReader` and
107 :class:`StreamWriter` classes.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100108
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700109 *client_connected_cb* can be a plain callable or a
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400110 :ref:`coroutine function <coroutine>`; if it is a coroutine function,
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400111 it will be automatically scheduled as a :class:`Task`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100112
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700113 The *loop* argument is optional and can always be determined
114 automatically when this method is awaited from a coroutine.
115
116 *limit* determines the buffer size limit used by the
Yury Selivanov8be876e2018-09-11 17:10:37 -0700117 returned :class:`StreamReader` instance. By default the *limit*
118 is set to 64 KiB.
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400119
120 The rest of the arguments are passed directly to
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700121 :meth:`loop.create_server`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100122
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400123 .. versionadded:: 3.7
124
125 The *ssl_handshake_timeout* and *start_serving* parameters.
126
Emmanuel Ariased9f3562019-05-31 17:48:57 -0300127 .. deprecated-removed:: 3.8 3.10
128
Xtreak6793cce2019-06-24 23:46:58 +0530129 `start_server()` is deprecated if favor of :class:`StreamServer`
Emmanuel Ariased9f3562019-05-31 17:48:57 -0300130
Xtreak6793cce2019-06-24 23:46:58 +0530131.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
132
133 Takes a :term:`file-like object <file object>` *pipe* to return a
134 :class:`Stream` object of the mode :attr:`StreamMode.READ` that has
135 similar API of :class:`StreamReader`. It can also be used as an async context manager.
136
137 *limit* determines the buffer size limit used by the returned :class:`Stream`
138 instance. By default the limit is set to 64 KiB.
139
140 .. versionadded:: 3.8
141
142.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
143
144 Takes a :term:`file-like object <file object>` *pipe* to return a
145 :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
146 similar API of :class:`StreamWriter`. It can also be used as an async context manager.
147
148 *limit* determines the buffer size limit used by the returned :class:`Stream`
149 instance. By default the limit is set to 64 KiB.
150
151 .. versionadded:: 3.8
Yury Selivanov8be876e2018-09-11 17:10:37 -0700152
153.. rubric:: Unix Sockets
154
Xtreak6793cce2019-06-24 23:46:58 +0530155.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
156 sock=None, server_hostname=None, \
157 ssl_handshake_timeout=None)
158
159 Establish a Unix socket connection to socket with *path* address and
160 return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
161 that can be used as a reader and a writer.
162
163 *limit* determines the buffer size limit used by the returned :class:`Stream`
164 instance. By default the *limit* is set to 64 KiB.
165
166 The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
167
168 The function can be used with ``await`` to get a connected stream::
169
170 stream = await asyncio.connect_unix('/tmp/example.sock')
171
172 The function can also be used as an async context manager::
173
174 async with asyncio.connect_unix('/tmp/example.sock') as stream:
175 ...
176
177 .. availability:: Unix.
178
179 .. versionadded:: 3.8
180
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700181.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
182 limit=None, ssl=None, sock=None, \
183 server_hostname=None, ssl_handshake_timeout=None)
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500184
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400185 Establish a Unix socket connection and return a pair of
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700186 ``(reader, writer)``.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500187
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400188 Similar to :func:`open_connection` but operates on Unix sockets.
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400189
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700190 See also the documentation of :meth:`loop.create_unix_connection`.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500191
Cheryl Sabella2d6097d2018-10-12 10:55:20 -0400192 .. availability:: Unix.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500193
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400194 .. versionadded:: 3.7
195
196 The *ssl_handshake_timeout* parameter.
197
198 .. versionchanged:: 3.7
199
200 The *path* parameter can now be a :term:`path-like object`
201
Emmanuel Ariased9f3562019-05-31 17:48:57 -0300202 .. deprecated-removed:: 3.8 3.10
203
Xtreak6793cce2019-06-24 23:46:58 +0530204 ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
Emmanuel Ariased9f3562019-05-31 17:48:57 -0300205
Yury Selivanov8be876e2018-09-11 17:10:37 -0700206
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700207.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
208 \*, loop=None, limit=None, sock=None, \
209 backlog=100, ssl=None, ssl_handshake_timeout=None, \
210 start_serving=True)
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500211
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400212 Start a Unix socket server.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500213
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400214 Similar to :func:`start_server` but works with Unix sockets.
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400215
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700216 See also the documentation of :meth:`loop.create_unix_server`.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500217
Cheryl Sabella2d6097d2018-10-12 10:55:20 -0400218 .. availability:: Unix.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500219
Elvis Pranskevichusc0d062f2018-06-08 11:36:00 -0400220 .. versionadded:: 3.7
221
222 The *ssl_handshake_timeout* and *start_serving* parameters.
223
224 .. versionchanged:: 3.7
225
226 The *path* parameter can now be a :term:`path-like object`.
227
Emmanuel Ariased9f3562019-05-31 17:48:57 -0300228 .. deprecated-removed:: 3.8 3.10
229
Xtreak6793cce2019-06-24 23:46:58 +0530230 ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
Emmanuel Ariased9f3562019-05-31 17:48:57 -0300231
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100232
Yury Selivanov8be876e2018-09-11 17:10:37 -0700233---------
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700234
Xtreak6793cce2019-06-24 23:46:58 +0530235StreamServer
236============
237
238.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
239 limit=2**16, family=socket.AF_UNSPEC, \
240 flags=socket.AI_PASSIVE, sock=None, backlog=100, \
241 ssl=None, reuse_address=None, reuse_port=None, \
242 ssl_handshake_timeout=None, shutdown_timeout=60)
243
244 The *client_connected_cb* callback is called whenever a new client
245 connection is established. It receives a :class:`Stream` object of the
246 mode :attr:`StreamMode.READWRITE`.
247
248 *client_connected_cb* can be a plain callable or a
249 :ref:`coroutine function <coroutine>`; if it is a coroutine function,
250 it will be automatically scheduled as a :class:`Task`.
251
252 *limit* determines the buffer size limit used by the
253 returned :class:`Stream` instance. By default the *limit*
254 is set to 64 KiB.
255
256 The rest of the arguments are passed directly to
257 :meth:`loop.create_server`.
258
259 .. coroutinemethod:: start_serving()
260
261 Binds to the given host and port to start the server.
262
263 .. coroutinemethod:: serve_forever()
264
265 Start accepting connections until the coroutine is cancelled.
266 Cancellation of ``serve_forever`` task causes the server
267 to be closed.
268
269 This method can be called if the server is already accepting
270 connections. Only one ``serve_forever`` task can exist per
271 one *Server* object.
272
273 .. method:: is_serving()
274
275 Returns ``True`` if the server is bound and currently serving.
276
277 .. method:: bind()
278
279 Bind the server to the given *host* and *port*. This method is
280 automatically called during ``__aenter__`` when :class:`StreamServer` is
281 used as an async context manager.
282
283 .. method:: is_bound()
284
285 Return ``True`` if the server is bound.
286
287 .. coroutinemethod:: abort()
288
289 Closes the connection and cancels all pending tasks.
290
291 .. coroutinemethod:: close()
292
293 Closes the connection. This method is automatically called during
294 ``__aexit__`` when :class:`StreamServer` is used as an async context
295 manager.
296
297 .. attribute:: sockets
298
299 Returns a tuple of socket objects the server is bound to.
300
301 .. versionadded:: 3.8
302
303
304UnixStreamServer
305================
306
307.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
308 limit=2**16, sock=None, backlog=100, \
309 ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
310
311 The *client_connected_cb* callback is called whenever a new client
312 connection is established. It receives a :class:`Stream` object of the
313 mode :attr:`StreamMode.READWRITE`.
314
315 *client_connected_cb* can be a plain callable or a
316 :ref:`coroutine function <coroutine>`; if it is a coroutine function,
317 it will be automatically scheduled as a :class:`Task`.
318
319 *limit* determines the buffer size limit used by the
320 returned :class:`Stream` instance. By default the *limit*
321 is set to 64 KiB.
322
323 The rest of the arguments are passed directly to
324 :meth:`loop.create_unix_server`.
325
326 .. coroutinemethod:: start_serving()
327
328 Binds to the given host and port to start the server.
329
330 .. method:: is_serving()
331
332 Returns ``True`` if the server is bound and currently serving.
333
334 .. method:: bind()
335
336 Bind the server to the given *host* and *port*. This method is
337 automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
338 used as an async context manager.
339
340 .. method:: is_bound()
341
342 Return ``True`` if the server is bound.
343
344 .. coroutinemethod:: abort()
345
346 Closes the connection and cancels all pending tasks.
347
348 .. coroutinemethod:: close()
349
350 Closes the connection. This method is automatically called during
351 ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
352 manager.
353
354 .. attribute:: sockets
355
356 Returns a tuple of socket objects the server is bound to.
357
358 .. availability:: Unix.
359
360 .. versionadded:: 3.8
361
362Stream
363======
364
365.. class:: Stream
366
367 Represents a Stream object that provides APIs to read and write data
368 to the IO stream . It includes the API provided by :class:`StreamReader`
Xtreakd31b3152019-09-13 11:52:38 +0100369 and :class:`StreamWriter`. It can also be used as :term:`asynchronous iterator`
370 where :meth:`readline` is used. It raises :exc:`StopAsyncIteration` when
371 :meth:`readline` returns empty data.
Xtreak6793cce2019-06-24 23:46:58 +0530372
373 Do not instantiate *Stream* objects directly; use API like :func:`connect`
374 and :class:`StreamServer` instead.
375
376 .. versionadded:: 3.8
377
Xtreakd31b3152019-09-13 11:52:38 +0100378 .. attribute:: mode
379
380 Returns the mode of the stream which is a :class:`StreamMode` value. It could
381 be one of the below:
382
383 * :attr:`StreamMode.READ` - Connection can receive data.
384 * :attr:`StreamMode.WRITE` - Connection can send data.
385 * :attr:`StreamMode.READWRITE` - Connection can send and receive data.
386
387 .. coroutinemethod:: abort()
388
389 Aborts the connection immediately, without waiting for the send buffer to drain.
390
391 .. method:: at_eof()
392
393 Return ``True`` if the buffer is empty.
394
395 .. method:: can_write_eof()
396
397 Return *True* if the underlying transport supports
398 the :meth:`write_eof` method, *False* otherwise.
399
400 .. method:: close()
401
402 The method closes the stream and the underlying socket.
403
404 It is possible to directly await on the `close()` method::
405
406 await stream.close()
407
408 The ``await`` pauses the current coroutine until the stream and the underlying
409 socket are closed (and SSL shutdown is performed for a secure connection).
410
411 .. coroutinemethod:: drain()
412
413 Wait until it is appropriate to resume writing to the stream.
414 Example::
415
416 stream.write(data)
417 await stream.drain()
418
419 This is a flow control method that interacts with the underlying
420 IO write buffer. When the size of the buffer reaches
421 the high watermark, *drain()* blocks until the size of the
422 buffer is drained down to the low watermark and writing can
423 be resumed. When there is nothing to wait for, the :meth:`drain`
424 returns immediately.
425
426 .. deprecated:: 3.8
427
428 It is recommended to directly await on the `write()` method instead::
429
430 await stream.write(data)
431
432 .. method:: get_extra_info(name, default=None)
433
434 Access optional transport information; see
435 :meth:`BaseTransport.get_extra_info` for details.
436
437 .. method:: is_closing()
438
439 Return ``True`` if the stream is closed or in the process of
440 being closed.
441
442 .. coroutinemethod:: read(n=-1)
443
444 Read up to *n* bytes. If *n* is not provided, or set to ``-1``,
445 read until EOF and return all read bytes.
446
447 If EOF was received and the internal buffer is empty,
448 return an empty ``bytes`` object.
449
450 .. coroutinemethod:: readexactly(n)
451
452 Read exactly *n* bytes.
453
454 Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
455 can be read. Use the :attr:`IncompleteReadError.partial`
456 attribute to get the partially read data.
457
458 .. coroutinemethod:: readline()
459
460 Read one line, where "line" is a sequence of bytes
461 ending with ``\n``.
462
463 If EOF is received and ``\n`` was not found, the method
464 returns partially read data.
465
466 If EOF is received and the internal buffer is empty,
467 return an empty ``bytes`` object.
468
469 .. coroutinemethod:: readuntil(separator=b'\\n')
470
471 Read data from the stream until *separator* is found.
472
473 On success, the data and separator will be removed from the
474 internal buffer (consumed). Returned data will include the
475 separator at the end.
476
477 If the amount of data read exceeds the configured stream limit, a
478 :exc:`LimitOverrunError` exception is raised, and the data
479 is left in the internal buffer and can be read again.
480
481 If EOF is reached before the complete separator is found,
482 an :exc:`IncompleteReadError` exception is raised, and the internal
483 buffer is reset. The :attr:`IncompleteReadError.partial` attribute
484 may contain a portion of the separator.
485
486 .. coroutinemethod:: sendfile(file, offset=0, count=None, *, fallback=True)
487
488 Sends a *file* over the stream using an optimized syscall if available.
489
490 For other parameters meaning please see :meth:`AbstractEventloop.sendfile`.
491
492 .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \
493 ssl_handshake_timeout=None)
494
495 Upgrades the existing transport-based connection to TLS.
496
497 For other parameters meaning please see :meth:`AbstractEventloop.start_tls`.
498
499 .. coroutinemethod:: wait_closed()
500
501 Wait until the stream is closed.
502
503 Should be called after :meth:`close` to wait until the underlying
504 connection is closed.
505
506 .. coroutinemethod:: write(data)
507
508 Write *data* to the underlying socket; wait until the data is sent, e.g.::
509
510 await stream.write(data)
511
512 .. method:: write(data)
513
514 The method attempts to write the *data* to the underlying socket immediately.
515 If that fails, the data is queued in an internal write buffer until it can be
516 sent. :meth:`drain` can be used to flush the underlying buffer once writing is
517 available::
518
519 stream.write(data)
520 await stream.drain()
521
522 .. deprecated:: 3.8
523
524 It is recommended to directly await on the `write()` method instead::
525
526 await stream.write(data)
527
528 .. method:: writelines(data)
529
530 The method writes a list (or any iterable) of bytes to the underlying socket
531 immediately.
532 If that fails, the data is queued in an internal write buffer until it can be
533 sent.
534
535 It is possible to directly await on the `writelines()` method::
536
537 await stream.writelines(lines)
538
539 The ``await`` pauses the current coroutine until the data is written to the
540 socket.
541
542 .. method:: write_eof()
543
544 Close the write end of the stream after the buffered write
545 data is flushed.
546
Xtreak6793cce2019-06-24 23:46:58 +0530547
548StreamMode
549==========
550
551.. class:: StreamMode
552
553 A subclass of :class:`enum.Flag` that defines a set of values that can be
554 used to determine the ``mode`` of :class:`Stream` objects.
555
556 .. data:: READ
557
558 The stream object is readable and provides the API of :class:`StreamReader`.
559
560 .. data:: WRITE
561
562 The stream object is writeable and provides the API of :class:`StreamWriter`.
563
564 .. data:: READWRITE
565
566 The stream object is readable and writeable and provides the API of both
567 :class:`StreamReader` and :class:`StreamWriter`.
568
569 .. versionadded:: 3.8
570
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700571
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100572StreamReader
573============
574
Yury Selivanov8be876e2018-09-11 17:10:37 -0700575.. class:: StreamReader
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100576
Yury Selivanov8be876e2018-09-11 17:10:37 -0700577 Represents a reader object that provides APIs to read data
578 from the IO stream.
Victor Stinner83704962015-02-25 14:24:15 +0100579
Yury Selivanov8be876e2018-09-11 17:10:37 -0700580 It is not recommended to instantiate *StreamReader* objects
581 directly; use :func:`open_connection` and :func:`start_server`
582 instead.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100583
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100584 .. coroutinemethod:: read(n=-1)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100585
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500586 Read up to *n* bytes. If *n* is not provided, or set to ``-1``,
587 read until EOF and return all read bytes.
588
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400589 If EOF was received and the internal buffer is empty,
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500590 return an empty ``bytes`` object.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100591
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100592 .. coroutinemethod:: readline()
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100593
Yury Selivanov8be876e2018-09-11 17:10:37 -0700594 Read one line, where "line" is a sequence of bytes
595 ending with ``\n``.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500596
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400597 If EOF is received and ``\n`` was not found, the method
Yury Selivanov8be876e2018-09-11 17:10:37 -0700598 returns partially read data.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500599
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400600 If EOF is received and the internal buffer is empty,
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500601 return an empty ``bytes`` object.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100602
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100603 .. coroutinemethod:: readexactly(n)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100604
Yury Selivanov8be876e2018-09-11 17:10:37 -0700605 Read exactly *n* bytes.
606
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400607 Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
Yury Selivanov8be876e2018-09-11 17:10:37 -0700608 can be read. Use the :attr:`IncompleteReadError.partial`
609 attribute to get the partially read data.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100610
Berker Peksage5b0bd12016-10-18 00:34:46 +0300611 .. coroutinemethod:: readuntil(separator=b'\\n')
Yury Selivanov950204d2016-05-16 16:23:00 -0400612
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400613 Read data from the stream until *separator* is found.
Yury Selivanov950204d2016-05-16 16:23:00 -0400614
615 On success, the data and separator will be removed from the
616 internal buffer (consumed). Returned data will include the
617 separator at the end.
618
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400619 If the amount of data read exceeds the configured stream limit, a
620 :exc:`LimitOverrunError` exception is raised, and the data
621 is left in the internal buffer and can be read again.
Yury Selivanov950204d2016-05-16 16:23:00 -0400622
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400623 If EOF is reached before the complete separator is found,
624 an :exc:`IncompleteReadError` exception is raised, and the internal
625 buffer is reset. The :attr:`IncompleteReadError.partial` attribute
626 may contain a portion of the separator.
Yury Selivanov950204d2016-05-16 16:23:00 -0400627
628 .. versionadded:: 3.5.2
629
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500630 .. method:: at_eof()
631
Xtreakd31b3152019-09-13 11:52:38 +0100632 Return ``True`` if the buffer is empty.
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500633
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100634
635StreamWriter
636============
637
Yury Selivanov8be876e2018-09-11 17:10:37 -0700638.. class:: StreamWriter
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100639
Yury Selivanov8be876e2018-09-11 17:10:37 -0700640 Represents a writer object that provides APIs to write data
641 to the IO stream.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100642
Yury Selivanov8be876e2018-09-11 17:10:37 -0700643 It is not recommended to instantiate *StreamWriter* objects
644 directly; use :func:`open_connection` and :func:`start_server`
645 instead.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100646
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400647 .. method:: write(data)
Andrew Svetlov11194c82018-09-13 16:53:49 -0700648
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400649 The method attempts to write the *data* to the underlying socket immediately.
650 If that fails, the data is queued in an internal write buffer until it can be
651 sent.
Andrew Svetlov11194c82018-09-13 16:53:49 -0700652
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400653 Starting with Python 3.8, it is possible to directly await on the `write()`
654 method::
Andrew Svetlov11194c82018-09-13 16:53:49 -0700655
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400656 await stream.write(data)
Andrew Svetlov11194c82018-09-13 16:53:49 -0700657
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400658 The ``await`` pauses the current coroutine until the data is written to the
659 socket.
Andrew Svetlov11194c82018-09-13 16:53:49 -0700660
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400661 Below is an equivalent code that works with Python <= 3.7::
Andrew Svetlov11194c82018-09-13 16:53:49 -0700662
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400663 stream.write(data)
664 await stream.drain()
Andrew Svetlov11194c82018-09-13 16:53:49 -0700665
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400666 .. versionchanged:: 3.8
667 Support ``await stream.write(...)`` syntax.
668
669 .. method:: writelines(data)
670
671 The method writes a list (or any iterable) of bytes to the underlying socket
672 immediately.
673 If that fails, the data is queued in an internal write buffer until it can be
674 sent.
675
Xtreakd31b3152019-09-13 11:52:38 +0100676 Starting with Python 3.8, it is possible to directly await on the `writelines()`
Andrew Svetlova076e4f2019-05-09 15:14:58 -0400677 method::
678
679 await stream.writelines(lines)
680
681 The ``await`` pauses the current coroutine until the data is written to the
682 socket.
683
684 Below is an equivalent code that works with Python <= 3.7::
685
686 stream.writelines(lines)
687 await stream.drain()
688
689 .. versionchanged:: 3.8
690 Support ``await stream.writelines()`` syntax.
691
692 .. method:: close()
693
694 The method closes the stream and the underlying socket.
695
696 Starting with Python 3.8, it is possible to directly await on the `close()`
697 method::
698
699 await stream.close()
700
701 The ``await`` pauses the current coroutine until the stream and the underlying
702 socket are closed (and SSL shutdown is performed for a secure connection).
703
704 Below is an equivalent code that works with Python <= 3.7::
705
706 stream.close()
707 await stream.wait_closed()
708
709 .. versionchanged:: 3.8
710 Support ``await stream.close()`` syntax.
Andrew Svetlov11194c82018-09-13 16:53:49 -0700711
712 .. method:: can_write_eof()
713
714 Return *True* if the underlying transport supports
715 the :meth:`write_eof` method, *False* otherwise.
716
717 .. method:: write_eof()
718
719 Close the write end of the stream after the buffered write
720 data is flushed.
721
722 .. attribute:: transport
723
724 Return the underlying asyncio transport.
725
726 .. method:: get_extra_info(name, default=None)
727
728 Access optional transport information; see
729 :meth:`BaseTransport.get_extra_info` for details.
730
Yury Selivanov8be876e2018-09-11 17:10:37 -0700731 .. coroutinemethod:: drain()
732
733 Wait until it is appropriate to resume writing to the stream.
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400734 Example::
Yury Selivanov8be876e2018-09-11 17:10:37 -0700735
736 writer.write(data)
737 await writer.drain()
738
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400739 This is a flow control method that interacts with the underlying
Yury Selivanov8be876e2018-09-11 17:10:37 -0700740 IO write buffer. When the size of the buffer reaches
Elvis Pranskevichus1fa2ec42018-09-17 19:16:44 -0400741 the high watermark, *drain()* blocks until the size of the
742 buffer is drained down to the low watermark and writing can
Yury Selivanov8be876e2018-09-11 17:10:37 -0700743 be resumed. When there is nothing to wait for, the :meth:`drain`
744 returns immediately.
Victor Stinnerffbe3c62014-02-08 22:50:07 +0100745
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200746 .. method:: is_closing()
747
Yury Selivanov8be876e2018-09-11 17:10:37 -0700748 Return ``True`` if the stream is closed or in the process of
749 being closed.
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200750
751 .. versionadded:: 3.7
752
753 .. coroutinemethod:: wait_closed()
754
Yury Selivanov8be876e2018-09-11 17:10:37 -0700755 Wait until the stream is closed.
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200756
Yury Selivanov8be876e2018-09-11 17:10:37 -0700757 Should be called after :meth:`close` to wait until the underlying
758 connection is closed.
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200759
760 .. versionadded:: 3.7
761
Victor Stinnerc520edc2014-01-23 11:25:48 +0100762
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700763Examples
764========
Victor Stinner5121a9b2014-10-11 15:52:14 +0200765
Victor Stinnered051592014-10-12 20:18:16 +0200766.. _asyncio-tcp-echo-client-streams:
767
768TCP echo client using streams
769-----------------------------
770
Xtreak6793cce2019-06-24 23:46:58 +0530771TCP echo client using the :func:`asyncio.connect` function::
Victor Stinnered051592014-10-12 20:18:16 +0200772
773 import asyncio
774
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700775 async def tcp_echo_client(message):
Xtreak6793cce2019-06-24 23:46:58 +0530776 async with asyncio.connect('127.0.0.1', 8888) as stream:
777 print(f'Send: {message!r}')
778 await stream.write(message.encode())
Victor Stinnered051592014-10-12 20:18:16 +0200779
Xtreak6793cce2019-06-24 23:46:58 +0530780 data = await stream.read(100)
781 print(f'Received: {data.decode()!r}')
Victor Stinnered051592014-10-12 20:18:16 +0200782
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700783 asyncio.run(tcp_echo_client('Hello World!'))
784
Victor Stinnered051592014-10-12 20:18:16 +0200785
786.. seealso::
787
Yury Selivanov394374e2018-09-17 15:35:24 -0400788 The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700789 example uses the low-level :meth:`loop.create_connection` method.
Victor Stinnered051592014-10-12 20:18:16 +0200790
791
792.. _asyncio-tcp-echo-server-streams:
793
794TCP echo server using streams
795-----------------------------
796
Xtreak6793cce2019-06-24 23:46:58 +0530797TCP echo server using the :class:`asyncio.StreamServer` class::
Victor Stinnered051592014-10-12 20:18:16 +0200798
799 import asyncio
800
Xtreak6793cce2019-06-24 23:46:58 +0530801 async def handle_echo(stream):
802 data = await stream.read(100)
Victor Stinnered051592014-10-12 20:18:16 +0200803 message = data.decode()
Xtreak6793cce2019-06-24 23:46:58 +0530804 addr = stream.get_extra_info('peername')
Victor Stinnered051592014-10-12 20:18:16 +0200805
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700806 print(f"Received {message!r} from {addr!r}")
807
808 print(f"Send: {message!r}")
Xtreak6793cce2019-06-24 23:46:58 +0530809 await stream.write(data)
Victor Stinnered051592014-10-12 20:18:16 +0200810
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700811 print("Close the connection")
Xtreak6793cce2019-06-24 23:46:58 +0530812 await stream.close()
Victor Stinnered051592014-10-12 20:18:16 +0200813
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700814 async def main():
Xtreak6793cce2019-06-24 23:46:58 +0530815 async with asyncio.StreamServer(
816 handle_echo, '127.0.0.1', 8888) as server:
817 addr = server.sockets[0].getsockname()
818 print(f'Serving on {addr}')
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700819 await server.serve_forever()
820
821 asyncio.run(main())
822
Victor Stinnered051592014-10-12 20:18:16 +0200823
824.. seealso::
825
Yury Selivanov394374e2018-09-17 15:35:24 -0400826 The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700827 example uses the :meth:`loop.create_server` method.
Victor Stinnered051592014-10-12 20:18:16 +0200828
829
Victor Stinner5121a9b2014-10-11 15:52:14 +0200830Get HTTP headers
831----------------
Victor Stinnerc520edc2014-01-23 11:25:48 +0100832
833Simple example querying HTTP headers of the URL passed on the command line::
834
835 import asyncio
836 import urllib.parse
837 import sys
838
Mikhail Terekhovd2ac4002018-08-07 16:29:06 -0400839 async def print_http_headers(url):
Victor Stinnerc520edc2014-01-23 11:25:48 +0100840 url = urllib.parse.urlsplit(url)
Victor Stinner5121a9b2014-10-11 15:52:14 +0200841 if url.scheme == 'https':
Xtreak6793cce2019-06-24 23:46:58 +0530842 stream = await asyncio.connect(url.hostname, 443, ssl=True)
Victor Stinner5121a9b2014-10-11 15:52:14 +0200843 else:
Xtreak6793cce2019-06-24 23:46:58 +0530844 stream = await asyncio.connect(url.hostname, 80)
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700845
846 query = (
847 f"HEAD {url.path or '/'} HTTP/1.0\r\n"
848 f"Host: {url.hostname}\r\n"
849 f"\r\n"
850 )
851
Xtreak6793cce2019-06-24 23:46:58 +0530852 stream.write(query.encode('latin-1'))
853 while (line := await stream.readline()):
Victor Stinnerc520edc2014-01-23 11:25:48 +0100854 line = line.decode('latin1').rstrip()
855 if line:
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700856 print(f'HTTP header> {line}')
Victor Stinnerc520edc2014-01-23 11:25:48 +0100857
Victor Stinner5121a9b2014-10-11 15:52:14 +0200858 # Ignore the body, close the socket
Xtreak6793cce2019-06-24 23:46:58 +0530859 await stream.close()
Victor Stinner5121a9b2014-10-11 15:52:14 +0200860
Victor Stinnerc520edc2014-01-23 11:25:48 +0100861 url = sys.argv[1]
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700862 asyncio.run(print_http_headers(url))
863
Victor Stinnerc520edc2014-01-23 11:25:48 +0100864
865Usage::
866
867 python example.py http://example.com/path/page.html
868
Victor Stinner04e6df32014-10-11 16:16:27 +0200869or with HTTPS::
870
871 python example.py https://example.com/path/page.html
872
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700873
Yury Selivanov394374e2018-09-17 15:35:24 -0400874.. _asyncio_example_create_connection-streams:
Victor Stinner04e6df32014-10-11 16:16:27 +0200875
876Register an open socket to wait for data using streams
877------------------------------------------------------
878
879Coroutine waiting until a socket receives data using the
Xtreak6793cce2019-06-24 23:46:58 +0530880:func:`asyncio.connect` function::
Victor Stinner04e6df32014-10-11 16:16:27 +0200881
882 import asyncio
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700883 import socket
Victor Stinner04e6df32014-10-11 16:16:27 +0200884
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700885 async def wait_for_data():
886 # Get a reference to the current event loop because
887 # we want to access low-level APIs.
888 loop = asyncio.get_running_loop()
Victor Stinner04e6df32014-10-11 16:16:27 +0200889
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700890 # Create a pair of connected sockets.
891 rsock, wsock = socket.socketpair()
892
893 # Register the open socket to wait for data.
Xtreak6793cce2019-06-24 23:46:58 +0530894 async with asyncio.connect(sock=rsock) as stream:
895 # Simulate the reception of data from the network
896 loop.call_soon(wsock.send, 'abc'.encode())
Victor Stinner04e6df32014-10-11 16:16:27 +0200897
Xtreak6793cce2019-06-24 23:46:58 +0530898 # Wait for data
899 data = await stream.read(100)
Victor Stinner04e6df32014-10-11 16:16:27 +0200900
Xtreak6793cce2019-06-24 23:46:58 +0530901 # Got data, we are done: close the socket
902 print("Received:", data.decode())
Victor Stinner04e6df32014-10-11 16:16:27 +0200903
904 # Close the second socket
905 wsock.close()
906
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700907 asyncio.run(wait_for_data())
Victor Stinner04e6df32014-10-11 16:16:27 +0200908
909.. seealso::
910
911 The :ref:`register an open socket to wait for data using a protocol
Yury Selivanov394374e2018-09-17 15:35:24 -0400912 <asyncio_example_create_connection>` example uses a low-level protocol and
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700913 the :meth:`loop.create_connection` method.
Victor Stinner04e6df32014-10-11 16:16:27 +0200914
915 The :ref:`watch a file descriptor for read events
Yury Selivanov394374e2018-09-17 15:35:24 -0400916 <asyncio_example_watch_fd>` example uses the low-level
Yury Selivanov7c7605f2018-09-11 09:54:40 -0700917 :meth:`loop.add_reader` method to watch a file descriptor.