blob: 6d5cbbc5bd533d7ab8ae01a9bf9e65997846a9e1 [file] [log] [blame]
Victor Stinner24f8ebf2014-01-23 11:05:01 +01001.. currentmodule:: asyncio
2
Victor Stinner9592edb2014-02-02 15:03:02 +01003.. _asyncio-streams:
Victor Stinner4b4f9eb2014-01-24 17:33:20 +01004
Yury Selivanovcba00532015-12-16 21:30:52 -05005+++++++++++++++++++++++++++++
6Streams (coroutine based API)
7+++++++++++++++++++++++++++++
Victor Stinner24f8ebf2014-01-23 11:05:01 +01008
lf627d2c82017-07-25 17:03:51 -06009**Source code:** :source:`Lib/asyncio/streams.py`
10
Victor Stinner24f8ebf2014-01-23 11:05:01 +010011Stream functions
12================
13
Guido van Rossum19ff6972015-10-19 13:18:04 -070014.. note::
15
Ned Deilyf38c93f2016-02-16 13:27:04 +110016 The top-level functions in this module are meant as convenience wrappers
Guido van Rossum19ff6972015-10-19 13:18:04 -070017 only; there's really nothing special there, and if they don't do
18 exactly what you want, feel free to copy their code.
19
20
Victor Stinnerbdd574d2015-02-12 22:49:18 +010021.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, \*\*kwds)
Victor Stinner24f8ebf2014-01-23 11:05:01 +010022
Guido van Rossumf68afd82016-08-08 09:41:21 -070023 A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader,
Victor Stinner24f8ebf2014-01-23 11:05:01 +010024 writer) pair.
25
26 The reader returned is a :class:`StreamReader` instance; the writer is
27 a :class:`StreamWriter` instance.
28
29 The arguments are all the usual arguments to
Guido van Rossumf68afd82016-08-08 09:41:21 -070030 :meth:`AbstractEventLoop.create_connection` except *protocol_factory*; most
Victor Stinner24f8ebf2014-01-23 11:05:01 +010031 common are positional host and port, with various optional keyword arguments
32 following.
33
34 Additional optional keyword arguments are *loop* (to set the event loop
35 instance to use) and *limit* (to set the buffer limit passed to the
36 :class:`StreamReader`).
37
Yury Selivanov37f15bc2014-02-20 16:20:44 -050038 This function is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010039
Victor Stinnerbdd574d2015-02-12 22:49:18 +010040.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, \*\*kwds)
Victor Stinner24f8ebf2014-01-23 11:05:01 +010041
Victor Stinner8ebeb032014-07-11 23:47:40 +020042 Start a socket server, with a callback for each client connected. The return
Guido van Rossumf68afd82016-08-08 09:41:21 -070043 value is the same as :meth:`~AbstractEventLoop.create_server()`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010044
Victor Stinner8ebeb032014-07-11 23:47:40 +020045 The *client_connected_cb* parameter is called with two parameters:
Victor Stinner24f8ebf2014-01-23 11:05:01 +010046 *client_reader*, *client_writer*. *client_reader* is a
47 :class:`StreamReader` object, while *client_writer* is a
Victor Stinner8ebeb032014-07-11 23:47:40 +020048 :class:`StreamWriter` object. The *client_connected_cb* parameter can
49 either be a plain callback function or a :ref:`coroutine function
50 <coroutine>`; if it is a coroutine function, it will be automatically
Victor Stinner337e03f2014-08-11 01:11:13 +020051 converted into a :class:`Task`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010052
53 The rest of the arguments are all the usual arguments to
Guido van Rossumf68afd82016-08-08 09:41:21 -070054 :meth:`~AbstractEventLoop.create_server()` except *protocol_factory*; most
Victor Stinner8ebeb032014-07-11 23:47:40 +020055 common are positional *host* and *port*, with various optional keyword
56 arguments following.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010057
58 Additional optional keyword arguments are *loop* (to set the event loop
59 instance to use) and *limit* (to set the buffer limit passed to the
60 :class:`StreamReader`).
61
Yury Selivanov37f15bc2014-02-20 16:20:44 -050062 This function is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010063
Victor Stinnerbdd574d2015-02-12 22:49:18 +010064.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, **kwds)
Yury Selivanovd3f8e302014-02-20 14:10:02 -050065
Guido van Rossumf68afd82016-08-08 09:41:21 -070066 A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning
Yury Selivanovd3f8e302014-02-20 14:10:02 -050067 a (reader, writer) pair.
68
69 See :func:`open_connection` for information about return value and other
70 details.
71
Yury Selivanov37f15bc2014-02-20 16:20:44 -050072 This function is a :ref:`coroutine <coroutine>`.
Yury Selivanovd3f8e302014-02-20 14:10:02 -050073
74 Availability: UNIX.
75
Victor Stinnerbdd574d2015-02-12 22:49:18 +010076.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, **kwds)
Yury Selivanovd3f8e302014-02-20 14:10:02 -050077
78 Start a UNIX Domain Socket server, with a callback for each client connected.
79
80 See :func:`start_server` for information about return value and other
81 details.
82
Yury Selivanov37f15bc2014-02-20 16:20:44 -050083 This function is a :ref:`coroutine <coroutine>`.
Yury Selivanovd3f8e302014-02-20 14:10:02 -050084
85 Availability: UNIX.
86
Victor Stinner24f8ebf2014-01-23 11:05:01 +010087
88StreamReader
89============
90
Victor Stinner08444382014-02-02 22:43:39 +010091.. class:: StreamReader(limit=None, loop=None)
Victor Stinner24f8ebf2014-01-23 11:05:01 +010092
Victor Stinner83704962015-02-25 14:24:15 +010093 This class is :ref:`not thread safe <asyncio-multithreading>`.
94
Victor Stinner24f8ebf2014-01-23 11:05:01 +010095 .. method:: exception()
96
97 Get the exception.
98
99 .. method:: feed_eof()
100
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500101 Acknowledge the EOF.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100102
103 .. method:: feed_data(data)
104
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500105 Feed *data* bytes in the internal buffer. Any operations waiting
106 for the data will be resumed.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100107
108 .. method:: set_exception(exc)
109
110 Set the exception.
111
112 .. method:: set_transport(transport)
113
114 Set the transport.
115
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100116 .. coroutinemethod:: read(n=-1)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100117
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500118 Read up to *n* bytes. If *n* is not provided, or set to ``-1``,
119 read until EOF and return all read bytes.
120
121 If the EOF was received and the internal buffer is empty,
122 return an empty ``bytes`` object.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100123
Yury Selivanov37f15bc2014-02-20 16:20:44 -0500124 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100125
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100126 .. coroutinemethod:: readline()
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100127
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500128 Read one line, where "line" is a sequence of bytes ending with ``\n``.
129
130 If EOF is received, and ``\n`` was not found, the method will
131 return the partial read bytes.
132
133 If the EOF was received and the internal buffer is empty,
134 return an empty ``bytes`` object.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100135
Yury Selivanov37f15bc2014-02-20 16:20:44 -0500136 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100137
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100138 .. coroutinemethod:: readexactly(n)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100139
Victor Stinnerb7f19ff2014-01-27 11:58:49 +0100140 Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of
141 the stream is reached before *n* can be read, the
142 :attr:`IncompleteReadError.partial` attribute of the exception contains
143 the partial read bytes.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100144
Yury Selivanov37f15bc2014-02-20 16:20:44 -0500145 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100146
Berker Peksage5b0bd12016-10-18 00:34:46 +0300147 .. coroutinemethod:: readuntil(separator=b'\\n')
Yury Selivanov950204d2016-05-16 16:23:00 -0400148
149 Read data from the stream until ``separator`` is found.
150
151 On success, the data and separator will be removed from the
152 internal buffer (consumed). Returned data will include the
153 separator at the end.
154
155 Configured stream limit is used to check result. Limit sets the
156 maximal length of data that can be returned, not counting the
157 separator.
158
159 If an EOF occurs and the complete separator is still not found,
160 an :exc:`IncompleteReadError` exception will be
161 raised, and the internal buffer will be reset. The
162 :attr:`IncompleteReadError.partial` attribute may contain the
163 separator partially.
164
165 If the data cannot be read because of over limit, a
166 :exc:`LimitOverrunError` exception will be raised, and the data
167 will be left in the internal buffer, so it can be read again.
168
169 .. versionadded:: 3.5.2
170
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500171 .. method:: at_eof()
172
173 Return ``True`` if the buffer is empty and :meth:`feed_eof` was called.
174
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100175
176StreamWriter
177============
178
179.. class:: StreamWriter(transport, protocol, reader, loop)
180
181 Wraps a Transport.
182
183 This exposes :meth:`write`, :meth:`writelines`, :meth:`can_write_eof()`,
184 :meth:`write_eof`, :meth:`get_extra_info` and :meth:`close`. It adds
185 :meth:`drain` which returns an optional :class:`Future` on which you can
186 wait for flow control. It also adds a transport attribute which references
187 the :class:`Transport` directly.
188
Victor Stinner83704962015-02-25 14:24:15 +0100189 This class is :ref:`not thread safe <asyncio-multithreading>`.
190
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100191 .. attribute:: transport
192
193 Transport.
194
Victor Stinnerffbe3c62014-02-08 22:50:07 +0100195 .. method:: can_write_eof()
196
197 Return :const:`True` if the transport supports :meth:`write_eof`,
198 :const:`False` if not. See :meth:`WriteTransport.can_write_eof`.
199
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100200 .. method:: close()
201
202 Close the transport: see :meth:`BaseTransport.close`.
203
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100204 .. coroutinemethod:: drain()
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100205
Victor Stinnere7182972014-11-28 17:45:41 +0100206 Let the write buffer of the underlying transport a chance to be flushed.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100207
Victor Stinnerd71dcbb2014-08-25 17:04:12 +0200208 The intended use is to write::
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100209
210 w.write(data)
Andrew Svetlov88743422017-12-11 17:35:49 +0200211 await w.drain()
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100212
Victor Stinnere7182972014-11-28 17:45:41 +0100213 When the size of the transport buffer reaches the high-water limit (the
214 protocol is paused), block until the size of the buffer is drained down
215 to the low-water limit and the protocol is resumed. When there is nothing
216 to wait for, the yield-from continues immediately.
217
218 Yielding from :meth:`drain` gives the opportunity for the loop to
219 schedule the write operation and flush the buffer. It should especially
220 be used when a possibly large amount of data is written to the transport,
221 and the coroutine does not yield-from between calls to :meth:`write`.
Victor Stinnerd71dcbb2014-08-25 17:04:12 +0200222
223 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100224
225 .. method:: get_extra_info(name, default=None)
226
227 Return optional transport information: see
228 :meth:`BaseTransport.get_extra_info`.
229
230 .. method:: write(data)
231
232 Write some *data* bytes to the transport: see
233 :meth:`WriteTransport.write`.
234
235 .. method:: writelines(data)
236
237 Write a list (or any iterable) of data bytes to the transport:
238 see :meth:`WriteTransport.writelines`.
239
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100240 .. method:: write_eof()
241
242 Close the write end of the transport after flushing buffered data:
243 see :meth:`WriteTransport.write_eof`.
244
245
246StreamReaderProtocol
247====================
248
249.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)
250
251 Trivial helper class to adapt between :class:`Protocol` and
Jesus Ceaded4c492016-04-19 21:50:19 +0200252 :class:`StreamReader`. Subclass of :class:`Protocol`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100253
254 *stream_reader* is a :class:`StreamReader` instance, *client_connected_cb*
255 is an optional function called with (stream_reader, stream_writer) when a
256 connection is made, *loop* is the event loop instance to use.
257
258 (This is a helper class instead of making :class:`StreamReader` itself a
259 :class:`Protocol` subclass, because the :class:`StreamReader` has other
R David Murray87d00662015-09-27 12:36:19 -0400260 potential uses, and to prevent the user of the :class:`StreamReader` from
261 accidentally calling inappropriate methods of the protocol.)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100262
Victor Stinnerc520edc2014-01-23 11:25:48 +0100263
Victor Stinnerb7f19ff2014-01-27 11:58:49 +0100264IncompleteReadError
265===================
266
267.. exception:: IncompleteReadError
268
Victor Stinner32970b82014-01-27 12:18:49 +0100269 Incomplete read error, subclass of :exc:`EOFError`.
Victor Stinnerb7f19ff2014-01-27 11:58:49 +0100270
271 .. attribute:: expected
272
273 Total number of expected bytes (:class:`int`).
274
275 .. attribute:: partial
276
277 Read bytes string before the end of stream was reached (:class:`bytes`).
278
279
Yury Selivanov950204d2016-05-16 16:23:00 -0400280LimitOverrunError
281=================
282
283.. exception:: LimitOverrunError
284
285 Reached the buffer limit while looking for a separator.
286
287 .. attribute:: consumed
288
289 Total number of to be consumed bytes.
290
291
Victor Stinner5121a9b2014-10-11 15:52:14 +0200292Stream examples
293===============
294
Victor Stinnered051592014-10-12 20:18:16 +0200295.. _asyncio-tcp-echo-client-streams:
296
297TCP echo client using streams
298-----------------------------
299
300TCP echo client using the :func:`asyncio.open_connection` function::
301
302 import asyncio
303
Andrew Svetlov88743422017-12-11 17:35:49 +0200304 async def tcp_echo_client(message, loop):
305 reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
306 loop=loop)
Victor Stinnered051592014-10-12 20:18:16 +0200307
308 print('Send: %r' % message)
309 writer.write(message.encode())
310
Andrew Svetlov88743422017-12-11 17:35:49 +0200311 data = await reader.read(100)
Victor Stinnered051592014-10-12 20:18:16 +0200312 print('Received: %r' % data.decode())
313
314 print('Close the socket')
315 writer.close()
316
317 message = 'Hello World!'
318 loop = asyncio.get_event_loop()
319 loop.run_until_complete(tcp_echo_client(message, loop))
320 loop.close()
321
322.. seealso::
323
324 The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>`
Guido van Rossumf68afd82016-08-08 09:41:21 -0700325 example uses the :meth:`AbstractEventLoop.create_connection` method.
Victor Stinnered051592014-10-12 20:18:16 +0200326
327
328.. _asyncio-tcp-echo-server-streams:
329
330TCP echo server using streams
331-----------------------------
332
333TCP echo server using the :func:`asyncio.start_server` function::
334
335 import asyncio
336
Andrew Svetlov88743422017-12-11 17:35:49 +0200337 async def handle_echo(reader, writer):
338 data = await reader.read(100)
Victor Stinnered051592014-10-12 20:18:16 +0200339 message = data.decode()
340 addr = writer.get_extra_info('peername')
341 print("Received %r from %r" % (message, addr))
342
343 print("Send: %r" % message)
344 writer.write(data)
Andrew Svetlov88743422017-12-11 17:35:49 +0200345 await writer.drain()
Victor Stinnered051592014-10-12 20:18:16 +0200346
347 print("Close the client socket")
348 writer.close()
349
350 loop = asyncio.get_event_loop()
351 coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
352 server = loop.run_until_complete(coro)
353
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +0300354 # Serve requests until Ctrl+C is pressed
Victor Stinnered051592014-10-12 20:18:16 +0200355 print('Serving on {}'.format(server.sockets[0].getsockname()))
356 try:
357 loop.run_forever()
358 except KeyboardInterrupt:
359 pass
360
361 # Close the server
362 server.close()
363 loop.run_until_complete(server.wait_closed())
364 loop.close()
365
366.. seealso::
367
368 The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>`
Guido van Rossumf68afd82016-08-08 09:41:21 -0700369 example uses the :meth:`AbstractEventLoop.create_server` method.
Victor Stinnered051592014-10-12 20:18:16 +0200370
371
Victor Stinner5121a9b2014-10-11 15:52:14 +0200372Get HTTP headers
373----------------
Victor Stinnerc520edc2014-01-23 11:25:48 +0100374
375Simple example querying HTTP headers of the URL passed on the command line::
376
377 import asyncio
378 import urllib.parse
379 import sys
380
381 @asyncio.coroutine
382 def print_http_headers(url):
383 url = urllib.parse.urlsplit(url)
Victor Stinner5121a9b2014-10-11 15:52:14 +0200384 if url.scheme == 'https':
385 connect = asyncio.open_connection(url.hostname, 443, ssl=True)
386 else:
387 connect = asyncio.open_connection(url.hostname, 80)
Andrew Svetlov88743422017-12-11 17:35:49 +0200388 reader, writer = await connect
Victor Stinner5121a9b2014-10-11 15:52:14 +0200389 query = ('HEAD {path} HTTP/1.0\r\n'
390 'Host: {hostname}\r\n'
391 '\r\n').format(path=url.path or '/', hostname=url.hostname)
Victor Stinnerc520edc2014-01-23 11:25:48 +0100392 writer.write(query.encode('latin-1'))
393 while True:
Andrew Svetlov88743422017-12-11 17:35:49 +0200394 line = await reader.readline()
Victor Stinnerc520edc2014-01-23 11:25:48 +0100395 if not line:
396 break
397 line = line.decode('latin1').rstrip()
398 if line:
399 print('HTTP header> %s' % line)
400
Victor Stinner5121a9b2014-10-11 15:52:14 +0200401 # Ignore the body, close the socket
402 writer.close()
403
Victor Stinnerc520edc2014-01-23 11:25:48 +0100404 url = sys.argv[1]
405 loop = asyncio.get_event_loop()
Yury Selivanovd7e19bb2015-05-11 16:33:41 -0400406 task = asyncio.ensure_future(print_http_headers(url))
Victor Stinnerc520edc2014-01-23 11:25:48 +0100407 loop.run_until_complete(task)
Victor Stinnerf40c6632014-01-28 23:32:40 +0100408 loop.close()
Victor Stinnerc520edc2014-01-23 11:25:48 +0100409
410Usage::
411
412 python example.py http://example.com/path/page.html
413
Victor Stinner04e6df32014-10-11 16:16:27 +0200414or with HTTPS::
415
416 python example.py https://example.com/path/page.html
417
418.. _asyncio-register-socket-streams:
419
420Register an open socket to wait for data using streams
421------------------------------------------------------
422
423Coroutine waiting until a socket receives data using the
424:func:`open_connection` function::
425
426 import asyncio
Victor Stinnerac577d72017-11-28 21:33:20 +0100427 from socket import socketpair
Victor Stinner04e6df32014-10-11 16:16:27 +0200428
Andrew Svetlov88743422017-12-11 17:35:49 +0200429 async def wait_for_data(loop):
Victor Stinner04e6df32014-10-11 16:16:27 +0200430 # Create a pair of connected sockets
Victor Stinnerccd8e342014-10-11 16:30:02 +0200431 rsock, wsock = socketpair()
Victor Stinner04e6df32014-10-11 16:16:27 +0200432
433 # Register the open socket to wait for data
Andrew Svetlov88743422017-12-11 17:35:49 +0200434 reader, writer = await asyncio.open_connection(sock=rsock, loop=loop)
Victor Stinner04e6df32014-10-11 16:16:27 +0200435
436 # Simulate the reception of data from the network
437 loop.call_soon(wsock.send, 'abc'.encode())
438
439 # Wait for data
Andrew Svetlov88743422017-12-11 17:35:49 +0200440 data = await reader.read(100)
Victor Stinner04e6df32014-10-11 16:16:27 +0200441
442 # Got data, we are done: close the socket
443 print("Received:", data.decode())
444 writer.close()
445
446 # Close the second socket
447 wsock.close()
448
449 loop = asyncio.get_event_loop()
450 loop.run_until_complete(wait_for_data(loop))
451 loop.close()
452
453.. seealso::
454
455 The :ref:`register an open socket to wait for data using a protocol
456 <asyncio-register-socket>` example uses a low-level protocol created by the
Guido van Rossumf68afd82016-08-08 09:41:21 -0700457 :meth:`AbstractEventLoop.create_connection` method.
Victor Stinner04e6df32014-10-11 16:16:27 +0200458
459 The :ref:`watch a file descriptor for read events
460 <asyncio-watch-read-event>` example uses the low-level
Guido van Rossumf68afd82016-08-08 09:41:21 -0700461 :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
Victor Stinner04e6df32014-10-11 16:16:27 +0200462 socket.
463