blob: 6177b4bb0f8b14fa53aa868e893b507a1274ea59 [file] [log] [blame]
Victor Stinner24f8ebf2014-01-23 11:05:01 +01001.. currentmodule:: asyncio
2
Victor Stinner9592edb2014-02-02 15:03:02 +01003.. _asyncio-streams:
Victor Stinner4b4f9eb2014-01-24 17:33:20 +01004
Yury Selivanovcba00532015-12-16 21:30:52 -05005+++++++++++++++++++++++++++++
6Streams (coroutine based API)
7+++++++++++++++++++++++++++++
Victor Stinner24f8ebf2014-01-23 11:05:01 +01008
9Stream functions
10================
11
Guido van Rossum19ff6972015-10-19 13:18:04 -070012.. note::
13
Ned Deilyf38c93f2016-02-16 13:27:04 +110014 The top-level functions in this module are meant as convenience wrappers
Guido van Rossum19ff6972015-10-19 13:18:04 -070015 only; there's really nothing special there, and if they don't do
16 exactly what you want, feel free to copy their code.
17
18
Victor Stinnerbdd574d2015-02-12 22:49:18 +010019.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, \*\*kwds)
Victor Stinner24f8ebf2014-01-23 11:05:01 +010020
Guido van Rossumf68afd82016-08-08 09:41:21 -070021 A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader,
Victor Stinner24f8ebf2014-01-23 11:05:01 +010022 writer) pair.
23
24 The reader returned is a :class:`StreamReader` instance; the writer is
25 a :class:`StreamWriter` instance.
26
27 The arguments are all the usual arguments to
Guido van Rossumf68afd82016-08-08 09:41:21 -070028 :meth:`AbstractEventLoop.create_connection` except *protocol_factory*; most
Victor Stinner24f8ebf2014-01-23 11:05:01 +010029 common are positional host and port, with various optional keyword arguments
30 following.
31
32 Additional optional keyword arguments are *loop* (to set the event loop
33 instance to use) and *limit* (to set the buffer limit passed to the
34 :class:`StreamReader`).
35
Yury Selivanov37f15bc2014-02-20 16:20:44 -050036 This function is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010037
Victor Stinnerbdd574d2015-02-12 22:49:18 +010038.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, \*\*kwds)
Victor Stinner24f8ebf2014-01-23 11:05:01 +010039
Victor Stinner8ebeb032014-07-11 23:47:40 +020040 Start a socket server, with a callback for each client connected. The return
Guido van Rossumf68afd82016-08-08 09:41:21 -070041 value is the same as :meth:`~AbstractEventLoop.create_server()`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010042
Victor Stinner8ebeb032014-07-11 23:47:40 +020043 The *client_connected_cb* parameter is called with two parameters:
Victor Stinner24f8ebf2014-01-23 11:05:01 +010044 *client_reader*, *client_writer*. *client_reader* is a
45 :class:`StreamReader` object, while *client_writer* is a
Victor Stinner8ebeb032014-07-11 23:47:40 +020046 :class:`StreamWriter` object. The *client_connected_cb* parameter can
47 either be a plain callback function or a :ref:`coroutine function
48 <coroutine>`; if it is a coroutine function, it will be automatically
Victor Stinner337e03f2014-08-11 01:11:13 +020049 converted into a :class:`Task`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010050
51 The rest of the arguments are all the usual arguments to
Guido van Rossumf68afd82016-08-08 09:41:21 -070052 :meth:`~AbstractEventLoop.create_server()` except *protocol_factory*; most
Victor Stinner8ebeb032014-07-11 23:47:40 +020053 common are positional *host* and *port*, with various optional keyword
54 arguments following.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010055
56 Additional optional keyword arguments are *loop* (to set the event loop
57 instance to use) and *limit* (to set the buffer limit passed to the
58 :class:`StreamReader`).
59
Yury Selivanov37f15bc2014-02-20 16:20:44 -050060 This function is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +010061
Victor Stinnerbdd574d2015-02-12 22:49:18 +010062.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, **kwds)
Yury Selivanovd3f8e302014-02-20 14:10:02 -050063
Guido van Rossumf68afd82016-08-08 09:41:21 -070064 A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning
Yury Selivanovd3f8e302014-02-20 14:10:02 -050065 a (reader, writer) pair.
66
67 See :func:`open_connection` for information about return value and other
68 details.
69
Yury Selivanov37f15bc2014-02-20 16:20:44 -050070 This function is a :ref:`coroutine <coroutine>`.
Yury Selivanovd3f8e302014-02-20 14:10:02 -050071
72 Availability: UNIX.
73
Victor Stinnerbdd574d2015-02-12 22:49:18 +010074.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, **kwds)
Yury Selivanovd3f8e302014-02-20 14:10:02 -050075
76 Start a UNIX Domain Socket server, with a callback for each client connected.
77
78 See :func:`start_server` for information about return value and other
79 details.
80
Yury Selivanov37f15bc2014-02-20 16:20:44 -050081 This function is a :ref:`coroutine <coroutine>`.
Yury Selivanovd3f8e302014-02-20 14:10:02 -050082
83 Availability: UNIX.
84
Victor Stinner24f8ebf2014-01-23 11:05:01 +010085
86StreamReader
87============
88
Victor Stinner08444382014-02-02 22:43:39 +010089.. class:: StreamReader(limit=None, loop=None)
Victor Stinner24f8ebf2014-01-23 11:05:01 +010090
Victor Stinner83704962015-02-25 14:24:15 +010091 This class is :ref:`not thread safe <asyncio-multithreading>`.
92
Victor Stinner24f8ebf2014-01-23 11:05:01 +010093 .. method:: exception()
94
95 Get the exception.
96
97 .. method:: feed_eof()
98
Yury Selivanovd3f8e302014-02-20 14:10:02 -050099 Acknowledge the EOF.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100100
101 .. method:: feed_data(data)
102
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500103 Feed *data* bytes in the internal buffer. Any operations waiting
104 for the data will be resumed.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100105
106 .. method:: set_exception(exc)
107
108 Set the exception.
109
110 .. method:: set_transport(transport)
111
112 Set the transport.
113
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100114 .. coroutinemethod:: read(n=-1)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100115
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500116 Read up to *n* bytes. If *n* is not provided, or set to ``-1``,
117 read until EOF and return all read bytes.
118
119 If the EOF was received and the internal buffer is empty,
120 return an empty ``bytes`` object.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100121
Yury Selivanov37f15bc2014-02-20 16:20:44 -0500122 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100123
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100124 .. coroutinemethod:: readline()
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100125
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500126 Read one line, where "line" is a sequence of bytes ending with ``\n``.
127
128 If EOF is received, and ``\n`` was not found, the method will
129 return the partial read bytes.
130
131 If the EOF was received and the internal buffer is empty,
132 return an empty ``bytes`` object.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100133
Yury Selivanov37f15bc2014-02-20 16:20:44 -0500134 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100135
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100136 .. coroutinemethod:: readexactly(n)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100137
Victor Stinnerb7f19ff2014-01-27 11:58:49 +0100138 Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of
139 the stream is reached before *n* can be read, the
140 :attr:`IncompleteReadError.partial` attribute of the exception contains
141 the partial read bytes.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100142
Yury Selivanov37f15bc2014-02-20 16:20:44 -0500143 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100144
Berker Peksage5b0bd12016-10-18 00:34:46 +0300145 .. coroutinemethod:: readuntil(separator=b'\\n')
Yury Selivanov950204d2016-05-16 16:23:00 -0400146
147 Read data from the stream until ``separator`` is found.
148
149 On success, the data and separator will be removed from the
150 internal buffer (consumed). Returned data will include the
151 separator at the end.
152
153 Configured stream limit is used to check result. Limit sets the
154 maximal length of data that can be returned, not counting the
155 separator.
156
157 If an EOF occurs and the complete separator is still not found,
158 an :exc:`IncompleteReadError` exception will be
159 raised, and the internal buffer will be reset. The
160 :attr:`IncompleteReadError.partial` attribute may contain the
161 separator partially.
162
163 If the data cannot be read because of over limit, a
164 :exc:`LimitOverrunError` exception will be raised, and the data
165 will be left in the internal buffer, so it can be read again.
166
167 .. versionadded:: 3.5.2
168
Yury Selivanovd3f8e302014-02-20 14:10:02 -0500169 .. method:: at_eof()
170
171 Return ``True`` if the buffer is empty and :meth:`feed_eof` was called.
172
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100173
174StreamWriter
175============
176
177.. class:: StreamWriter(transport, protocol, reader, loop)
178
179 Wraps a Transport.
180
181 This exposes :meth:`write`, :meth:`writelines`, :meth:`can_write_eof()`,
182 :meth:`write_eof`, :meth:`get_extra_info` and :meth:`close`. It adds
183 :meth:`drain` which returns an optional :class:`Future` on which you can
184 wait for flow control. It also adds a transport attribute which references
185 the :class:`Transport` directly.
186
Victor Stinner83704962015-02-25 14:24:15 +0100187 This class is :ref:`not thread safe <asyncio-multithreading>`.
188
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100189 .. attribute:: transport
190
191 Transport.
192
Victor Stinnerffbe3c62014-02-08 22:50:07 +0100193 .. method:: can_write_eof()
194
195 Return :const:`True` if the transport supports :meth:`write_eof`,
196 :const:`False` if not. See :meth:`WriteTransport.can_write_eof`.
197
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100198 .. method:: close()
199
200 Close the transport: see :meth:`BaseTransport.close`.
201
Victor Stinnerbdd574d2015-02-12 22:49:18 +0100202 .. coroutinemethod:: drain()
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100203
Victor Stinnere7182972014-11-28 17:45:41 +0100204 Let the write buffer of the underlying transport a chance to be flushed.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100205
Victor Stinnerd71dcbb2014-08-25 17:04:12 +0200206 The intended use is to write::
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100207
208 w.write(data)
209 yield from w.drain()
210
Victor Stinnere7182972014-11-28 17:45:41 +0100211 When the size of the transport buffer reaches the high-water limit (the
212 protocol is paused), block until the size of the buffer is drained down
213 to the low-water limit and the protocol is resumed. When there is nothing
214 to wait for, the yield-from continues immediately.
215
216 Yielding from :meth:`drain` gives the opportunity for the loop to
217 schedule the write operation and flush the buffer. It should especially
218 be used when a possibly large amount of data is written to the transport,
219 and the coroutine does not yield-from between calls to :meth:`write`.
Victor Stinnerd71dcbb2014-08-25 17:04:12 +0200220
221 This method is a :ref:`coroutine <coroutine>`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100222
223 .. method:: get_extra_info(name, default=None)
224
225 Return optional transport information: see
226 :meth:`BaseTransport.get_extra_info`.
227
228 .. method:: write(data)
229
230 Write some *data* bytes to the transport: see
231 :meth:`WriteTransport.write`.
232
233 .. method:: writelines(data)
234
235 Write a list (or any iterable) of data bytes to the transport:
236 see :meth:`WriteTransport.writelines`.
237
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100238 .. method:: write_eof()
239
240 Close the write end of the transport after flushing buffered data:
241 see :meth:`WriteTransport.write_eof`.
242
243
244StreamReaderProtocol
245====================
246
247.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)
248
249 Trivial helper class to adapt between :class:`Protocol` and
Jesus Ceaded4c492016-04-19 21:50:19 +0200250 :class:`StreamReader`. Subclass of :class:`Protocol`.
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100251
252 *stream_reader* is a :class:`StreamReader` instance, *client_connected_cb*
253 is an optional function called with (stream_reader, stream_writer) when a
254 connection is made, *loop* is the event loop instance to use.
255
256 (This is a helper class instead of making :class:`StreamReader` itself a
257 :class:`Protocol` subclass, because the :class:`StreamReader` has other
R David Murray87d00662015-09-27 12:36:19 -0400258 potential uses, and to prevent the user of the :class:`StreamReader` from
259 accidentally calling inappropriate methods of the protocol.)
Victor Stinner24f8ebf2014-01-23 11:05:01 +0100260
Victor Stinnerc520edc2014-01-23 11:25:48 +0100261
Victor Stinnerb7f19ff2014-01-27 11:58:49 +0100262IncompleteReadError
263===================
264
265.. exception:: IncompleteReadError
266
Victor Stinner32970b82014-01-27 12:18:49 +0100267 Incomplete read error, subclass of :exc:`EOFError`.
Victor Stinnerb7f19ff2014-01-27 11:58:49 +0100268
269 .. attribute:: expected
270
271 Total number of expected bytes (:class:`int`).
272
273 .. attribute:: partial
274
275 Read bytes string before the end of stream was reached (:class:`bytes`).
276
277
Yury Selivanov950204d2016-05-16 16:23:00 -0400278LimitOverrunError
279=================
280
281.. exception:: LimitOverrunError
282
283 Reached the buffer limit while looking for a separator.
284
285 .. attribute:: consumed
286
287 Total number of to be consumed bytes.
288
289
Victor Stinner5121a9b2014-10-11 15:52:14 +0200290Stream examples
291===============
292
Victor Stinnered051592014-10-12 20:18:16 +0200293.. _asyncio-tcp-echo-client-streams:
294
295TCP echo client using streams
296-----------------------------
297
298TCP echo client using the :func:`asyncio.open_connection` function::
299
300 import asyncio
301
Victor Stinnered8e3a92014-10-13 00:55:50 +0200302 @asyncio.coroutine
Victor Stinnered051592014-10-12 20:18:16 +0200303 def tcp_echo_client(message, loop):
304 reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
305 loop=loop)
306
307 print('Send: %r' % message)
308 writer.write(message.encode())
309
310 data = yield from reader.read(100)
311 print('Received: %r' % data.decode())
312
313 print('Close the socket')
314 writer.close()
315
316 message = 'Hello World!'
317 loop = asyncio.get_event_loop()
318 loop.run_until_complete(tcp_echo_client(message, loop))
319 loop.close()
320
321.. seealso::
322
323 The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>`
Guido van Rossumf68afd82016-08-08 09:41:21 -0700324 example uses the :meth:`AbstractEventLoop.create_connection` method.
Victor Stinnered051592014-10-12 20:18:16 +0200325
326
327.. _asyncio-tcp-echo-server-streams:
328
329TCP echo server using streams
330-----------------------------
331
332TCP echo server using the :func:`asyncio.start_server` function::
333
334 import asyncio
335
336 @asyncio.coroutine
337 def handle_echo(reader, writer):
338 data = yield from reader.read(100)
339 message = data.decode()
340 addr = writer.get_extra_info('peername')
341 print("Received %r from %r" % (message, addr))
342
343 print("Send: %r" % message)
344 writer.write(data)
345 yield from writer.drain()
346
347 print("Close the client socket")
348 writer.close()
349
350 loop = asyncio.get_event_loop()
351 coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
352 server = loop.run_until_complete(coro)
353
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +0300354 # Serve requests until Ctrl+C is pressed
Victor Stinnered051592014-10-12 20:18:16 +0200355 print('Serving on {}'.format(server.sockets[0].getsockname()))
356 try:
357 loop.run_forever()
358 except KeyboardInterrupt:
359 pass
360
361 # Close the server
362 server.close()
363 loop.run_until_complete(server.wait_closed())
364 loop.close()
365
366.. seealso::
367
368 The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>`
Guido van Rossumf68afd82016-08-08 09:41:21 -0700369 example uses the :meth:`AbstractEventLoop.create_server` method.
Victor Stinnered051592014-10-12 20:18:16 +0200370
371
Victor Stinner5121a9b2014-10-11 15:52:14 +0200372Get HTTP headers
373----------------
Victor Stinnerc520edc2014-01-23 11:25:48 +0100374
375Simple example querying HTTP headers of the URL passed on the command line::
376
377 import asyncio
378 import urllib.parse
379 import sys
380
381 @asyncio.coroutine
382 def print_http_headers(url):
383 url = urllib.parse.urlsplit(url)
Victor Stinner5121a9b2014-10-11 15:52:14 +0200384 if url.scheme == 'https':
385 connect = asyncio.open_connection(url.hostname, 443, ssl=True)
386 else:
387 connect = asyncio.open_connection(url.hostname, 80)
388 reader, writer = yield from connect
389 query = ('HEAD {path} HTTP/1.0\r\n'
390 'Host: {hostname}\r\n'
391 '\r\n').format(path=url.path or '/', hostname=url.hostname)
Victor Stinnerc520edc2014-01-23 11:25:48 +0100392 writer.write(query.encode('latin-1'))
393 while True:
394 line = yield from reader.readline()
395 if not line:
396 break
397 line = line.decode('latin1').rstrip()
398 if line:
399 print('HTTP header> %s' % line)
400
Victor Stinner5121a9b2014-10-11 15:52:14 +0200401 # Ignore the body, close the socket
402 writer.close()
403
Victor Stinnerc520edc2014-01-23 11:25:48 +0100404 url = sys.argv[1]
405 loop = asyncio.get_event_loop()
Yury Selivanovd7e19bb2015-05-11 16:33:41 -0400406 task = asyncio.ensure_future(print_http_headers(url))
Victor Stinnerc520edc2014-01-23 11:25:48 +0100407 loop.run_until_complete(task)
Victor Stinnerf40c6632014-01-28 23:32:40 +0100408 loop.close()
Victor Stinnerc520edc2014-01-23 11:25:48 +0100409
410Usage::
411
412 python example.py http://example.com/path/page.html
413
Victor Stinner04e6df32014-10-11 16:16:27 +0200414or with HTTPS::
415
416 python example.py https://example.com/path/page.html
417
418.. _asyncio-register-socket-streams:
419
420Register an open socket to wait for data using streams
421------------------------------------------------------
422
423Coroutine waiting until a socket receives data using the
424:func:`open_connection` function::
425
426 import asyncio
Victor Stinnerccd8e342014-10-11 16:30:02 +0200427 try:
428 from socket import socketpair
429 except ImportError:
430 from asyncio.windows_utils import socketpair
Victor Stinner04e6df32014-10-11 16:16:27 +0200431
Victor Stinnered8e3a92014-10-13 00:55:50 +0200432 @asyncio.coroutine
Victor Stinner04e6df32014-10-11 16:16:27 +0200433 def wait_for_data(loop):
434 # Create a pair of connected sockets
Victor Stinnerccd8e342014-10-11 16:30:02 +0200435 rsock, wsock = socketpair()
Victor Stinner04e6df32014-10-11 16:16:27 +0200436
437 # Register the open socket to wait for data
438 reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)
439
440 # Simulate the reception of data from the network
441 loop.call_soon(wsock.send, 'abc'.encode())
442
443 # Wait for data
444 data = yield from reader.read(100)
445
446 # Got data, we are done: close the socket
447 print("Received:", data.decode())
448 writer.close()
449
450 # Close the second socket
451 wsock.close()
452
453 loop = asyncio.get_event_loop()
454 loop.run_until_complete(wait_for_data(loop))
455 loop.close()
456
457.. seealso::
458
459 The :ref:`register an open socket to wait for data using a protocol
460 <asyncio-register-socket>` example uses a low-level protocol created by the
Guido van Rossumf68afd82016-08-08 09:41:21 -0700461 :meth:`AbstractEventLoop.create_connection` method.
Victor Stinner04e6df32014-10-11 16:16:27 +0200462
463 The :ref:`watch a file descriptor for read events
464 <asyncio-watch-read-event>` example uses the low-level
Guido van Rossumf68afd82016-08-08 09:41:21 -0700465 :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
Victor Stinner04e6df32014-10-11 16:16:27 +0200466 socket.
467