.. currentmodule:: asyncio

.. _asyncio-streams:

+++++++++++++++++++++++++++++
Streams (coroutine based API)
+++++++++++++++++++++++++++++

**Source code:** :source:`Lib/asyncio/streams.py`

Stream functions
================

.. note::

   The top-level functions in this module are meant as convenience wrappers
   only; there's really nothing special there, and if they don't do
   exactly what you want, feel free to copy their code.


.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, \*\*kwds)

   A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader,
   writer) pair.

   The reader returned is a :class:`StreamReader` instance; the writer is
   a :class:`StreamWriter` instance.

   The arguments are all the usual arguments to
   :meth:`AbstractEventLoop.create_connection` except *protocol_factory*; most
   common are positional *host* and *port*, with various optional keyword
   arguments following.

   Additional optional keyword arguments are *loop* (to set the event loop
   instance to use) and *limit* (to set the buffer limit passed to the
   :class:`StreamReader`).

   This function is a :ref:`coroutine <coroutine>`.

.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, \*\*kwds)

   Start a socket server, with a callback for each client connected. The return
   value is the same as :meth:`~AbstractEventLoop.create_server()`.

   The *client_connected_cb* parameter is called with two parameters:
   *client_reader*, *client_writer*.  *client_reader* is a
   :class:`StreamReader` object, while *client_writer* is a
   :class:`StreamWriter` object.  The *client_connected_cb* parameter can
   either be a plain callback function or a :ref:`coroutine function
   <coroutine>`; if it is a coroutine function, it will be automatically
   converted into a :class:`Task`.

   The rest of the arguments are all the usual arguments to
   :meth:`~AbstractEventLoop.create_server()` except *protocol_factory*; most
   common are positional *host* and *port*, with various optional keyword
   arguments following.

   Additional optional keyword arguments are *loop* (to set the event loop
   instance to use) and *limit* (to set the buffer limit passed to the
   :class:`StreamReader`).

   This function is a :ref:`coroutine <coroutine>`.

.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, \*\*kwds)

   A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning
   a (reader, writer) pair.

   See :func:`open_connection` for information about return value and other
   details.

   This function is a :ref:`coroutine <coroutine>`.

   Availability: UNIX.

.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, \*\*kwds)

   Start a UNIX Domain Socket server, with a callback for each client connected.

   See :func:`start_server` for information about return value and other
   details.

   This function is a :ref:`coroutine <coroutine>`.

   Availability: UNIX.

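As a rough illustration of how :func:`start_server` and
:func:`open_connection` fit together, the sketch below runs a server and a
client in the same event loop. It is not taken from the module itself: the
``handle_upper`` and ``round_trip`` names are hypothetical, port ``0`` is used
so that the operating system picks a free port, and the ``async``/``await``
syntax (Python 3.5 or later) is assumed instead of ``yield from``.

```python
import asyncio

async def handle_upper(reader, writer):
    # Hypothetical callback: reply with the upper-cased line, then close.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()

async def round_trip(payload):
    # Port 0 lets the OS choose a free port (an assumption of this sketch).
    server = await asyncio.start_server(handle_upper, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    # open_connection() returns the (reader, writer) pair described above.
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(payload)
    reply = await reader.readline()
    writer.close()

    server.close()
    await server.wait_closed()
    return reply

loop = asyncio.new_event_loop()
try:
    reply = loop.run_until_complete(round_trip(b'hello\n'))
finally:
    loop.close()
print(reply)
```

Because ``handle_upper`` is a coroutine function, the server wraps each call
in a task, so several clients could be served concurrently.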

StreamReader
============

.. class:: StreamReader(limit=None, loop=None)

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. method:: exception()

      Get the exception.

   .. method:: feed_eof()

      Acknowledge the EOF.

   .. method:: feed_data(data)

      Feed *data* bytes into the internal buffer.  Any operations waiting
      for the data will be resumed.

   .. method:: set_exception(exc)

      Set the exception.

   .. method:: set_transport(transport)

      Set the transport.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
      read until EOF and return all read bytes.

      If the EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

      This method is a :ref:`coroutine <coroutine>`.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes ending with ``\n``.

      If EOF is received, and ``\n`` was not found, the method will
      return the partially read bytes.

      If the EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

      This method is a :ref:`coroutine <coroutine>`.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of
      the stream is reached before *n* bytes can be read; the
      :attr:`IncompleteReadError.partial` attribute of the exception contains
      the partially read bytes.

      This method is a :ref:`coroutine <coroutine>`.

   .. coroutinemethod:: readuntil(separator=b'\\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed). Returned data will include the
      separator at the end.

      The configured stream limit is used to check the result. The limit sets
      the maximum length of data that can be returned, not counting the
      separator.

      If an EOF occurs and the complete separator is still not found,
      an :exc:`IncompleteReadError` exception will be
      raised, and the internal buffer will be reset.  The
      :attr:`IncompleteReadError.partial` attribute may contain the
      separator partially.

      If the data cannot be read because the limit was exceeded, a
      :exc:`LimitOverrunError` exception will be raised, and the data
      will be left in the internal buffer, so it can be read again.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof` was called.

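The read methods can be tried without any network connection by feeding a
:class:`StreamReader` by hand. The following is only a sketch: in real use a
transport calls :meth:`~StreamReader.feed_data` and
:meth:`~StreamReader.feed_eof`, the ``demo_reader`` name is hypothetical, and
``async``/``await`` syntax (Python 3.5 or later) is assumed.

```python
import asyncio

async def demo_reader():
    # Create the reader inside a coroutine so it binds to the running loop.
    reader = asyncio.StreamReader()
    reader.feed_data(b'first line\nsecond;third')
    reader.feed_eof()

    line = await reader.readline()        # up to and including b'\n'
    chunk = await reader.readuntil(b';')  # up to and including b';'
    rest = await reader.read()            # n=-1: everything until EOF
    at_end = reader.at_eof()              # buffer empty and EOF was fed
    tail = await reader.read(10)          # nothing left to return
    return line, chunk, rest, at_end, tail

loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(demo_reader())
finally:
    loop.close()
print(result)
```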

StreamWriter
============

.. class:: StreamWriter(transport, protocol, reader, loop)

   Wraps a Transport.

   This exposes :meth:`write`, :meth:`writelines`, :meth:`can_write_eof()`,
   :meth:`write_eof`, :meth:`get_extra_info` and :meth:`close`.  It adds
   :meth:`drain` which returns an optional :class:`Future` on which you can
   wait for flow control.  It also adds a transport attribute which references
   the :class:`Transport` directly.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: transport

      Transport.

   .. method:: can_write_eof()

      Return :const:`True` if the transport supports :meth:`write_eof`,
      :const:`False` if not. See :meth:`WriteTransport.can_write_eof`.

   .. method:: close()

      Close the transport: see :meth:`BaseTransport.close`.

   .. coroutinemethod:: drain()

      Give the write buffer of the underlying transport a chance to be flushed.

      The intended use is to write::

          w.write(data)
          yield from w.drain()

      When the size of the transport buffer reaches the high-water limit (the
      protocol is paused), block until the size of the buffer is drained down
      to the low-water limit and the protocol is resumed. When there is nothing
      to wait for, the yield-from continues immediately.

      Yielding from :meth:`drain` gives the opportunity for the loop to
      schedule the write operation and flush the buffer. It should especially
      be used when a possibly large amount of data is written to the transport,
      and the coroutine does not yield-from between calls to :meth:`write`.

      This method is a :ref:`coroutine <coroutine>`.

   .. method:: get_extra_info(name, default=None)

      Return optional transport information: see
      :meth:`BaseTransport.get_extra_info`.

   .. method:: write(data)

      Write some *data* bytes to the transport: see
      :meth:`WriteTransport.write`.

   .. method:: writelines(data)

      Write a list (or any iterable) of data bytes to the transport:
      see :meth:`WriteTransport.writelines`.

   .. method:: write_eof()

      Close the write end of the transport after flushing buffered data:
      see :meth:`WriteTransport.write_eof`.

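The write-then-drain pattern and the half-close performed by
:meth:`~StreamWriter.write_eof` might be combined as in the sketch below. The
``count_bytes`` and ``send_chunks`` names, the chunk sizes and the use of
port ``0`` on the loopback interface are assumptions made for illustration;
``async``/``await`` syntax (Python 3.5 or later) is used in place of
``yield from``.

```python
import asyncio

async def count_bytes(reader, writer):
    # Hypothetical callback: read until the peer half-closes, report the total.
    data = await reader.read()            # n=-1: read until EOF
    writer.write(str(len(data)).encode() + b'\n')
    await writer.drain()
    writer.close()

async def send_chunks(chunks):
    server = await asyncio.start_server(count_bytes, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)

    for chunk in chunks:
        writer.write(chunk)
        # Continues immediately unless the transport buffer is above the
        # high-water limit, in which case it waits for the buffer to drain.
        await writer.drain()

    if writer.can_write_eof():
        writer.write_eof()                # tell the server no more data follows

    total = int(await reader.readline())
    writer.close()
    server.close()
    await server.wait_closed()
    return total

loop = asyncio.new_event_loop()
try:
    total = loop.run_until_complete(send_chunks([b'x' * 1000] * 50))
finally:
    loop.close()
print(total)
```

Awaiting :meth:`~StreamWriter.drain` after every write keeps the amount of
buffered data bounded when the peer reads slowly.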

StreamReaderProtocol
====================

.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)

   Trivial helper class to adapt between :class:`Protocol` and
   :class:`StreamReader`.  Subclass of :class:`Protocol`.

   *stream_reader* is a :class:`StreamReader` instance, *client_connected_cb*
   is an optional function called with (stream_reader, stream_writer) when a
   connection is made, *loop* is the event loop instance to use.

   (This is a helper class instead of making :class:`StreamReader` itself a
   :class:`Protocol` subclass, because the :class:`StreamReader` has other
   potential uses, and to prevent the user of the :class:`StreamReader` from
   accidentally calling inappropriate methods of the protocol.)

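To see the adaptation at work, the protocol callbacks can be invoked by hand
and the result observed on the reader. This is a sketch only: normally a
transport calls ``data_received()`` and ``eof_received()``, and the
``adapt`` name is hypothetical. ``async``/``await`` syntax (Python 3.5 or
later) is assumed.

```python
import asyncio

async def adapt():
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)

    # A transport would normally make these calls; here they are made
    # directly to show that the protocol forwards them to the reader.
    protocol.data_received(b'ping\n')
    protocol.eof_received()

    line = await reader.readline()
    return line, reader.at_eof()

loop = asyncio.new_event_loop()
try:
    adapted = loop.run_until_complete(adapt())
finally:
    loop.close()
print(adapted)
```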

IncompleteReadError
===================

.. exception:: IncompleteReadError

   Incomplete read error, subclass of :exc:`EOFError`.

   .. attribute:: expected

      Total number of expected bytes (:class:`int`).

   .. attribute:: partial

      Bytes read before the end of the stream was reached (:class:`bytes`).

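A short sketch of how the two attributes might be inspected when
:meth:`StreamReader.readexactly` runs out of data. The reader is fed by hand
and the ``read_short_record`` name is hypothetical; ``async``/``await``
syntax (Python 3.5 or later) is assumed.

```python
import asyncio

async def read_short_record():
    reader = asyncio.StreamReader()
    reader.feed_data(b'abc')
    reader.feed_eof()
    try:
        await reader.readexactly(5)       # only 3 bytes arrive before EOF
    except asyncio.IncompleteReadError as exc:
        return exc.expected, exc.partial, isinstance(exc, EOFError)

loop = asyncio.new_event_loop()
try:
    short = loop.run_until_complete(read_short_record())
finally:
    loop.close()
print(short)
```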

LimitOverrunError
=================

.. exception:: LimitOverrunError

   Reached the buffer limit while looking for a separator.

   .. attribute:: consumed

      Total number of bytes to be consumed.

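Since the data stays in the buffer after a :exc:`LimitOverrunError`, a caller
can use :attr:`~LimitOverrunError.consumed` to skip the oversized record and
carry on. The sketch below shows one possible recovery policy, not a
prescribed one: the tiny ``limit=8``, the hand-fed reader and the
``skip_oversized_record`` name are assumptions, and it relies on the separator
already being in the buffer when the error is raised.

```python
import asyncio

async def skip_oversized_record():
    # limit=8 is deliberately tiny so that the 16-byte record overruns it.
    reader = asyncio.StreamReader(limit=8)
    reader.feed_data(b'0123456789ABCDEF\nok\n')
    reader.feed_eof()

    consumed, skipped = 0, b''
    try:
        await reader.readuntil(b'\n')
    except asyncio.LimitOverrunError as exc:
        consumed = exc.consumed
        # Drop the oversized record, then its one-byte separator.
        skipped = await reader.readexactly(consumed)
        await reader.readexactly(1)

    following = await reader.readuntil(b'\n')
    return consumed, skipped, following

loop = asyncio.new_event_loop()
try:
    recovered = loop.run_until_complete(skip_oversized_record())
finally:
    loop.close()
print(recovered)
```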

Stream examples
===============

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.open_connection` function::

    import asyncio

    @asyncio.coroutine
    def tcp_echo_client(message, loop):
        reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
                                                            loop=loop)

        print('Send: %r' % message)
        writer.write(message.encode())

        data = yield from reader.read(100)
        print('Received: %r' % data.decode())

        print('Close the socket')
        writer.close()

    message = 'Hello World!'
    loop = asyncio.get_event_loop()
    loop.run_until_complete(tcp_echo_client(message, loop))
    loop.close()

.. seealso::

   The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>`
   example uses the :meth:`AbstractEventLoop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :func:`asyncio.start_server` function::

    import asyncio

    @asyncio.coroutine
    def handle_echo(reader, writer):
        data = yield from reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print("Received %r from %r" % (message, addr))

        print("Send: %r" % message)
        writer.write(data)
        yield from writer.drain()

        print("Close the client socket")
        writer.close()

    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

.. seealso::

   The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>`
   example uses the :meth:`AbstractEventLoop.create_server` method.


Get HTTP headers
----------------

Simple example querying HTTP headers of the URL passed on the command line::

    import asyncio
    import urllib.parse
    import sys

    @asyncio.coroutine
    def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            connect = asyncio.open_connection(url.hostname, 443, ssl=True)
        else:
            connect = asyncio.open_connection(url.hostname, 80)
        reader, writer = yield from connect
        query = ('HEAD {path} HTTP/1.0\r\n'
                 'Host: {hostname}\r\n'
                 '\r\n').format(path=url.path or '/', hostname=url.hostname)
        writer.write(query.encode('latin-1'))
        while True:
            line = yield from reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print('HTTP header> %s' % line)

        # Ignore the body, close the socket
        writer.close()

    url = sys.argv[1]
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(print_http_headers(url))
    loop.run_until_complete(task)
    loop.close()

Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html

.. _asyncio-register-socket-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`open_connection` function::

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair

    @asyncio.coroutine
    def wait_for_data(loop):
        # Create a pair of connected sockets
        rsock, wsock = socketpair()

        # Register the open socket to wait for data
        reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = yield from reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_data(loop))
    loop.close()

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio-register-socket>` example uses a low-level protocol created by the
   :meth:`AbstractEventLoop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio-watch-read-event>` example uses the low-level
   :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
   socket.
