.. currentmodule:: asyncio

.. _asyncio-streams:

+++++++++++++++++++++++++++++
Streams (coroutine based API)
+++++++++++++++++++++++++++++

**Source code:** :source:`Lib/asyncio/streams.py`

Stream functions
================

.. note::

   The top-level functions in this module are meant as convenience wrappers
   only; there's really nothing special there, and if they don't do
   exactly what you want, feel free to copy their code.


.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, \*\*kwds)

   A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader,
   writer) pair.

   The reader returned is a :class:`StreamReader` instance; the writer is
   a :class:`StreamWriter` instance.

   The arguments are all the usual arguments to
   :meth:`AbstractEventLoop.create_connection` except *protocol_factory*; most
   common are positional *host* and *port*, with various optional keyword arguments
   following.

   Additional optional keyword arguments are *loop* (to set the event loop
   instance to use) and *limit* (to set the buffer limit passed to the
   :class:`StreamReader`).

   This function is a :ref:`coroutine <coroutine>`.

.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, \*\*kwds)

   Start a socket server, with a callback for each client connected. The return
   value is the same as :meth:`~AbstractEventLoop.create_server()`.

   The *client_connected_cb* callback is called with two arguments:
   *client_reader* and *client_writer*.  *client_reader* is a
   :class:`StreamReader` object, while *client_writer* is a
   :class:`StreamWriter` object.  The *client_connected_cb* parameter can
   either be a plain callback function or a :ref:`coroutine function
   <coroutine>`; if it is a coroutine function, it will be automatically
   scheduled as a :class:`Task`.

   The rest of the arguments are all the usual arguments to
   :meth:`~AbstractEventLoop.create_server()` except *protocol_factory*; most
   common are positional *host* and *port*, with various optional keyword
   arguments following.

   Additional optional keyword arguments are *loop* (to set the event loop
   instance to use) and *limit* (to set the buffer limit passed to the
   :class:`StreamReader`).

   This function is a :ref:`coroutine <coroutine>`.

.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, \*\*kwds)

   A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning
   a (reader, writer) pair.

   See :func:`open_connection` for information about the return value and other
   details.

   This function is a :ref:`coroutine <coroutine>`.

   Availability: UNIX.

.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, \*\*kwds)

   Start a UNIX Domain Socket server, with a callback for each client connected.

   See :func:`start_server` for information about the return value and other
   details.

   This function is a :ref:`coroutine <coroutine>`.

   Availability: UNIX.

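As an illustration, the following sketch starts a UNIX domain socket server
and connects to it from the same program.  It assumes Python 3.7 or later
(for :func:`asyncio.run`) and a platform with UNIX sockets; the ``echo.sock``
path and the ``handle_echo`` and ``main`` names are made up for this example::

    import asyncio
    import os
    import tempfile

    async def handle_echo(reader, writer):
        # Echo a single line back to the client, then close the connection.
        data = await reader.readline()
        writer.write(data)
        await writer.drain()
        writer.close()

    async def main():
        # Hypothetical socket path inside a fresh temporary directory.
        with tempfile.TemporaryDirectory() as tmpdir:
            path = os.path.join(tmpdir, 'echo.sock')
            server = await asyncio.start_unix_server(handle_echo, path=path)

            reader, writer = await asyncio.open_unix_connection(path)
            writer.write(b'ping\n')
            await writer.drain()
            reply = await reader.readline()
            writer.close()

            server.close()
            await server.wait_closed()
            return reply

    reply = asyncio.run(main())
    print(reply)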

StreamReader
============

.. class:: StreamReader(limit=None, loop=None)

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. method:: exception()

      Get the exception.

   .. method:: feed_eof()

      Acknowledge the EOF.

   .. method:: feed_data(data)

      Feed *data* bytes into the internal buffer.  Any operations waiting
      for the data will be resumed.

   .. method:: set_exception(exc)

      Set the exception.

   .. method:: set_transport(transport)

      Set the transport.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
      read until EOF and return all read bytes.

      If the EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

      This method is a :ref:`coroutine <coroutine>`.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes ending with ``\n``.

      If EOF is received, and ``\n`` was not found, the method will
      return the partial read bytes.

      If the EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

      This method is a :ref:`coroutine <coroutine>`.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes.  Raise an :exc:`IncompleteReadError` if the end of
      the stream is reached before *n* bytes can be read; the
      :attr:`IncompleteReadError.partial` attribute of the exception contains
      the partial read bytes.

      This method is a :ref:`coroutine <coroutine>`.

   .. coroutinemethod:: readuntil(separator=b'\\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed).  Returned data will include the
      separator at the end.

      The configured stream limit is used to check the result.  The limit sets
      the maximal length of data that can be returned, not counting the
      separator.

      If an EOF occurs and the complete separator is still not found,
      an :exc:`IncompleteReadError` exception will be
      raised, and the internal buffer will be reset.  The
      :attr:`IncompleteReadError.partial` attribute may contain the
      separator partially.

      If the data cannot be read because the limit is exceeded, a
      :exc:`LimitOverrunError` exception will be raised, and the data
      will be left in the internal buffer, so it can be read again.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof` was called.

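A :class:`StreamReader` can also be exercised without any network connection.
The following sketch, which assumes Python 3.7 or later for
:func:`asyncio.run`, feeds bytes by hand (a job normally done by the protocol)
and reads them back; ``main`` is an illustrative name::

    import asyncio

    async def main():
        # Create the reader inside a running loop so it binds to that loop.
        reader = asyncio.StreamReader()

        # Data may arrive in arbitrary pieces; the reader buffers it.
        reader.feed_data(b'first line\nsecond ')
        reader.feed_data(b'line\ntail')
        reader.feed_eof()

        first = await reader.readline()    # up to and including the first \n
        second = await reader.readline()   # joined across the two feeds
        rest = await reader.read()         # everything left until EOF
        empty = await reader.read(10)      # EOF reached and buffer empty
        return first, second, rest, empty, reader.at_eof()

    result = asyncio.run(main())
    print(result)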

StreamWriter
============

.. class:: StreamWriter(transport, protocol, reader, loop)

   Wraps a Transport.

   This exposes :meth:`write`, :meth:`writelines`, :meth:`can_write_eof()`,
   :meth:`write_eof`, :meth:`get_extra_info` and :meth:`close`.  It adds
   :meth:`drain`, a coroutine that you can wait on for flow control.  It also
   adds a transport attribute which references the :class:`Transport`
   directly.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: transport

      Transport.

   .. method:: can_write_eof()

      Return :const:`True` if the transport supports :meth:`write_eof`,
      :const:`False` if not. See :meth:`WriteTransport.can_write_eof`.

   .. method:: close()

      Close the transport: see :meth:`BaseTransport.close`.

   .. method:: is_closing()

      Return ``True`` if the writer is closing or is closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: wait_closed()

      Wait until the writer is closed.

      Should be called after :meth:`close` to wait until the underlying
      connection (and the associated transport/protocol pair) is closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: drain()

      Give the write buffer of the underlying transport a chance to be flushed.

      The intended use is to write::

          w.write(data)
          await w.drain()

      When the size of the transport buffer reaches the high-water limit (the
      protocol is paused), block until the size of the buffer is drained down
      to the low-water limit and the protocol is resumed. When there is nothing
      to wait for, the ``await`` returns immediately.

      Awaiting :meth:`drain` gives the opportunity for the loop to
      schedule the write operation and flush the buffer. It should especially
      be used when a possibly large amount of data is written to the transport,
      and the coroutine does not await anything between calls to :meth:`write`.

      This method is a :ref:`coroutine <coroutine>`.

   .. method:: get_extra_info(name, default=None)

      Return optional transport information: see
      :meth:`BaseTransport.get_extra_info`.

   .. method:: write(data)

      Write some *data* bytes to the transport: see
      :meth:`WriteTransport.write`.

   .. method:: writelines(data)

      Write a list (or any iterable) of data bytes to the transport:
      see :meth:`WriteTransport.writelines`.

   .. method:: write_eof()

      Close the write end of the transport after flushing buffered data:
      see :meth:`WriteTransport.write_eof`.

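To make the flow-control role of :meth:`StreamWriter.drain` more concrete,
here is a minimal sketch that sends 2 MiB in chunks and awaits
:meth:`~StreamWriter.drain` after every write.  It assumes Python 3.7 or later
and uses a connected socket pair in place of a real network peer; the
``produce``, ``consume`` and ``main`` names and the chunk size are
illustrative choices::

    import asyncio
    import socket

    async def main():
        # A connected socket pair stands in for a real network connection.
        sock_a, sock_b = socket.socketpair()
        _, writer = await asyncio.open_connection(sock=sock_a)
        reader, peer_writer = await asyncio.open_connection(sock=sock_b)

        async def produce():
            chunk = b'x' * 65536
            for _ in range(32):
                writer.write(chunk)
                # Suspends only while the transport buffer is above the
                # high-water limit, so the buffer cannot grow without bound.
                await writer.drain()
            writer.write_eof()

        async def consume():
            total = 0
            while True:
                data = await reader.read(65536)
                if not data:        # empty bytes object: EOF was received
                    break
                total += len(data)
            return total

        _, total = await asyncio.gather(produce(), consume())

        # Close both ends and wait for the transports to finish closing.
        writer.close()
        await writer.wait_closed()
        peer_writer.close()
        await peer_writer.wait_closed()
        return total

    total = asyncio.run(main())
    print(total)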

StreamReaderProtocol
====================

.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)

   Trivial helper class to adapt between :class:`Protocol` and
   :class:`StreamReader`.  Subclass of :class:`Protocol`.

   *stream_reader* is a :class:`StreamReader` instance, *client_connected_cb*
   is an optional function called with (stream_reader, stream_writer) when a
   connection is made, *loop* is the event loop instance to use.

   (This is a helper class instead of making :class:`StreamReader` itself a
   :class:`Protocol` subclass, because the :class:`StreamReader` has other
   potential uses, and to prevent the user of the :class:`StreamReader` from
   accidentally calling inappropriate methods of the protocol.)

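The three classes can be wired together by hand, roughly the way
:func:`open_connection` does it.  The sketch below is an approximation under
stated assumptions rather than the library's exact code: it assumes Python 3.7
or later, ``open_stream_pair`` is a hypothetical helper name, and a socket
pair replaces a real remote host::

    import asyncio
    import socket

    async def open_stream_pair(sock):
        """Hypothetical helper: build (reader, writer) for a connected socket."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        # The protocol receives the transport callbacks and feeds the reader.
        protocol = asyncio.StreamReaderProtocol(reader)
        transport, _ = await loop.create_connection(lambda: protocol, sock=sock)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    async def main():
        sock_a, sock_b = socket.socketpair()
        reader, writer = await open_stream_pair(sock_a)

        # Play the remote peer with the plain socket on the other end.
        sock_b.sendall(b'hello\n')
        line = await reader.readline()

        writer.close()
        await writer.wait_closed()
        sock_b.close()
        return line

    line = asyncio.run(main())
    print(line)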

IncompleteReadError
===================

.. exception:: IncompleteReadError

   Incomplete read error, subclass of :exc:`EOFError`.

   .. attribute:: expected

      Total number of expected bytes (:class:`int`).

   .. attribute:: partial

      Bytes read before the end of the stream was reached (:class:`bytes`).

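A short sketch of how the two attributes might be inspected, assuming
Python 3.7 or later; the reader is fed by hand so that the stream ends after
5 of the 8 requested bytes, and ``main`` is an illustrative name::

    import asyncio

    async def main():
        reader = asyncio.StreamReader()
        # Only 5 bytes arrive before the end of the stream.
        reader.feed_data(b'hello')
        reader.feed_eof()
        try:
            await reader.readexactly(8)
        except asyncio.IncompleteReadError as exc:
            # expected is the requested size, partial holds what was read.
            return exc.expected, exc.partial
        return None

    outcome = asyncio.run(main())
    print(outcome)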

LimitOverrunError
=================

.. exception:: LimitOverrunError

   Reached the buffer limit while looking for a separator.

   .. attribute:: consumed

      Total number of bytes to be consumed.

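One possible way to react to this error is to read the oversized data out of
the buffer explicitly.  The sketch below assumes Python 3.7 or later and uses
a deliberately tiny, made-up limit of 8 bytes so that
:meth:`StreamReader.readuntil` fails; ``main`` is an illustrative name::

    import asyncio

    async def main():
        # Hypothetical, very small limit chosen only to trigger the error.
        reader = asyncio.StreamReader(limit=8)
        reader.feed_data(b'a line that is far too long\nok\n')
        try:
            await reader.readuntil(b'\n')
        except asyncio.LimitOverrunError as exc:
            consumed = exc.consumed
            # Nothing was removed from the buffer, so the data is still there.
            leftover = await reader.read(consumed)
            return consumed, leftover
        return None

    result = asyncio.run(main())
    print(result)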

Stream examples
===============

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.open_connection` function::

    import asyncio

    async def tcp_echo_client(message, loop):
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
                                                       loop=loop)

        print('Send: %r' % message)
        writer.write(message.encode())

        data = await reader.read(100)
        print('Received: %r' % data.decode())

        print('Close the socket')
        writer.close()

    message = 'Hello World!'
    loop = asyncio.get_event_loop()
    loop.run_until_complete(tcp_echo_client(message, loop))
    loop.close()

.. seealso::

   The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>`
   example uses the :meth:`AbstractEventLoop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :func:`asyncio.start_server` function::

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print("Received %r from %r" % (message, addr))

        print("Send: %r" % message)
        writer.write(data)
        await writer.drain()

        print("Close the client socket")
        writer.close()

    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

.. seealso::

   The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>`
   example uses the :meth:`AbstractEventLoop.create_server` method.


Get HTTP headers
----------------

Simple example querying HTTP headers of the URL passed on the command line::

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            connect = asyncio.open_connection(url.hostname, 443, ssl=True)
        else:
            connect = asyncio.open_connection(url.hostname, 80)
        reader, writer = await connect
        query = ('HEAD {path} HTTP/1.0\r\n'
                 'Host: {hostname}\r\n'
                 '\r\n').format(path=url.path or '/', hostname=url.hostname)
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode('latin-1').rstrip()
            if line:
                print('HTTP header> %s' % line)

        # Ignore the body, close the socket
        writer.close()

    url = sys.argv[1]
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(print_http_headers(url))
    loop.run_until_complete(task)
    loop.close()

Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html

.. _asyncio-register-socket-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`open_connection` function::

    import asyncio
    from socket import socketpair

    async def wait_for_data(loop):
        # Create a pair of connected sockets
        rsock, wsock = socketpair()

        # Register the open socket to wait for data
        reader, writer = await asyncio.open_connection(sock=rsock, loop=loop)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_data(loop))
    loop.close()

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio-register-socket>` example uses a low-level protocol created by the
   :meth:`AbstractEventLoop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio-watch-read-event>` example uses the low-level
   :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
   socket.
