__all__ = (
    'Stream', 'StreamMode',
    'open_connection', 'start_server',
    'connect', 'connect_read_pipe', 'connect_write_pipe',
    'StreamServer')

import enum
import socket
import sys
import warnings
import weakref

if hasattr(socket, 'AF_UNIX'):
    __all__ += ('open_unix_connection', 'start_unix_server',
                'connect_unix',
                'UnixStreamServer')

from . import coroutines
from . import events
from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
from . import tasks


_DEFAULT_LIMIT = 2 ** 16  # 64 KiB

class StreamMode(enum.Flag):
    READ = enum.auto()
    WRITE = enum.auto()
    READWRITE = READ | WRITE


def _ensure_can_read(mode):
    if not mode & StreamMode.READ:
        raise RuntimeError("The stream is write-only")


def _ensure_can_write(mode):
    if not mode & StreamMode.WRITE:
        raise RuntimeError("The stream is read-only")


class _ContextManagerHelper:
    __slots__ = ('_awaitable', '_result')

    def __init__(self, awaitable):
        self._awaitable = awaitable
        self._result = None

    def __await__(self):
        return self._awaitable.__await__()

    async def __aenter__(self):
        ret = await self._awaitable
        result = await ret.__aenter__()
        self._result = result
        return result

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self._result.__aexit__(exc_type, exc_val, exc_tb)

def connect(host=None, port=None, *,
            limit=_DEFAULT_LIMIT,
            ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
    """Connect to a TCP socket at *host*:*port* to send and receive data.

    *limit* determines the buffer size limit used by the returned `Stream`
    instance.  By default the *limit* is set to 64 KiB.

    The rest of the arguments are passed directly to `loop.create_connection()`.
    """
    # Design note:
    # Use an explicit non-async function instead of a decorator
    # approach so that arguments that don't match the function
    # signature fail fast and explicitly.
    return _ContextManagerHelper(_connect(host, port, limit,
                                          ssl, family, proto,
                                          flags, sock, local_addr,
                                          server_hostname,
                                          ssl_handshake_timeout,
                                          happy_eyeballs_delay,
                                          interleave))
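
# Example (illustrative sketch, not part of the module): connect() returns
# an object that can either be awaited or used as an async context manager.
# The address below is an assumption; the read side of the returned Stream
# is assumed to mirror the StreamReader API.
#
#     async def _example_connect():
#         async with connect('127.0.0.1', 8080) as stream:
#             await stream.write(b'ping\n')
#             return await stream.readline()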


async def _connect(host, port,
                   limit,
                   ssl, family, proto,
                   flags, sock, local_addr,
                   server_hostname,
                   ssl_handshake_timeout,
                   happy_eyeballs_delay, interleave):
    loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.READWRITE,
                    limit=limit,
                    loop=loop,
                    _asyncio_internal=True)
    await loop.create_connection(
        lambda: _StreamProtocol(stream, loop=loop,
                                _asyncio_internal=True),
        host, port,
        ssl=ssl, family=family, proto=proto,
        flags=flags, sock=sock, local_addr=local_addr,
        server_hostname=server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
    return stream

def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    """Establish a connection to a file-like object *pipe* to receive data.

    Takes a file-like object *pipe* and returns a Stream object of mode
    StreamMode.READ with an API similar to StreamReader.  The result can
    also be used as an async context manager.
    """

    # Design note:
    # Use an explicit non-async function instead of a decorator
    # approach so that arguments that don't match the function
    # signature fail fast and explicitly.
    return _ContextManagerHelper(_connect_read_pipe(pipe, limit))
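
# Example (illustrative sketch, not part of the module): wrap the read end
# of an os.pipe() in a read-only Stream.  The descriptor handling is an
# assumption for illustration; the read methods are assumed to mirror
# StreamReader as described in the docstring above.
#
#     import os
#
#     async def _example_read_pipe():
#         rfd, wfd = os.pipe()
#         stream = await connect_read_pipe(os.fdopen(rfd, 'rb'))
#         os.write(wfd, b'hello\n')
#         os.close(wfd)
#         return await stream.readline()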


async def _connect_read_pipe(pipe, limit):
    loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.READ,
                    limit=limit,
                    loop=loop,
                    _asyncio_internal=True)
    await loop.connect_read_pipe(
        lambda: _StreamProtocol(stream, loop=loop,
                                _asyncio_internal=True),
        pipe)
    return stream

def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    """Establish a connection to a file-like object *pipe* to send data.

    Takes a file-like object *pipe* and returns a Stream object of mode
    StreamMode.WRITE with an API similar to StreamWriter.  The result can
    also be used as an async context manager.
    """

    # Design note:
    # Use an explicit non-async function instead of a decorator
    # approach so that arguments that don't match the function
    # signature fail fast and explicitly.
    return _ContextManagerHelper(_connect_write_pipe(pipe, limit))
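
# Example (illustrative sketch, not part of the module): wrap the write end
# of an os.pipe() in a write-only Stream.  The descriptor handling is an
# assumption for illustration only.
#
#     import os
#
#     async def _example_write_pipe():
#         rfd, wfd = os.pipe()
#         async with connect_write_pipe(os.fdopen(wfd, 'wb')) as stream:
#             await stream.write(b'hello\n')
#         return os.read(rfd, 6)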


async def _connect_write_pipe(pipe, limit):
    loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.WRITE,
                    limit=limit,
                    loop=loop,
                    _asyncio_internal=True)
    await loop.connect_write_pipe(
        lambda: _StreamProtocol(stream, loop=loop,
                                _asyncio_internal=True),
        pipe)
    return stream

async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    warnings.warn("open_connection() is deprecated since Python 3.8 "
                  "in favor of connect(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
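
# Example (illustrative sketch of the legacy API; the address is an
# assumption): open_connection() returns a (reader, writer) pair.
#
#     async def _example_open_connection():
#         reader, writer = await open_connection('127.0.0.1', 8080)
#         writer.write(b'GET / HTTP/1.0\r\n\r\n')
#         await writer.drain()
#         data = await reader.read(-1)
#         writer.close()
#         await writer.wait_closed()
#         return data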


async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    warnings.warn("start_server() is deprecated since Python 3.8 "
                  "in favor of StreamServer(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop,
                                        _asyncio_internal=True)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
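
# Example (illustrative sketch of the legacy API; the port is an
# assumption): echo back one line received from each client.
#
#     async def _example_handle_echo(reader, writer):
#         data = await reader.readline()
#         writer.write(data)
#         await writer.drain()
#         writer.close()
#
#     async def _example_start_server():
#         server = await start_server(_example_handle_echo,
#                                     '127.0.0.1', 8080)
#         async with server:
#             await server.serve_forever()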


class _BaseStreamServer:
    # Design notes.
    # StreamServer and UnixStreamServer are exposed as FINAL classes,
    # not function factories.
    #   async with serve(host, port) as server:
    #       server.start_serving()
    # looks ugly.
    # The class doesn't provide an API for enumerating connected streams;
    # that can be a subject for improvement in Python 3.9.

    _server_impl = None

    def __init__(self, client_connected_cb,
                 /,
                 limit=_DEFAULT_LIMIT,
                 shutdown_timeout=60,
                 _asyncio_internal=False):
        if not _asyncio_internal:
            raise RuntimeError(
                f"{self.__class__.__name__} is a private asyncio class")
        self._client_connected_cb = client_connected_cb
        self._limit = limit
        self._loop = events.get_running_loop()
        self._streams = {}
        self._shutdown_timeout = shutdown_timeout

    def __init_subclass__(cls):
        if not cls.__module__.startswith('asyncio.'):
            raise TypeError(f"asyncio.{cls.__name__} "
                            "class cannot be inherited from")

    async def bind(self):
        if self._server_impl is not None:
            return
        self._server_impl = await self._bind()

    def is_bound(self):
        return self._server_impl is not None

    @property
    def sockets(self):
        # May return multiple values for a socket bound to both
        # the IPv4 and IPv6 families.
        if self._server_impl is None:
            return ()
        return self._server_impl.sockets

    def is_serving(self):
        if self._server_impl is None:
            return False
        return self._server_impl.is_serving()

    async def start_serving(self):
        await self.bind()
        await self._server_impl.start_serving()

    async def serve_forever(self):
        await self.start_serving()
        await self._server_impl.serve_forever()

    async def close(self):
        if self._server_impl is None:
            return
        self._server_impl.close()
        streams = list(self._streams.keys())
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stream.close() for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def abort(self):
        if self._server_impl is None:
            return
        self._server_impl.close()
        streams = list(self._streams.keys())
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stream.abort() for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def __aenter__(self):
        await self.bind()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self.close()

    def _attach(self, stream, task):
        self._streams[stream] = task

    def _detach(self, stream, task):
        del self._streams[stream]

    async def _shutdown_active_tasks(self, active_tasks):
        if not active_tasks:
            return
        # NOTE: tasks that finished with an exception are reported
        # by the Task.__del__() method.
        done, pending = await tasks.wait(active_tasks,
                                         timeout=self._shutdown_timeout)
        if not pending:
            return
        for task in pending:
            task.cancel()
        done, pending = await tasks.wait(pending,
                                         timeout=self._shutdown_timeout)
        for task in pending:
            self._loop.call_exception_handler({
                "message": (f'{task!r} ignored cancellation request '
                            f'from a closing {self!r}'),
                "stream_server": self
            })

    def __repr__(self):
        ret = [f'{self.__class__.__name__}']
        if self.is_serving():
            ret.append('serving')
        if self.sockets:
            ret.append(f'sockets={self.sockets!r}')
        return '<' + ' '.join(ret) + '>'

    def __del__(self, _warn=warnings.warn):
        if self._server_impl is not None:
            _warn(f"unclosed stream server {self!r}",
                  ResourceWarning, source=self)
            self._server_impl.close()

class StreamServer(_BaseStreamServer):

    def __init__(self, client_connected_cb, /, host=None, port=None, *,
                 limit=_DEFAULT_LIMIT,
                 family=socket.AF_UNSPEC,
                 flags=socket.AI_PASSIVE, sock=None, backlog=100,
                 ssl=None, reuse_address=None, reuse_port=None,
                 ssl_handshake_timeout=None,
                 shutdown_timeout=60):
        super().__init__(client_connected_cb,
                         limit=limit,
                         shutdown_timeout=shutdown_timeout,
                         _asyncio_internal=True)
        self._host = host
        self._port = port
        self._family = family
        self._flags = flags
        self._sock = sock
        self._backlog = backlog
        self._ssl = ssl
        self._reuse_address = reuse_address
        self._reuse_port = reuse_port
        self._ssl_handshake_timeout = ssl_handshake_timeout

    async def _bind(self):
        def factory():
            protocol = _ServerStreamProtocol(self,
                                             self._limit,
                                             self._client_connected_cb,
                                             loop=self._loop,
                                             _asyncio_internal=True)
            return protocol
        return await self._loop.create_server(
            factory,
            self._host,
            self._port,
            start_serving=False,
            family=self._family,
            flags=self._flags,
            sock=self._sock,
            backlog=self._backlog,
            ssl=self._ssl,
            reuse_address=self._reuse_address,
            reuse_port=self._reuse_port,
            ssl_handshake_timeout=self._ssl_handshake_timeout)
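
# Example (illustrative sketch, not part of the module; the port is an
# assumption): StreamServer calls the callback with a single bidirectional
# Stream per client and is used as an async context manager.
#
#     async def _example_handle_client(stream):
#         data = await stream.readline()
#         await stream.write(data)
#         await stream.close()
#
#     async def _example_stream_server():
#         async with StreamServer(_example_handle_client,
#                                 '127.0.0.1', 8080) as server:
#             await server.serve_forever()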


if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
                      "in favor of connect_unix(), and scheduled for removal "
                      "in Python 3.10",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop,
                                        _asyncio_internal=True)
        transport, _ = await loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer


    def connect_unix(path=None, *,
                     limit=_DEFAULT_LIMIT,
                     ssl=None, sock=None,
                     server_hostname=None,
                     ssl_handshake_timeout=None):
        """Similar to `connect()` but works with UNIX Domain Sockets."""
        # Design note:
        # Use an explicit non-async function instead of a decorator
        # approach so that arguments that don't match the function
        # signature fail fast and explicitly.
        return _ContextManagerHelper(_connect_unix(path,
                                                   limit,
                                                   ssl, sock,
                                                   server_hostname,
                                                   ssl_handshake_timeout))
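
    # Example (illustrative sketch, not part of the module; the socket
    # path is an assumption): connect_unix() mirrors connect() for
    # AF_UNIX sockets.
    #
    #     async def _example_connect_unix():
    #         async with connect_unix('/tmp/example.sock') as stream:
    #             await stream.write(b'ping\n')
    #             return await stream.readline()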


    async def _connect_unix(path,
                            limit,
                            ssl, sock,
                            server_hostname,
                            ssl_handshake_timeout):
        """Similar to `connect()` but works with UNIX Domain Sockets."""
        loop = events.get_running_loop()
        stream = Stream(mode=StreamMode.READWRITE,
                        limit=limit,
                        loop=loop,
                        _asyncio_internal=True)
        await loop.create_unix_connection(
            lambda: _StreamProtocol(stream,
                                    loop=loop,
                                    _asyncio_internal=True),
            path,
            ssl=ssl,
            sock=sock,
            server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return stream

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        warnings.warn("start_unix_server() is deprecated since Python 3.8 "
                      "in favor of UnixStreamServer(), and scheduled "
                      "for removal in Python 3.10",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        def factory():
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop,
                                            _asyncio_internal=True)
            return protocol

        return await loop.create_unix_server(factory, path, **kwds)

    class UnixStreamServer(_BaseStreamServer):

        def __init__(self, client_connected_cb, /, path=None, *,
                     limit=_DEFAULT_LIMIT,
                     sock=None,
                     backlog=100,
                     ssl=None,
                     ssl_handshake_timeout=None,
                     shutdown_timeout=60):
            super().__init__(client_connected_cb,
                             limit=limit,
                             shutdown_timeout=shutdown_timeout,
                             _asyncio_internal=True)
            self._path = path
            self._sock = sock
            self._backlog = backlog
            self._ssl = ssl
            self._ssl_handshake_timeout = ssl_handshake_timeout

        async def _bind(self):
            def factory():
                protocol = _ServerStreamProtocol(self,
                                                 self._limit,
                                                 self._client_connected_cb,
                                                 loop=self._loop,
                                                 _asyncio_internal=True)
                return protocol
            return await self._loop.create_unix_server(
                factory,
                self._path,
                start_serving=False,
                sock=self._sock,
                backlog=self._backlog,
                ssl=self._ssl,
                ssl_handshake_timeout=self._ssl_handshake_timeout)

class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None, *, _asyncio_internal=False):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        if not _asyncio_internal:
            # NOTE:
            # Avoid inheriting from FlowControlMixin.
            # Copy-paste the code into your project
            # if you need flow control helpers.
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning)
        self._paused = False
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        await waiter

    def _get_close_waiter(self, stream):
        raise NotImplementedError

# begin legacy stream APIs

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        closed = self._closed
        if closed.done() and not closed.cancelled():
            closed.exception()

class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await tasks.sleep(0, loop=self._loop)
        await self._protocol._drain_helper()

class StreamReader:

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If the stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would cause a deadlock, so prevent it.
        # This is essential for readexactly(n) for the case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read a chunk of data from the stream until a newline (b'\n') is found.

        On success, return the chunk that ends with the newline.  If only a
        partial line can be read due to EOF, return the incomplete line
        without the terminating newline.  When EOF is reached and no bytes
        were read, return an empty bytes object.

        If the limit is reached, ValueError will be raised.  In that case,
        if a newline was found, the complete line including the newline will
        be removed from the internal buffer.  Otherwise, the internal buffer
        will be cleared.  The limit is compared against the part of the line
        without the newline.

        If the stream was paused, this function will automatically resume it
        if needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except exceptions.IncompleteReadError as e:
            return e.partial
        except exceptions.LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed).  Returned data will include the
        separator at the end.

        The configured stream limit is used to check the result.  The limit
        sets the maximal length of data that can be returned, not counting
        the separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because the limit is exceeded, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume the whole buffer except for the last len(separator) - 1
        # bytes.  Consider these corner cases with separator='SEPARATOR':
        # * we have received an almost complete separator (without the last
        #   byte), i.e. buffer='some textSEPARATO'.  In this case we can
        #   safely consume len(separator) - 1 bytes.
        # * the last byte of the buffer is the first byte of the separator,
        #   i.e. buffer='abcdefghijklmnopqrS'.  We could safely consume
        #   everything except that last byte, but this requires analyzing
        #   the bytes of the buffer that match a partial separator.  This is
        #   slow and/or requires an FSM.  For this case our implementation
        #   is not optimal, since it rescans data that is known not to
        #   belong to the separator.  In the real world, separators are not
        #   long enough for the performance problem to be noticeable --
        #   even when reading MIME-encoded messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer
        # size, or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator`
            # to fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer.  `isep` will be used
                    # later to retrieve the data.
                    break

                # see the comment above for an explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceeds the limit',
                        offset)

            # A complete message (with the full separator) may be present in
            # the buffer even when the EOF flag is set.  This may happen
            # when the last chunk adds data that completes the separator.
            # That's why we check for EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if the stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)
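
    # Example (illustrative sketch; assumes `reader` is a StreamReader fed
    # by a connection): read one CRLF-terminated header line, falling back
    # to whatever was received if EOF arrives first.
    #
    #     async def _example_readuntil(reader):
    #         try:
    #             return await reader.readuntil(b'\r\n')
    #         except exceptions.IncompleteReadError as e:
    #             return e.partial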

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all
        read bytes.  If EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If n is zero, return an empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer than `n` bytes, but at least one byte.  If EOF is
        received before any byte is read, return an empty bytes object.

        The returned value is not limited by the `limit` configured at
        stream creation.

        If the stream was paused, this function will automatically resume it
        if needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if the buffer holds fewer than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can
        be read.  The IncompleteReadError.partial attribute of the exception
        will contain the partially read bytes.

        If n is zero, return an empty bytes object.

        The returned value is not limited by the `limit` configured at
        stream creation.

        If the stream was paused, this function will automatically resume it
        if needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data
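
    # Example (illustrative sketch; assumes a length-prefixed framing where
    # each message starts with a 4-byte big-endian length):
    #
    #     async def _example_read_frame(reader):
    #         header = await reader.readexactly(4)
    #         size = int.from_bytes(header, 'big')
    #         return await reader.readexactly(size)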

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val
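
    # Example (illustrative sketch): __aiter__()/__anext__() make the
    # reader usable with `async for`, yielding lines until EOF.
    #
    #     async def _example_iterate(reader):
    #         lines = []
    #         async for line in reader:
    #             lines.append(line)
    #         return lines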


# end legacy stream APIs


class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    _stream = None  # initialized in derived classes

    def __init__(self, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._transport = None
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        self._transport = transport
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

    def connection_lost(self, exc):
        stream = self._stream
        if stream is not None:
            if exc is None:
                stream._feed_eof()
            else:
                stream._set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._transport = None

    def data_received(self, data):
        stream = self._stream
        if stream is not None:
            stream._feed_data(data)

    def eof_received(self):
        stream = self._stream
        if stream is not None:
            stream._feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        closed = self._get_close_waiter(self._stream)
        if closed.done() and not closed.cancelled():
            closed.exception()

class _StreamProtocol(_BaseStreamProtocol):
    _source_traceback = None

    def __init__(self, stream, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._source_traceback = stream._source_traceback
        self._stream_wr = weakref.ref(stream, self._on_gc)
        self._reject_connection = False

    def _on_gc(self, wr):
        transport = self._transport
        if transport is not None:
            # connection_made was called
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        else:
            self._reject_connection = True
        self._stream_wr = None

    @property
    def _stream(self):
        if self._stream_wr is None:
            return None
        return self._stream_wr()

    def connection_made(self, transport):
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        super().connection_made(transport)
        stream = self._stream
        if stream is None:
            return
        stream._set_transport(transport)
        stream._protocol = self

    def connection_lost(self, exc):
        super().connection_lost(exc)
        self._stream_wr = None

class _ServerStreamProtocol(_BaseStreamProtocol):
    def __init__(self, server, limit, client_connected_cb, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        assert self._closed
        self._client_connected_cb = client_connected_cb
        self._limit = limit
        self._server = server
        self._task = None

    def connection_made(self, transport):
        super().connection_made(transport)
        stream = Stream(mode=StreamMode.READWRITE,
                        transport=transport,
                        protocol=self,
                        limit=self._limit,
                        loop=self._loop,
                        is_server_side=True,
                        _asyncio_internal=True)
        self._stream = stream
        # If self._client_connected_cb(self._stream) fails,
        # the exception is logged by the transport.
        self._task = self._loop.create_task(
            self._client_connected_cb(self._stream))
        self._server._attach(stream, self._task)

    def connection_lost(self, exc):
        super().connection_lost(exc)
        self._server._detach(self._stream, self._task)
        self._stream = None

class _OptionalAwait:
    # The class doesn't create a coroutine object
    # if the result is not awaited.
    # This prevents a "coroutine is never awaited" warning.

    __slots__ = ('_method',)

    def __init__(self, method):
        self._method = method

    def __await__(self):
        return self._method().__await__()

1297class Stream:
1298 """Wraps a Transport.
1299
1300 This exposes write(), writelines(), [can_]write_eof(),
1301 get_extra_info() and close(). It adds drain() which returns an
1302 optional Future on which you can wait for flow control. It also
1303 adds a transport property which references the Transport
1304 directly.
1305 """
1306
1307 _source_traceback = None
1308
1309 def __init__(self, mode, *,
1310 transport=None,
1311 protocol=None,
1312 loop=None,
1313 limit=_DEFAULT_LIMIT,
1314 is_server_side=False,
1315 _asyncio_internal=False):
1316 if not _asyncio_internal:
1317 raise RuntimeError(f"{self.__class__} should be instantiated "
1318 "by asyncio internals only")
1319 self._mode = mode
1320 self._transport = transport
1321 self._protocol = protocol
1322 self._is_server_side = is_server_side
1323
1324 # The line length limit is a security feature;
1325 # it also doubles as half the buffer limit.
1326
1327 if limit <= 0:
1328 raise ValueError('Limit cannot be <= 0')
1329
1330 self._limit = limit
1331 if loop is None:
1332 self._loop = events.get_event_loop()
1333 else:
1334 self._loop = loop
1335 self._buffer = bytearray()
1336 self._eof = False # Whether we're done.
1337 self._waiter = None # A future used by _wait_for_data()
1338 self._exception = None
1339 self._paused = False
1340 self._complete_fut = self._loop.create_future()
1341 self._complete_fut.set_result(None)
1342
1343 if self._loop.get_debug():
1344 self._source_traceback = format_helpers.extract_stack(
1345 sys._getframe(1))
1346
1347 def __repr__(self):
1348 info = [self.__class__.__name__]
1349 info.append(f'mode={self._mode}')
1350 if self._buffer:
1351 info.append(f'{len(self._buffer)} bytes')
1352 if self._eof:
1353 info.append('eof')
1354 if self._limit != _DEFAULT_LIMIT:
1355 info.append(f'limit={self._limit}')
1356 if self._waiter:
1357 info.append(f'waiter={self._waiter!r}')
1358 if self._exception:
1359 info.append(f'exception={self._exception!r}')
1360 if self._transport:
1361 info.append(f'transport={self._transport!r}')
1362 if self._paused:
1363 info.append('paused')
1364 return '<{}>'.format(' '.join(info))
1365
1366 @property
1367 def mode(self):
1368 return self._mode
1369
1370 def is_server_side(self):
1371 return self._is_server_side
1372
1373 @property
1374 def transport(self):
1375 warnings.warn("Stream.transport attribute is deprecated "
1376 "since Python 3.8 and is scheduled for removal in 3.10; "
1377 "it is an internal API",
1378 DeprecationWarning,
1379 stacklevel=2)
1380 return self._transport
1381
1382 def write(self, data):
1383 _ensure_can_write(self._mode)
1384 self._transport.write(data)
1385 return self._fast_drain()
1386
1387 def writelines(self, data):
1388 _ensure_can_write(self._mode)
1389 self._transport.writelines(data)
1390 return self._fast_drain()
1391
1392 def _fast_drain(self):
1393 # This helper takes a fast path and returns the already completed
1394 # future object when the underlying transport is not paused,
1395 # so no actual waiting for the write buffer to drain is needed.
1396 exc = self.exception()
1397 if exc is not None:
1398 fut = self._loop.create_future()
1399 fut.set_exception(exc)
1400 return fut
1401 if not self._transport.is_closing():
1402 if self._protocol._connection_lost:
1403 fut = self._loop.create_future()
1404 fut.set_exception(ConnectionResetError('Connection lost'))
1405 return fut
1406 if not self._protocol._paused:
1407 # fast path, the stream is not paused
1408 # no need to wait for resume signal
1409 return self._complete_fut
1410 return _OptionalAwait(self.drain)
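# Net effect: write()/writelines() always return immediately; the returned
# awaitable is already completed unless the transport has paused this
# protocol, in which case awaiting it waits in drain() until writing is
# resumed (connection errors are delivered through the returned awaitable
# as well).  Typical caller-side use (`chunks` is illustrative):
#
#     for chunk in chunks:
#         await stream.write(chunk)   # applies backpressure when needed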
1411
1412 def write_eof(self):
1413 _ensure_can_write(self._mode)
1414 return self._transport.write_eof()
1415
1416 def can_write_eof(self):
1417 if not self._mode & StreamMode.WRITE:
1418 return False
1419 return self._transport.can_write_eof()
1420
1421 def close(self):
1422 self._transport.close()
1423 return _OptionalAwait(self.wait_closed)
1424
1425 def is_closing(self):
1426 return self._transport.is_closing()
1427
1428 async def abort(self):
1429 self._transport.abort()
1430 await self.wait_closed()
1431
1432 async def wait_closed(self):
1433 await self._protocol._get_close_waiter(self)
1434
1435 def get_extra_info(self, name, default=None):
1436 return self._transport.get_extra_info(name, default)
1437
1438 async def drain(self):
1439 """Flush the write buffer.
1440
1441 The intended use is to write
1442
1443 w.write(data)
1444 await w.drain()
1445 """
1446 _ensure_can_write(self._mode)
1447 exc = self.exception()
1448 if exc is not None:
1449 raise exc
1450 if self._transport.is_closing():
1451 # Wait for protocol.connection_lost() call
1452 # Raise connection closing error if any,
1453 # ConnectionResetError otherwise
1454 await tasks.sleep(0)
1455 await self._protocol._drain_helper()
1456
1457 async def sendfile(self, file, offset=0, count=None, *, fallback=True):
1458 await self.drain() # check for stream mode and exceptions
1459 return await self._loop.sendfile(self._transport, file,
1460 offset, count, fallback=fallback)
1461
1462 async def start_tls(self, sslcontext, *,
1463 server_hostname=None,
1464 ssl_handshake_timeout=None):
1465 await self.drain() # check for stream mode and exceptions
1466 transport = await self._loop.start_tls(
1467 self._transport, self._protocol, sslcontext,
1468 server_side=self._is_server_side,
1469 server_hostname=server_hostname,
1470 ssl_handshake_timeout=ssl_handshake_timeout)
1471 self._transport = transport
1472 self._protocol._transport = transport
1473 self._protocol._over_ssl = True
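# A hypothetical STARTTLS-style upgrade sketch (the command, hostname and
# SSL context below are placeholders, not part of this module):
#
#     import ssl
#
#     ctx = ssl.create_default_context()
#     await stream.write(b'STARTTLS\r\n')
#     ack = await stream.readline()     # peer acknowledges the upgrade
#     await stream.start_tls(ctx, server_hostname='example.com')
#     await stream.write(b'now encrypted')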
1474
1475 def exception(self):
1476 return self._exception
1477
1478 def set_exception(self, exc):
1479 warnings.warn("Stream.set_exception() is deprecated "
1480 "since Python 3.8 and is scheduled for removal in 3.10; "
1481 "it is an internal API",
1482 DeprecationWarning,
1483 stacklevel=2)
1484 self._set_exception(exc)
1485
1486 def _set_exception(self, exc):
1487 self._exception = exc
1488
1489 waiter = self._waiter
1490 if waiter is not None:
1491 self._waiter = None
1492 if not waiter.cancelled():
1493 waiter.set_exception(exc)
1494
1495 def _wakeup_waiter(self):
1496 """Wakeup read*() functions waiting for data or EOF."""
1497 waiter = self._waiter
1498 if waiter is not None:
1499 self._waiter = None
1500 if not waiter.cancelled():
1501 waiter.set_result(None)
1502
1503 def set_transport(self, transport):
1504 warnings.warn("Stream.set_transport() is deprecated "
1505 "since Python 3.8 and is scheduled for removal in 3.10; "
1506 "it is an internal API",
1507 DeprecationWarning,
1508 stacklevel=2)
1509 self._set_transport(transport)
1510
1511 def _set_transport(self, transport):
1512 if transport is self._transport:
1513 return
1514 assert self._transport is None, 'Transport already set'
1515 self._transport = transport
1516
1517 def _maybe_resume_transport(self):
1518 if self._paused and len(self._buffer) <= self._limit:
1519 self._paused = False
1520 self._transport.resume_reading()
1521
1522 def feed_eof(self):
1523 warnings.warn("Stream.feed_eof() is deprecated "
1524 "since Python 3.8 and is scheduled for removal in 3.10; "
1525 "it is an internal API",
1526 DeprecationWarning,
1527 stacklevel=2)
1528 self._feed_eof()
1529
1530 def _feed_eof(self):
1531 self._eof = True
1532 self._wakeup_waiter()
1533
1534 def at_eof(self):
1535 """Return True if the buffer is empty and 'feed_eof' was called."""
1536 return self._eof and not self._buffer
1537
1538 def feed_data(self, data):
1539 warnings.warn("Stream.feed_data() is deprecated "
1540 "since Python 3.8 and is scheduled for removal in 3.10; "
1541 "it is an internal API",
1542 DeprecationWarning,
1543 stacklevel=2)
1544 self._feed_data(data)
1545
1546 def _feed_data(self, data):
1547 _ensure_can_read(self._mode)
1548 assert not self._eof, 'feed_data after feed_eof'
1549
1550 if not data:
1551 return
1552
1553 self._buffer.extend(data)
1554 self._wakeup_waiter()
1555
1556 if (self._transport is not None and
1557 not self._paused and
1558 len(self._buffer) > 2 * self._limit):
1559 try:
1560 self._transport.pause_reading()
1561 except NotImplementedError:
1562 # The transport can't be paused.
1563 # We'll just have to buffer all data.
1564 # Forget the transport so we don't keep trying.
1565 self._transport = None
1566 else:
1567 self._paused = True
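# Flow-control summary: with the default 64 KiB limit, reading from the
# transport is paused once more than 128 KiB (2 * limit) is buffered and
# resumed by _maybe_resume_transport() once the buffer drains back to
# 64 KiB (limit) or less.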
1568
1569 async def _wait_for_data(self, func_name):
1570 """Wait until feed_data() or feed_eof() is called.
1571
1572 If stream was paused, automatically resume it.
1573 """
1574 # The stream uses a future to link the protocol's _feed_data() calls
1575 # to a read coroutine. Running two read coroutines at the same time
1576 # would have unexpected behaviour: it would not be possible to know
1577 # which coroutine would get the next data.
1578 if self._waiter is not None:
1579 raise RuntimeError(
1580 f'{func_name}() called while another coroutine is '
1581 f'already waiting for incoming data')
1582
1583 assert not self._eof, '_wait_for_data after EOF'
1584
1585 # Waiting for data while paused would cause a deadlock, so prevent it.
1586 # This is essential for readexactly(n) for the case when n > self._limit.
1587 if self._paused:
1588 self._paused = False
1589 self._transport.resume_reading()
1590
1591 self._waiter = self._loop.create_future()
1592 try:
1593 await self._waiter
1594 finally:
1595 self._waiter = None
1596
1597 async def readline(self):
1598 """Read chunk of data from the stream until newline (b'\n') is found.
1599
1600 On success, return the chunk that ends with the newline. If only a
1601 partial line can be read due to EOF, return the incomplete line
1602 without the terminating newline. If EOF is reached before any bytes
1603 are read, return an empty bytes object.
1604
1605 If the limit is reached, ValueError is raised. In that case, if a
1606 newline was found, the complete line including the newline is removed
1607 from the internal buffer; otherwise, the internal buffer is cleared.
1608 The limit is compared against the part of the line without the newline.
1609
1610 If stream was paused, this function will automatically resume it if
1611 needed.
1612 """
1613 _ensure_can_read(self._mode)
1614 sep = b'\n'
1615 seplen = len(sep)
1616 try:
1617 line = await self.readuntil(sep)
1618 except exceptions.IncompleteReadError as e:
1619 return e.partial
1620 except exceptions.LimitOverrunError as e:
1621 if self._buffer.startswith(sep, e.consumed):
1622 del self._buffer[:e.consumed + seplen]
1623 else:
1624 self._buffer.clear()
1625 self._maybe_resume_transport()
1626 raise ValueError(e.args[0])
1627 return line
1628
1629 async def readuntil(self, separator=b'\n'):
1630 """Read data from the stream until ``separator`` is found.
1631
1632 On success, the data and separator will be removed from the
1633 internal buffer (consumed). Returned data will include the
1634 separator at the end.
1635
1636 Configured stream limit is used to check result. Limit sets the
1637 maximal length of data that can be returned, not counting the
1638 separator.
1639
1640 If an EOF occurs and the complete separator is still not found,
1641 an IncompleteReadError exception will be raised, and the internal
1642 buffer will be reset. The IncompleteReadError.partial attribute
1643 may contain a part of the separator.
1644
1645 If the data cannot be read because the limit is exceeded, a
1646 LimitOverrunError exception will be raised, and the data
1647 will be left in the internal buffer, so it can be read again.
1648 """
1649 _ensure_can_read(self._mode)
1650 seplen = len(separator)
1651 if seplen == 0:
1652 raise ValueError('Separator should be at least one-byte string')
1653
1654 if self._exception is not None:
1655 raise self._exception
1656
1657 # Consume the whole buffer except for the last bytes, whose length
1658 # is one less than seplen. Let's check the corner cases with
1659 # separator='SEPARATOR':
1660 # * we have received an almost complete separator (without the
1661 # last byte), i.e. buffer='some textSEPARATO'. In this case we
1662 # can safely consume len(separator) - 1 bytes.
1663 # * the last byte of the buffer is the first byte of the separator,
1664 # i.e. buffer='abcdefghijklmnopqrS'. We may safely consume
1665 # everything except that last byte, but this requires analyzing
1666 # the bytes of the buffer that match a partial separator.
1667 # This is slow and/or requires an FSM. For this case our
1668 # implementation is not optimal, since it requires rescanning
1669 # data that is known not to belong to the separator. In the
1670 # real world, the separator will not be long enough for the
1671 # performance problems to be noticeable, even when reading
1672 # MIME-encoded messages :)
1673
1674 # `offset` is the number of bytes from the beginning of the buffer
1675 # where there is no occurrence of `separator`.
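# Worked example: with separator=b'SEPARATOR' (seplen=9) and
# buffer=b'some textSEPARATO' (buflen=17) no match is found yet, so
# offset becomes 17 + 1 - 9 == 9; any complete occurrence starting before
# index 9 would already have been found, so the next find() can safely
# start at index 9.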
1676 offset = 0
1677
1678 # Loop until we find `separator` in the buffer, exceed the buffer size,
1679 # or an EOF has happened.
1680 while True:
1681 buflen = len(self._buffer)
1682
1683 # Check if we now have enough data in the buffer for `separator` to
1684 # fit.
1685 if buflen - offset >= seplen:
1686 isep = self._buffer.find(separator, offset)
1687
1688 if isep != -1:
1689 # `separator` is in the buffer. `isep` will be used later
1690 # to retrieve the data.
1691 break
1692
1693 # see upper comment for explanation.
1694 offset = buflen + 1 - seplen
1695 if offset > self._limit:
1696 raise exceptions.LimitOverrunError(
1697 'Separator is not found, and chunk exceed the limit',
1698 offset)
1699
1700 # Complete message (with full separator) may be present in buffer
1701 # even when EOF flag is set. This may happen when the last chunk
1702 # adds data that makes the separator appear. That's why we check for
1703 # EOF *after* inspecting the buffer.
1704 if self._eof:
1705 chunk = bytes(self._buffer)
1706 self._buffer.clear()
1707 raise exceptions.IncompleteReadError(chunk, None)
1708
1709 # _wait_for_data() will resume reading if stream was paused.
1710 await self._wait_for_data('readuntil')
1711
1712 if isep > self._limit:
1713 raise exceptions.LimitOverrunError(
1714 'Separator is found, but chunk is longer than limit', isep)
1715
1716 chunk = self._buffer[:isep + seplen]
1717 del self._buffer[:isep + seplen]
1718 self._maybe_resume_transport()
1719 return bytes(chunk)
1720
1721 async def read(self, n=-1):
1722 """Read up to `n` bytes from the stream.
1723
1724 If n is not provided, or set to -1, read until EOF and return all read
1725 bytes. If the EOF was received and the internal buffer is empty, return
1726 an empty bytes object.
1727
1728 If n is zero, return empty bytes object immediately.
1729
1730 If n is positive, this function tries to read `n` bytes, and may
1731 return fewer bytes than requested, but at least one byte. If EOF is
1732 received before any byte is read, this function returns an empty
1733 bytes object.
1734
1735 The returned value is not limited by the limit configured at stream
1736 creation.
1737
1738 If stream was paused, this function will automatically resume it if
1739 needed.
1740 """
1741 _ensure_can_read(self._mode)
1742
1743 if self._exception is not None:
1744 raise self._exception
1745
1746 if n == 0:
1747 return b''
1748
1749 if n < 0:
1750 # This used to just loop creating a new waiter hoping to
1751 # collect everything in self._buffer, but that would
1752 # deadlock if the subprocess sends more than self.limit
1753 # bytes. So just call self.read(self._limit) until EOF.
1754 blocks = []
1755 while True:
1756 block = await self.read(self._limit)
1757 if not block:
1758 break
1759 blocks.append(block)
1760 return b''.join(blocks)
1761
1762 if not self._buffer and not self._eof:
1763 await self._wait_for_data('read')
1764
1765 # This works correctly even if the buffer holds fewer than n bytes
1766 data = bytes(self._buffer[:n])
1767 del self._buffer[:n]
1768
1769 self._maybe_resume_transport()
1770 return data
1771
1772 async def readexactly(self, n):
1773 """Read exactly `n` bytes.
1774
1775 Raise an IncompleteReadError if EOF is reached before `n` bytes can be
1776 read. The IncompleteReadError.partial attribute of the exception will
1777 contain the partial read bytes.
1778
1779 If n is zero, return an empty bytes object.
1780
1781 The returned value is not limited by the limit configured at stream
1782 creation.
1783
1784 If stream was paused, this function will automatically resume it if
1785 needed.
1786 """
1787 _ensure_can_read(self._mode)
1788 if n < 0:
1789 raise ValueError('readexactly size can not be less than zero')
1790
1791 if self._exception is not None:
1792 raise self._exception
1793
1794 if n == 0:
1795 return b''
1796
1797 while len(self._buffer) < n:
1798 if self._eof:
1799 incomplete = bytes(self._buffer)
1800 self._buffer.clear()
1801 raise exceptions.IncompleteReadError(incomplete, n)
1802
1803 await self._wait_for_data('readexactly')
1804
1805 if len(self._buffer) == n:
1806 data = bytes(self._buffer)
1807 self._buffer.clear()
1808 else:
1809 data = bytes(self._buffer[:n])
1810 del self._buffer[:n]
1811 self._maybe_resume_transport()
1812 return data
1813
1814 def __aiter__(self):
1815 _ensure_can_read(self._mode)
1816 return self
1817
1818 async def __anext__(self):
1819 val = await self.readline()
1820 if val == b'':
1821 raise StopAsyncIteration
1822 return val
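# Iteration sketch: __aiter__/__anext__ make a readable stream usable
# directly with "async for", yielding lines until readline() returns b''
# at EOF (`handle_line` is illustrative):
#
#     async for line in stream:
#         await handle_line(line)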
1823
1824 async def __aenter__(self):
1825 return self
1826
1827 async def __aexit__(self, exc_type, exc_val, exc_tb):
1828 await self.close()
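
# Because __aenter__()/__aexit__() are defined, a Stream is also an async
# context manager that closes the connection on exit; combined with
# connect() (assuming the asyncio re-export) this gives, as a sketch:
#
#     async with asyncio.connect('example.com', 80) as stream:
#         await stream.write(b'ping')
#         reply = await stream.read(4)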