blob: 480f1a3fdd74edce0a518daf19e5f5e2e5027595 [file] [log] [blame]
Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = (
Andrew Svetlov23b4b692019-05-27 22:56:22 +03002 'Stream', 'StreamMode',
3 'open_connection', 'start_server',
4 'connect', 'connect_read_pipe', 'connect_write_pipe',
5 'StreamServer')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006
Andrew Svetlov23b4b692019-05-27 22:56:22 +03007import enum
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07009import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -040010import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070011import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -050012
Guido van Rossume3e786c2014-02-18 10:24:30 -080013if hasattr(socket, 'AF_UNIX'):
Andrew Svetlov23b4b692019-05-27 22:56:22 +030014 __all__ += ('open_unix_connection', 'start_unix_server',
15 'connect_unix',
16 'UnixStreamServer')
Guido van Rossume3e786c2014-02-18 10:24:30 -080017
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070020from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070021from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020023from .log import logger
Andrew Svetlov23b4b692019-05-27 22:56:22 +030024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Victor Stinner9551f772018-05-29 16:02:07 +020027_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
Guido van Rossuma849be92014-01-30 16:05:28 -080029
class StreamMode(enum.Flag):
    """Bit flags describing which directions a stream supports."""

    # Explicit values match exactly what enum.auto() would assign,
    # so the public flag values are unchanged.
    READ = 1
    WRITE = 2
    READWRITE = READ | WRITE
34
35
def _ensure_can_read(mode):
    """Raise RuntimeError unless *mode* includes the READ flag."""
    if mode & StreamMode.READ:
        return
    raise RuntimeError("The stream is write-only")
39
40
def _ensure_can_write(mode):
    """Raise RuntimeError unless *mode* includes the WRITE flag."""
    if mode & StreamMode.WRITE:
        return
    raise RuntimeError("The stream is read-only")
44
45
class _ContextManagerHelper:
    """Adapter making a coroutine usable both as a plain awaitable and
    as an async context manager.

    Awaiting the helper awaits the wrapped coroutine directly.  Using it
    with ``async with`` first awaits the coroutine, then delegates to
    the async-context-manager protocol of whatever it produced.
    """

    __slots__ = ('_awaitable', '_result')

    def __init__(self, awaitable):
        self._awaitable = awaitable
        self._result = None

    def __await__(self):
        # "await helper" behaves exactly like awaiting the wrapped
        # coroutine itself.
        return self._awaitable.__await__()

    async def __aenter__(self):
        target = await self._awaitable
        entered = await target.__aenter__()
        self._result = entered
        return entered

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Forward the exit to the object entered in __aenter__().
        return await self._result.__aexit__(exc_type, exc_val, exc_tb)
64
65
def connect(host=None, port=None, *,
            limit=_DEFAULT_LIMIT,
            ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
    """Establish a TCP connection and provide a read-write Stream.

    The result may be awaited directly or used as an async context
    manager.  All keyword arguments are forwarded to
    loop.create_connection().
    """
    # Design note:
    # A plain (non-async) wrapper is used instead of a decorator so
    # that mismatched arguments fail fast and explicitly, before any
    # awaiting happens.
    coro = _connect(host, port, limit,
                    ssl, family, proto,
                    flags, sock, local_addr,
                    server_hostname,
                    ssl_handshake_timeout,
                    happy_eyeballs_delay,
                    interleave)
    return _ContextManagerHelper(coro)
84
85
async def _connect(host, port,
                   limit,
                   ssl, family, proto,
                   flags, sock, local_addr,
                   server_hostname,
                   ssl_handshake_timeout,
                   happy_eyeballs_delay, interleave):
    """Implementation coroutine behind connect().

    Builds a bidirectional Stream, wires it to a _StreamProtocol and
    establishes the transport via loop.create_connection().
    """
    running_loop = events.get_running_loop()
    new_stream = Stream(mode=StreamMode.READWRITE,
                        limit=limit,
                        loop=running_loop,
                        _asyncio_internal=True)

    def protocol_factory():
        # Single protocol instance feeding the stream created above.
        return _StreamProtocol(new_stream, loop=running_loop,
                               _asyncio_internal=True)

    await running_loop.create_connection(
        protocol_factory,
        host, port,
        ssl=ssl, family=family, proto=proto,
        flags=flags, sock=sock, local_addr=local_addr,
        server_hostname=server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
    return new_stream
108
109
def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    """Wrap *pipe* in a read-only Stream.

    The result may be awaited directly or used as an async context
    manager.
    """
    # Design note:
    # A plain (non-async) wrapper is used instead of a decorator so
    # that mismatched arguments fail fast and explicitly, before any
    # awaiting happens.
    coro = _connect_read_pipe(pipe, limit)
    return _ContextManagerHelper(coro)
116
117
async def _connect_read_pipe(pipe, limit):
    """Implementation coroutine behind connect_read_pipe()."""
    running_loop = events.get_running_loop()
    read_stream = Stream(mode=StreamMode.READ,
                         limit=limit,
                         loop=running_loop,
                         _asyncio_internal=True)

    def protocol_factory():
        return _StreamProtocol(read_stream, loop=running_loop,
                               _asyncio_internal=True)

    await running_loop.connect_read_pipe(protocol_factory, pipe)
    return read_stream
129
130
def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    """Wrap *pipe* in a write-only Stream.

    The result may be awaited directly or used as an async context
    manager.
    """
    # Design note:
    # A plain (non-async) wrapper is used instead of a decorator so
    # that mismatched arguments fail fast and explicitly, before any
    # awaiting happens.
    coro = _connect_write_pipe(pipe, limit)
    return _ContextManagerHelper(coro)
137
138
async def _connect_write_pipe(pipe, limit):
    """Implementation coroutine behind connect_write_pipe()."""
    running_loop = events.get_running_loop()
    write_stream = Stream(mode=StreamMode.WRITE,
                          limit=limit,
                          loop=running_loop,
                          _asyncio_internal=True)

    def protocol_factory():
        return _StreamProtocol(write_stream, loop=running_loop,
                               _asyncio_internal=True)

    await running_loop.connect_write_pipe(protocol_factory, pipe)
    return write_stream
150
151
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    Deprecated since Python 3.8 in favor of connect().
    """
    warnings.warn("open_connection() is deprecated since Python 3.8 "
                  "in favor of connect(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is None:
        loop = events.get_event_loop()
    stream_reader = StreamReader(limit=limit, loop=loop)
    reader_protocol = StreamReaderProtocol(stream_reader, loop=loop,
                                           _asyncio_internal=True)
    transport, _ = await loop.create_connection(
        lambda: reader_protocol, host, port, **kwds)
    stream_writer = StreamWriter(transport, reader_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
185
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  The callback
    may be a plain function or a coroutine; a coroutine is automatically
    wrapped in a Task.

    The remaining arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    Additional optional keyword arguments are loop (to set the event
    loop instance to use) and limit (to set the buffer limit passed to
    the StreamReader).

    The return value is the same as loop.create_server(), i.e. a Server
    object which can be used to stop the service.

    Deprecated since Python 3.8 in favor of StreamServer().
    """
    warnings.warn("start_server() is deprecated since Python 3.8 "
                  "in favor of StreamServer(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is None:
        loop = events.get_event_loop()

    def protocol_factory():
        # Each accepted client gets its own reader/protocol pair.
        stream_reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(stream_reader, client_connected_cb,
                                    loop=loop,
                                    _asyncio_internal=True)

    return await loop.create_server(protocol_factory, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -0800225
226
class _BaseStreamServer:
    """Shared machinery for StreamServer and UnixStreamServer.

    Tracks the underlying loop-level server object plus every connected
    stream and its handler task, so close()/abort() can shut them all
    down within ``shutdown_timeout`` seconds.
    """

    # Design notes.
    # StreamServer and UnixStreamServer are exposed as FINAL classes,
    # not function factories.
    # async with serve(host, port) as server:
    #     server.start_serving()
    # looks ugly.
    # The class doesn't provide API for enumerating connected streams
    # It can be a subject for improvements in Python 3.9

    # Loop-level server (result of _bind()); None until bind() is called
    # and again after close()/abort().
    _server_impl = None

    def __init__(self, client_connected_cb,
                 /,
                 limit=_DEFAULT_LIMIT,
                 shutdown_timeout=60,
                 _asyncio_internal=False):
        # Guard against direct instantiation from user code.
        if not _asyncio_internal:
            raise RuntimeError("_ServerStream is a private asyncio class")
        self._client_connected_cb = client_connected_cb
        self._limit = limit
        self._loop = events.get_running_loop()
        # Maps connected stream -> handler task (see _attach/_detach).
        self._streams = {}
        self._shutdown_timeout = shutdown_timeout

    def __init_subclass__(cls):
        # Only asyncio's own subclasses (StreamServer, UnixStreamServer)
        # are permitted; user subclassing is rejected at class-creation.
        if not cls.__module__.startswith('asyncio.'):
            raise TypeError(f"asyncio.{cls.__name__} "
                            "class cannot be inherited from")

    async def bind(self):
        """Create the underlying server without starting to serve.

        Idempotent: a second call is a no-op.
        """
        if self._server_impl is not None:
            return
        self._server_impl = await self._bind()

    def is_bound(self):
        """Return True if bind() has completed and close() has not run."""
        return self._server_impl is not None

    @property
    def sockets(self):
        """Sockets the server listens on (empty tuple when unbound)."""
        # multiple value for socket bound to both IPv4 and IPv6 families
        if self._server_impl is None:
            return ()
        return self._server_impl.sockets

    def is_serving(self):
        """Return True if the server is accepting new connections."""
        if self._server_impl is None:
            return False
        return self._server_impl.is_serving()

    async def start_serving(self):
        """Bind (if needed) and begin accepting connections."""
        await self.bind()
        await self._server_impl.start_serving()

    async def serve_forever(self):
        """Start serving and block until the server is closed."""
        await self.start_serving()
        await self._server_impl.serve_forever()

    async def close(self):
        """Gracefully close the server and all connected streams.

        Waits (bounded by shutdown_timeout) for handler tasks to finish.
        """
        if self._server_impl is None:
            return
        self._server_impl.close()
        # Snapshot before closing: stream.close() triggers _detach(),
        # which mutates self._streams.
        streams = list(self._streams.keys())
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stream.close() for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def abort(self):
        """Like close(), but aborts connected streams instead of
        closing them gracefully."""
        if self._server_impl is None:
            return
        self._server_impl.close()
        streams = list(self._streams.keys())
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stream.abort() for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def __aenter__(self):
        # "async with server:" binds but does not start serving.
        await self.bind()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self.close()

    def _attach(self, stream, task):
        # Called by the per-connection protocol when a client connects.
        self._streams[stream] = task

    def _detach(self, stream, task):
        # Called when a client connection is finished.
        del self._streams[stream]

    async def _shutdown_active_tasks(self, active_tasks):
        """Wait for handler tasks; cancel stragglers, then report any
        task that ignored cancellation via the loop exception handler."""
        if not active_tasks:
            return
        # NOTE: tasks finished with exception are reported
        # by the Task.__del__() method.
        done, pending = await tasks.wait(active_tasks,
                                         timeout=self._shutdown_timeout)
        if not pending:
            return
        for task in pending:
            task.cancel()
        # Second grace period for cancellation to take effect.
        done, pending = await tasks.wait(pending,
                                         timeout=self._shutdown_timeout)
        for task in pending:
            self._loop.call_exception_handler({
                "message": (f'{task!r} ignored cancellation request '
                            f'from a closing {self!r}'),
                "stream_server": self
            })

    def __repr__(self):
        ret = [f'{self.__class__.__name__}']
        if self.is_serving():
            ret.append('serving')
        if self.sockets:
            ret.append(f'sockets={self.sockets!r}')
        return '<' + ' '.join(ret) + '>'

    def __del__(self, _warn=warnings.warn):
        # Emit a ResourceWarning if the server was never closed; the
        # _warn default keeps warnings.warn alive during interpreter
        # shutdown.
        if self._server_impl is not None:
            _warn(f"unclosed stream server {self!r}",
                  ResourceWarning, source=self)
            self._server_impl.close()
355
356
class StreamServer(_BaseStreamServer):
    """TCP stream server.

    Stores the create_server() arguments at construction time and
    creates the underlying loop-level server lazily in _bind(), with
    start_serving=False so serving begins only when requested.
    """

    def __init__(self, client_connected_cb, /, host=None, port=None, *,
                 limit=_DEFAULT_LIMIT,
                 family=socket.AF_UNSPEC,
                 flags=socket.AI_PASSIVE, sock=None, backlog=100,
                 ssl=None, reuse_address=None, reuse_port=None,
                 ssl_handshake_timeout=None,
                 shutdown_timeout=60):
        super().__init__(client_connected_cb,
                         limit=limit,
                         shutdown_timeout=shutdown_timeout,
                         _asyncio_internal=True)
        self._host = host
        self._port = port
        self._family = family
        self._flags = flags
        self._sock = sock
        self._backlog = backlog
        self._ssl = ssl
        self._reuse_address = reuse_address
        self._reuse_port = reuse_port
        self._ssl_handshake_timeout = ssl_handshake_timeout

    async def _bind(self):
        def make_protocol():
            # Each accepted connection gets a protocol bound back to
            # this server so it can register/unregister its stream.
            return _ServerStreamProtocol(self,
                                         self._limit,
                                         self._client_connected_cb,
                                         loop=self._loop,
                                         _asyncio_internal=True)

        server_kwargs = dict(
            start_serving=False,
            family=self._family,
            flags=self._flags,
            sock=self._sock,
            backlog=self._backlog,
            ssl=self._ssl,
            reuse_address=self._reuse_address,
            reuse_port=self._reuse_port,
            ssl_handshake_timeout=self._ssl_handshake_timeout,
        )
        return await self._loop.create_server(
            make_protocol, self._host, self._port, **server_kwargs)
402
403
Yury Selivanovb057c522014-02-18 12:15:06 -0500404if hasattr(socket, 'AF_UNIX'):
405 # UNIX Domain Sockets are supported on this platform
406
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200407 async def open_unix_connection(path=None, *,
408 loop=None, limit=_DEFAULT_LIMIT, **kwds):
Yury Selivanovb057c522014-02-18 12:15:06 -0500409 """Similar to `open_connection` but works with UNIX Domain Sockets."""
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300410 warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
411 "in favor of connect_unix(), and scheduled for removal "
412 "in Python 3.10",
413 DeprecationWarning,
414 stacklevel=2)
Yury Selivanovb057c522014-02-18 12:15:06 -0500415 if loop is None:
416 loop = events.get_event_loop()
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300417 reader = StreamReader(limit=limit, loop=loop)
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400418 protocol = StreamReaderProtocol(reader, loop=loop,
419 _asyncio_internal=True)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200420 transport, _ = await loop.create_unix_connection(
Yury Selivanovb057c522014-02-18 12:15:06 -0500421 lambda: protocol, path, **kwds)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300422 writer = StreamWriter(transport, protocol, reader, loop)
Yury Selivanovb057c522014-02-18 12:15:06 -0500423 return reader, writer
424
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300425
426 def connect_unix(path=None, *,
427 limit=_DEFAULT_LIMIT,
428 ssl=None, sock=None,
429 server_hostname=None,
430 ssl_handshake_timeout=None):
431 """Similar to `connect()` but works with UNIX Domain Sockets."""
432 # Design note:
433 # Don't use decorator approach but exilicit non-async
434 # function to fail fast and explicitly
435 # if passed arguments don't match the function signature
436 return _ContextManagerHelper(_connect_unix(path,
437 limit,
438 ssl, sock,
439 server_hostname,
440 ssl_handshake_timeout))
441
442
443 async def _connect_unix(path,
444 limit,
445 ssl, sock,
446 server_hostname,
447 ssl_handshake_timeout):
448 """Similar to `connect()` but works with UNIX Domain Sockets."""
449 loop = events.get_running_loop()
450 stream = Stream(mode=StreamMode.READWRITE,
451 limit=limit,
452 loop=loop,
453 _asyncio_internal=True)
454 await loop.create_unix_connection(
455 lambda: _StreamProtocol(stream,
456 loop=loop,
457 _asyncio_internal=True),
458 path,
459 ssl=ssl,
460 sock=sock,
461 server_hostname=server_hostname,
462 ssl_handshake_timeout=ssl_handshake_timeout)
463 return stream
464
465
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200466 async def start_unix_server(client_connected_cb, path=None, *,
467 loop=None, limit=_DEFAULT_LIMIT, **kwds):
Yury Selivanovb057c522014-02-18 12:15:06 -0500468 """Similar to `start_server` but works with UNIX Domain Sockets."""
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300469 warnings.warn("start_unix_server() is deprecated since Python 3.8 "
470 "in favor of UnixStreamServer(), and scheduled "
471 "for removal in Python 3.10",
472 DeprecationWarning,
473 stacklevel=2)
Yury Selivanovb057c522014-02-18 12:15:06 -0500474 if loop is None:
475 loop = events.get_event_loop()
476
477 def factory():
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300478 reader = StreamReader(limit=limit, loop=loop)
Yury Selivanovb057c522014-02-18 12:15:06 -0500479 protocol = StreamReaderProtocol(reader, client_connected_cb,
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400480 loop=loop,
481 _asyncio_internal=True)
Yury Selivanovb057c522014-02-18 12:15:06 -0500482 return protocol
483
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200484 return await loop.create_unix_server(factory, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500485
    class UnixStreamServer(_BaseStreamServer):
        """UNIX-domain-socket counterpart of StreamServer.

        Stores the create_unix_server() arguments at construction time;
        the underlying loop-level server is created lazily in _bind()
        with start_serving=False.
        """

        def __init__(self, client_connected_cb, /, path=None, *,
                     limit=_DEFAULT_LIMIT,
                     sock=None,
                     backlog=100,
                     ssl=None,
                     ssl_handshake_timeout=None,
                     shutdown_timeout=60):
            super().__init__(client_connected_cb,
                             limit=limit,
                             shutdown_timeout=shutdown_timeout,
                             _asyncio_internal=True)
            self._path = path
            self._sock = sock
            self._backlog = backlog
            self._ssl = ssl
            self._ssl_handshake_timeout = ssl_handshake_timeout

        async def _bind(self):
            # Create (but do not start) the loop-level UNIX server.
            def factory():
                # Protocol bound back to this server so it can
                # register/unregister its stream.
                protocol = _ServerStreamProtocol(self,
                                                 self._limit,
                                                 self._client_connected_cb,
                                                 loop=self._loop,
                                                 _asyncio_internal=True)
                return protocol
            return await self._loop.create_unix_server(
                factory,
                self._path,
                start_serving=False,
                sock=self._sock,
                backlog=self._backlog,
                ssl=self._ssl,
                ssl_handshake_timeout=self._ssl_handshake_timeout)
521
Yury Selivanovb057c522014-02-18 12:15:06 -0500522
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None, *, _asyncio_internal=False):
        # Fall back to the current event loop when none is supplied.
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        if not _asyncio_internal:
            # NOTE:
            # Avoid inheritance from FlowControlMixin
            # Copy-paste the code to your project
            # if you need flow control helpers
            #
            # Fix: "instaniated" -> "instantiated" in the user-facing
            # warning message.
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning)
        self._paused = False           # True while the transport asks us to pause
        self._drain_waiter = None      # Future woken when writing may resume
        self._connection_lost = False  # Set once connection_lost() fires

    def pause_writing(self):
        """Transport callback: the write buffer passed the high-water mark."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Transport callback: the write buffer drained below the
        low-water mark; wake any coroutine blocked in _drain_helper()."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Record the loss and wake a paused drain() waiter, propagating
        *exc* to it when the connection failed."""
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    async def _drain_helper(self):
        """Block until the transport is ready to accept more data.

        Raises:
            ConnectionResetError: if the connection is already lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        await waiter

    def _get_close_waiter(self, stream):
        # Subclasses must return a future that completes when the
        # protocol is fully closed.
        raise NotImplementedError
598
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800599
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300600# begin legacy stream APIs
601
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800602class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
603 """Helper class to adapt between Protocol and StreamReader.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700604
605 (This is a helper class instead of making StreamReader itself a
606 Protocol subclass, because the StreamReader has other potential
607 uses, and to prevent the user of the StreamReader to accidentally
608 call inappropriate methods of the protocol.)
609 """
610
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400611 def __init__(self, stream_reader, client_connected_cb=None, loop=None,
612 *, _asyncio_internal=False):
613 super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300614 self._stream_reader = stream_reader
Guido van Rossum1540b162013-11-19 11:43:38 -0800615 self._stream_writer = None
Guido van Rossum1540b162013-11-19 11:43:38 -0800616 self._client_connected_cb = client_connected_cb
Yury Selivanov3dc51292016-05-20 11:31:40 -0400617 self._over_ssl = False
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200618 self._closed = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619
620 def connection_made(self, transport):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300621 self._stream_reader.set_transport(transport)
Yury Selivanov3dc51292016-05-20 11:31:40 -0400622 self._over_ssl = transport.get_extra_info('sslcontext') is not None
Guido van Rossum1540b162013-11-19 11:43:38 -0800623 if self._client_connected_cb is not None:
624 self._stream_writer = StreamWriter(transport, self,
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300625 self._stream_reader,
626 self._loop)
627 res = self._client_connected_cb(self._stream_reader,
Guido van Rossum1540b162013-11-19 11:43:38 -0800628 self._stream_writer)
Victor Stinnerf951d282014-06-29 00:46:45 +0200629 if coroutines.iscoroutine(res):
Victor Stinner896a25a2014-07-08 11:29:25 +0200630 self._loop.create_task(res)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631
632 def connection_lost(self, exc):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300633 if self._stream_reader is not None:
Yury Selivanov32dae3d2016-05-13 15:58:00 -0400634 if exc is None:
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300635 self._stream_reader.feed_eof()
Yury Selivanov32dae3d2016-05-13 15:58:00 -0400636 else:
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300637 self._stream_reader.set_exception(exc)
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200638 if not self._closed.done():
639 if exc is None:
640 self._closed.set_result(None)
641 else:
642 self._closed.set_exception(exc)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800643 super().connection_lost(exc)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300644 self._stream_reader = None
Yury Selivanov32dae3d2016-05-13 15:58:00 -0400645 self._stream_writer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646
647 def data_received(self, data):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300648 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700649
650 def eof_received(self):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300651 self._stream_reader.feed_eof()
Yury Selivanov3dc51292016-05-20 11:31:40 -0400652 if self._over_ssl:
653 # Prevent a warning in SSLProtocol.eof_received:
654 # "returning true from eof_received()
655 # has no effect when using ssl"
656 return False
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200657 return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700658
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200659 def __del__(self):
660 # Prevent reports about unhandled exceptions.
661 # Better than self._closed._log_traceback = False hack
662 closed = self._closed
663 if closed.done() and not closed.cancelled():
664 closed.exception()
665
Guido van Rossum355491d2013-10-18 15:17:11 -0700666
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        # Direct access to the underlying transport.
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        # Resolved by the protocol's connection_lost().
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        Re-raises any exception recorded on the reader.
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await tasks.sleep(0, loop=self._loop)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700740
741
742class StreamReader:
743
    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        """Initialize the reader with a buffer *limit* and event *loop*.

        Raises ValueError when *limit* is not positive.
        """
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None   # Exception to re-raise from reads (set_exception)
        self._transport = None   # Attached later via set_transport()
        self._paused = False     # True while the transport's reading is paused
762
    def __repr__(self):
        """Return a debug representation listing only non-default state."""
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200780
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700781 def exception(self):
782 return self._exception
783
784 def set_exception(self, exc):
785 self._exception = exc
786
Guido van Rossum355491d2013-10-18 15:17:11 -0700787 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700788 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700789 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700790 if not waiter.cancelled():
791 waiter.set_exception(exc)
792
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100793 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500794 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100795 waiter = self._waiter
796 if waiter is not None:
797 self._waiter = None
798 if not waiter.cancelled():
799 waiter.set_result(None)
800
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700801 def set_transport(self, transport):
802 assert self._transport is None, 'Transport already set'
803 self._transport = transport
804
805 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500806 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700807 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700808 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700809
810 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700811 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100812 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700813
Yury Selivanovf0020f52014-02-06 00:14:30 -0500814 def at_eof(self):
815 """Return True if the buffer is empty and 'feed_eof' was called."""
816 return self._eof and not self._buffer
817
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700818 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500819 assert not self._eof, 'feed_data after feed_eof'
820
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700821 if not data:
822 return
823
Yury Selivanove694c972014-02-05 18:11:13 -0500824 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100825 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700826
827 if (self._transport is not None and
Yury Selivanovb4617912016-05-16 16:32:38 -0400828 not self._paused and
829 len(self._buffer) > 2 * self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700830 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700831 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700832 except NotImplementedError:
833 # The transport can't be paused.
834 # We'll just have to buffer all data.
835 # Forget the transport so we don't keep trying.
836 self._transport = None
837 else:
838 self._paused = True
839
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200840 async def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500841 """Wait until feed_data() or feed_eof() is called.
842
843 If stream was paused, automatically resume it.
844 """
Victor Stinner183e3472014-01-23 17:40:03 +0100845 # StreamReader uses a future to link the protocol feed_data() method
846 # to a read coroutine. Running two read coroutines at the same time
847 # would have an unexpected behaviour. It would not possible to know
848 # which coroutine would get the next data.
849 if self._waiter is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500850 raise RuntimeError(
851 f'{func_name}() called while another coroutine is '
852 f'already waiting for incoming data')
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100853
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500854 assert not self._eof, '_wait_for_data after EOF'
855
856 # Waiting for data while paused will make deadlock, so prevent it.
Yury Selivanov3e56ff02016-10-05 18:01:12 -0400857 # This is essential for readexactly(n) for case when n > self._limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500858 if self._paused:
859 self._paused = False
860 self._transport.resume_reading()
861
Yury Selivanov7661db62016-05-16 15:38:39 -0400862 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100863 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200864 await self._waiter
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100865 finally:
866 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100867
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200868 async def readline(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500869 """Read chunk of data from the stream until newline (b'\n') is found.
870
871 On success, return chunk that ends with newline. If only partial
872 line can be read due to EOF, return incomplete line without
873 terminating newline. When EOF was reached while no bytes read, empty
874 bytes object is returned.
875
876 If limit is reached, ValueError will be raised. In that case, if
877 newline was found, complete line including newline will be removed
878 from internal buffer. Else, internal buffer will be cleared. Limit is
879 compared against part of the line without newline.
880
881 If stream was paused, this function will automatically resume it if
882 needed.
883 """
884 sep = b'\n'
885 seplen = len(sep)
886 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200887 line = await self.readuntil(sep)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700888 except exceptions.IncompleteReadError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500889 return e.partial
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700890 except exceptions.LimitOverrunError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500891 if self._buffer.startswith(sep, e.consumed):
892 del self._buffer[:e.consumed + seplen]
893 else:
894 self._buffer.clear()
895 self._maybe_resume_transport()
896 raise ValueError(e.args[0])
897 return line
898
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200899 async def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400900 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500901
Yury Selivanovb4617912016-05-16 16:32:38 -0400902 On success, the data and separator will be removed from the
903 internal buffer (consumed). Returned data will include the
904 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500905
Yury Selivanovb4617912016-05-16 16:32:38 -0400906 Configured stream limit is used to check result. Limit sets the
907 maximal length of data that can be returned, not counting the
908 separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500909
Yury Selivanovb4617912016-05-16 16:32:38 -0400910 If an EOF occurs and the complete separator is still not found,
911 an IncompleteReadError exception will be raised, and the internal
912 buffer will be reset. The IncompleteReadError.partial attribute
913 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500914
Yury Selivanovb4617912016-05-16 16:32:38 -0400915 If the data cannot be read because of over limit, a
916 LimitOverrunError exception will be raised, and the data
917 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500918 """
919 seplen = len(separator)
920 if seplen == 0:
921 raise ValueError('Separator should be at least one-byte string')
922
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700923 if self._exception is not None:
924 raise self._exception
925
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500926 # Consume whole buffer except last bytes, which length is
927 # one less than seplen. Let's check corner cases with
928 # separator='SEPARATOR':
929 # * we have received almost complete separator (without last
930 # byte). i.e buffer='some textSEPARATO'. In this case we
931 # can safely consume len(separator) - 1 bytes.
932 # * last byte of buffer is first byte of separator, i.e.
933 # buffer='abcdefghijklmnopqrS'. We may safely consume
934 # everything except that last byte, but this require to
935 # analyze bytes of buffer that match partial separator.
936 # This is slow and/or require FSM. For this case our
937 # implementation is not optimal, since require rescanning
938 # of data that is known to not belong to separator. In
939 # real world, separator will not be so long to notice
940 # performance problems. Even when reading MIME-encoded
941 # messages :)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700942
Yury Selivanovb4617912016-05-16 16:32:38 -0400943 # `offset` is the number of bytes from the beginning of the buffer
944 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500945 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700946
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500947 # Loop until we find `separator` in the buffer, exceed the buffer size,
948 # or an EOF has happened.
949 while True:
950 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700951
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500952 # Check if we now have enough data in the buffer for `separator` to
953 # fit.
954 if buflen - offset >= seplen:
955 isep = self._buffer.find(separator, offset)
956
957 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400958 # `separator` is in the buffer. `isep` will be used later
959 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500960 break
961
962 # see upper comment for explanation.
963 offset = buflen + 1 - seplen
964 if offset > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700965 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400966 'Separator is not found, and chunk exceed the limit',
967 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500968
969 # Complete message (with full separator) may be present in buffer
970 # even when EOF flag is set. This may happen when the last chunk
971 # adds data which makes separator be found. That's why we check for
972 # EOF *ater* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700973 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500974 chunk = bytes(self._buffer)
975 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700976 raise exceptions.IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700977
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500978 # _wait_for_data() will resume reading if stream was paused.
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200979 await self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700980
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500981 if isep > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700982 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400983 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500984
985 chunk = self._buffer[:isep + seplen]
986 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700987 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500988 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700989
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200990 async def read(self, n=-1):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500991 """Read up to `n` bytes from the stream.
992
993 If n is not provided, or set to -1, read until EOF and return all read
994 bytes. If the EOF was received and the internal buffer is empty, return
995 an empty bytes object.
996
Martin Panter0be894b2016-09-07 12:03:06 +0000997 If n is zero, return empty bytes object immediately.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500998
999 If n is positive, this function try to read `n` bytes, and may return
1000 less or equal bytes than requested, but at least one byte. If EOF was
1001 received before any byte is read, this function returns empty byte
1002 object.
1003
Yury Selivanovb4617912016-05-16 16:32:38 -04001004 Returned value is not limited with limit, configured at stream
1005 creation.
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001006
1007 If stream was paused, this function will automatically resume it if
1008 needed.
1009 """
1010
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001011 if self._exception is not None:
1012 raise self._exception
1013
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001014 if n == 0:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001015 return b''
1016
1017 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -07001018 # This used to just loop creating a new waiter hoping to
1019 # collect everything in self._buffer, but that would
1020 # deadlock if the subprocess sends more than self.limit
1021 # bytes. So just call self.read(self._limit) until EOF.
1022 blocks = []
1023 while True:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001024 block = await self.read(self._limit)
Guido van Rossumbf88ffb2014-05-12 10:04:37 -07001025 if not block:
1026 break
1027 blocks.append(block)
1028 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001029
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001030 if not self._buffer and not self._eof:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001031 await self._wait_for_data('read')
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001032
1033 # This will work right even if buffer is less than n bytes
1034 data = bytes(self._buffer[:n])
1035 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001036
Yury Selivanove694c972014-02-05 18:11:13 -05001037 self._maybe_resume_transport()
1038 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001039
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001040 async def readexactly(self, n):
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001041 """Read exactly `n` bytes.
1042
Yury Selivanovb4617912016-05-16 16:32:38 -04001043 Raise an IncompleteReadError if EOF is reached before `n` bytes can be
1044 read. The IncompleteReadError.partial attribute of the exception will
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001045 contain the partial read bytes.
1046
1047 if n is zero, return empty bytes object.
1048
Yury Selivanovb4617912016-05-16 16:32:38 -04001049 Returned value is not limited with limit, configured at stream
1050 creation.
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001051
1052 If stream was paused, this function will automatically resume it if
1053 needed.
1054 """
Yury Selivanovdddc7812015-12-11 11:32:59 -05001055 if n < 0:
1056 raise ValueError('readexactly size can not be less than zero')
1057
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001058 if self._exception is not None:
1059 raise self._exception
1060
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001061 if n == 0:
1062 return b''
1063
Yury Selivanov3e56ff02016-10-05 18:01:12 -04001064 while len(self._buffer) < n:
1065 if self._eof:
1066 incomplete = bytes(self._buffer)
1067 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001068 raise exceptions.IncompleteReadError(incomplete, n)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001069
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001070 await self._wait_for_data('readexactly')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001071
Yury Selivanov3e56ff02016-10-05 18:01:12 -04001072 if len(self._buffer) == n:
1073 data = bytes(self._buffer)
1074 self._buffer.clear()
1075 else:
1076 data = bytes(self._buffer[:n])
1077 del self._buffer[:n]
1078 self._maybe_resume_transport()
1079 return data
Yury Selivanovd08c3632015-05-13 15:15:56 -04001080
Yury Selivanovfaa135a2017-10-06 02:08:57 -04001081 def __aiter__(self):
1082 return self
Yury Selivanovd08c3632015-05-13 15:15:56 -04001083
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001084 async def __anext__(self):
1085 val = await self.readline()
Yury Selivanovfaa135a2017-10-06 02:08:57 -04001086 if val == b'':
1087 raise StopAsyncIteration
1088 return val
Andrew Svetlov23b4b692019-05-27 22:56:22 +03001089
1090
1091# end legacy stream APIs
1092
1093
class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
    """Adapter between the Protocol callback API and a stream object.

    (A separate helper class is used instead of making the stream a
    Protocol subclass, because the stream has other potential uses and
    this keeps its users from accidentally calling protocol-only
    methods.)
    """

    _stream = None  # derived classes are responsible for providing this

    def __init__(self, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._transport = None
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        # Remember the transport and whether the connection uses TLS.
        self._transport = transport
        self._over_ssl = (
            transport.get_extra_info('sslcontext') is not None)

    def connection_lost(self, exc):
        # Propagate EOF or the error to the stream, if one is attached.
        stream = self._stream
        if stream is not None:
            if exc is None:
                stream.feed_eof()
            else:
                stream.set_exception(exc)
        # Complete the close waiter exactly once.
        closed = self._closed
        if not closed.done():
            if exc is None:
                closed.set_result(None)
            else:
                closed.set_exception(exc)
        super().connection_lost(exc)
        self._transport = None

    def data_received(self, data):
        stream = self._stream
        if stream is None:
            return
        stream.feed_data(data)

    def eof_received(self):
        stream = self._stream
        if stream is not None:
            stream.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Consume the close waiter's exception (if any) so the event
        # loop does not report it as unhandled; nicer than poking
        # self._closed._log_traceback = False directly.
        waiter = self._get_close_waiter(self._stream)
        if waiter.done() and not waiter.cancelled():
            waiter.exception()
1156
1157
class _StreamProtocol(_BaseStreamProtocol):
    """Protocol for client-side streams; holds its stream only weakly.

    When the user drops the stream object without closing it, the
    weakref callback aborts the transport (or refuses the pending
    connection) and reports the leak to the loop's exception handler.
    """

    _source_traceback = None

    def __init__(self, stream, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._source_traceback = stream._source_traceback
        # Keep only a weak reference; _on_gc fires when the stream is
        # garbage collected.
        self._stream_wr = weakref.ref(stream, self._on_gc)
        self._reject_connection = False

    def _on_gc(self, wr):
        # The stream was garbage collected while still open.
        transport = self._transport
        if transport is None:
            # connection_made has not run yet; make sure the upcoming
            # connection gets refused instead.
            self._reject_connection = True
        else:
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        self._stream_wr = None

    @property
    def _stream(self):
        wr = self._stream_wr
        return None if wr is None else wr()

    def connection_made(self, transport):
        if self._reject_connection:
            # The stream was collected before the connection finished
            # being established; drop the connection on the floor.
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        super().connection_made(transport)
        stream = self._stream
        if stream is not None:
            stream.set_transport(transport)
            stream._protocol = self

    def connection_lost(self, exc):
        super().connection_lost(exc)
        self._stream_wr = None
1212
1213
class _ServerStreamProtocol(_BaseStreamProtocol):
    """Protocol used by stream servers for each accepted connection.

    Wraps every new transport in a read-write Stream, spawns a task
    running the user-supplied client-connected callback, and keeps the
    owning server informed of the connection's lifetime.
    """

    def __init__(self, server, limit, client_connected_cb, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        assert self._closed
        self._server = server
        self._limit = limit
        self._client_connected_cb = client_connected_cb
        self._task = None

    def connection_made(self, transport):
        super().connection_made(transport)
        self._stream = Stream(mode=StreamMode.READWRITE,
                              transport=transport,
                              protocol=self,
                              limit=self._limit,
                              loop=self._loop,
                              is_server_side=True,
                              _asyncio_internal=True)
        # A failure inside self._client_connected_cb(self._stream) is
        # logged by the transport machinery.
        self._task = self._loop.create_task(
            self._client_connected_cb(self._stream))
        self._server._attach(self._stream, self._task)

    def connection_lost(self, exc):
        super().connection_lost(exc)
        self._server._detach(self._stream, self._task)
        self._stream = None
1244
1245
class _OptionalAwait:
    """Make awaiting the wrapped coroutine method optional.

    The coroutine object is only created when (and if) this wrapper is
    actually awaited, which prevents the "coroutine ... was never
    awaited" warning when the caller ignores the returned value.
    """

    # Bug fix: this was spelled ``__slots___`` (three trailing
    # underscores), a plain class attribute with no effect, so every
    # instance still carried a ``__dict__``.
    __slots__ = ('_method',)

    def __init__(self, method):
        # *method* is a coroutine function taking no arguments.
        self._method = method

    def __await__(self):
        # Create the coroutine lazily and delegate to its __await__.
        return self._method().__await__()
1258
1259
1260class Stream:
1261 """Wraps a Transport.
1262
1263 This exposes write(), writelines(), [can_]write_eof(),
1264 get_extra_info() and close(). It adds drain() which returns an
1265 optional Future on which you can wait for flow control. It also
1266 adds a transport property which references the Transport
1267 directly.
1268 """
1269
1270 _source_traceback = None
1271
1272 def __init__(self, mode, *,
1273 transport=None,
1274 protocol=None,
1275 loop=None,
1276 limit=_DEFAULT_LIMIT,
1277 is_server_side=False,
1278 _asyncio_internal=False):
1279 if not _asyncio_internal:
1280 warnings.warn(f"{self.__class__} should be instaniated "
1281 "by asyncio internals only, "
1282 "please avoid its creation from user code",
1283 DeprecationWarning)
1284 self._mode = mode
1285 self._transport = transport
1286 self._protocol = protocol
1287 self._is_server_side = is_server_side
1288
1289 # The line length limit is a security feature;
1290 # it also doubles as half the buffer limit.
1291
1292 if limit <= 0:
1293 raise ValueError('Limit cannot be <= 0')
1294
1295 self._limit = limit
1296 if loop is None:
1297 self._loop = events.get_event_loop()
1298 else:
1299 self._loop = loop
1300 self._buffer = bytearray()
1301 self._eof = False # Whether we're done.
1302 self._waiter = None # A future used by _wait_for_data()
1303 self._exception = None
1304 self._paused = False
1305 self._complete_fut = self._loop.create_future()
1306 self._complete_fut.set_result(None)
1307
1308 if self._loop.get_debug():
1309 self._source_traceback = format_helpers.extract_stack(
1310 sys._getframe(1))
1311
1312 def __repr__(self):
1313 info = [self.__class__.__name__]
1314 info.append(f'mode={self._mode}')
1315 if self._buffer:
1316 info.append(f'{len(self._buffer)} bytes')
1317 if self._eof:
1318 info.append('eof')
1319 if self._limit != _DEFAULT_LIMIT:
1320 info.append(f'limit={self._limit}')
1321 if self._waiter:
1322 info.append(f'waiter={self._waiter!r}')
1323 if self._exception:
1324 info.append(f'exception={self._exception!r}')
1325 if self._transport:
1326 info.append(f'transport={self._transport!r}')
1327 if self._paused:
1328 info.append('paused')
1329 return '<{}>'.format(' '.join(info))
1330
1331 @property
1332 def mode(self):
1333 return self._mode
1334
1335 def is_server_side(self):
1336 return self._is_server_side
1337
1338 @property
1339 def transport(self):
1340 return self._transport
1341
1342 def write(self, data):
1343 _ensure_can_write(self._mode)
1344 self._transport.write(data)
1345 return self._fast_drain()
1346
1347 def writelines(self, data):
1348 _ensure_can_write(self._mode)
1349 self._transport.writelines(data)
1350 return self._fast_drain()
1351
1352 def _fast_drain(self):
1353 # The helper tries to use fast-path to return already existing
1354 # complete future object if underlying transport is not paused
1355 #and actual waiting for writing resume is not needed
1356 exc = self.exception()
1357 if exc is not None:
1358 fut = self._loop.create_future()
1359 fut.set_exception(exc)
1360 return fut
1361 if not self._transport.is_closing():
1362 if self._protocol._connection_lost:
1363 fut = self._loop.create_future()
1364 fut.set_exception(ConnectionResetError('Connection lost'))
1365 return fut
1366 if not self._protocol._paused:
1367 # fast path, the stream is not paused
1368 # no need to wait for resume signal
1369 return self._complete_fut
1370 return _OptionalAwait(self.drain)
1371
1372 def write_eof(self):
1373 _ensure_can_write(self._mode)
1374 return self._transport.write_eof()
1375
1376 def can_write_eof(self):
1377 if not self._mode.is_write():
1378 return False
1379 return self._transport.can_write_eof()
1380
1381 def close(self):
1382 self._transport.close()
1383 return _OptionalAwait(self.wait_closed)
1384
1385 def is_closing(self):
1386 return self._transport.is_closing()
1387
1388 async def abort(self):
1389 self._transport.abort()
1390 await self.wait_closed()
1391
1392 async def wait_closed(self):
1393 await self._protocol._get_close_waiter(self)
1394
1395 def get_extra_info(self, name, default=None):
1396 return self._transport.get_extra_info(name, default)
1397
1398 async def drain(self):
1399 """Flush the write buffer.
1400
1401 The intended use is to write
1402
1403 w.write(data)
1404 await w.drain()
1405 """
1406 _ensure_can_write(self._mode)
1407 exc = self.exception()
1408 if exc is not None:
1409 raise exc
1410 if self._transport.is_closing():
1411 # Wait for protocol.connection_lost() call
1412 # Raise connection closing error if any,
1413 # ConnectionResetError otherwise
1414 await tasks.sleep(0)
1415 await self._protocol._drain_helper()
1416
1417 async def sendfile(self, file, offset=0, count=None, *, fallback=True):
1418 await self.drain() # check for stream mode and exceptions
1419 return await self._loop.sendfile(self._transport, file,
1420 offset, count, fallback=fallback)
1421
1422 async def start_tls(self, sslcontext, *,
1423 server_hostname=None,
1424 ssl_handshake_timeout=None):
1425 await self.drain() # check for stream mode and exceptions
1426 transport = await self._loop.start_tls(
1427 self._transport, self._protocol, sslcontext,
1428 server_side=self._is_server_side,
1429 server_hostname=server_hostname,
1430 ssl_handshake_timeout=ssl_handshake_timeout)
1431 self._transport = transport
1432 self._protocol._transport = transport
1433 self._protocol._over_ssl = True
1434
1435 def exception(self):
1436 return self._exception
1437
1438 def set_exception(self, exc):
1439 self._exception = exc
1440
1441 waiter = self._waiter
1442 if waiter is not None:
1443 self._waiter = None
1444 if not waiter.cancelled():
1445 waiter.set_exception(exc)
1446
1447 def _wakeup_waiter(self):
1448 """Wakeup read*() functions waiting for data or EOF."""
1449 waiter = self._waiter
1450 if waiter is not None:
1451 self._waiter = None
1452 if not waiter.cancelled():
1453 waiter.set_result(None)
1454
1455 def set_transport(self, transport):
1456 if transport is self._transport:
1457 return
1458 assert self._transport is None, 'Transport already set'
1459 self._transport = transport
1460
1461 def _maybe_resume_transport(self):
1462 if self._paused and len(self._buffer) <= self._limit:
1463 self._paused = False
1464 self._transport.resume_reading()
1465
1466 def feed_eof(self):
1467 self._eof = True
1468 self._wakeup_waiter()
1469
1470 def at_eof(self):
1471 """Return True if the buffer is empty and 'feed_eof' was called."""
1472 return self._eof and not self._buffer
1473
1474 def feed_data(self, data):
1475 _ensure_can_read(self._mode)
1476 assert not self._eof, 'feed_data after feed_eof'
1477
1478 if not data:
1479 return
1480
1481 self._buffer.extend(data)
1482 self._wakeup_waiter()
1483
1484 if (self._transport is not None and
1485 not self._paused and
1486 len(self._buffer) > 2 * self._limit):
1487 try:
1488 self._transport.pause_reading()
1489 except NotImplementedError:
1490 # The transport can't be paused.
1491 # We'll just have to buffer all data.
1492 # Forget the transport so we don't keep trying.
1493 self._transport = None
1494 else:
1495 self._paused = True
1496
1497 async def _wait_for_data(self, func_name):
1498 """Wait until feed_data() or feed_eof() is called.
1499
1500 If stream was paused, automatically resume it.
1501 """
1502 # StreamReader uses a future to link the protocol feed_data() method
1503 # to a read coroutine. Running two read coroutines at the same time
1504 # would have an unexpected behaviour. It would not possible to know
1505 # which coroutine would get the next data.
1506 if self._waiter is not None:
1507 raise RuntimeError(
1508 f'{func_name}() called while another coroutine is '
1509 f'already waiting for incoming data')
1510
1511 assert not self._eof, '_wait_for_data after EOF'
1512
1513 # Waiting for data while paused will make deadlock, so prevent it.
1514 # This is essential for readexactly(n) for case when n > self._limit.
1515 if self._paused:
1516 self._paused = False
1517 self._transport.resume_reading()
1518
1519 self._waiter = self._loop.create_future()
1520 try:
1521 await self._waiter
1522 finally:
1523 self._waiter = None
1524
1525 async def readline(self):
1526 """Read chunk of data from the stream until newline (b'\n') is found.
1527
1528 On success, return chunk that ends with newline. If only partial
1529 line can be read due to EOF, return incomplete line without
1530 terminating newline. When EOF was reached while no bytes read, empty
1531 bytes object is returned.
1532
1533 If limit is reached, ValueError will be raised. In that case, if
1534 newline was found, complete line including newline will be removed
1535 from internal buffer. Else, internal buffer will be cleared. Limit is
1536 compared against part of the line without newline.
1537
1538 If stream was paused, this function will automatically resume it if
1539 needed.
1540 """
1541 _ensure_can_read(self._mode)
1542 sep = b'\n'
1543 seplen = len(sep)
1544 try:
1545 line = await self.readuntil(sep)
1546 except exceptions.IncompleteReadError as e:
1547 return e.partial
1548 except exceptions.LimitOverrunError as e:
1549 if self._buffer.startswith(sep, e.consumed):
1550 del self._buffer[:e.consumed + seplen]
1551 else:
1552 self._buffer.clear()
1553 self._maybe_resume_transport()
1554 raise ValueError(e.args[0])
1555 return line
1556
    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset. The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        _ensure_can_read(self._mode)
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this requires us to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or requires FSM. For this case our
        #   implementation is not optimal, since it requires rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)
1648
1649 async def read(self, n=-1):
1650 """Read up to `n` bytes from the stream.
1651
1652 If n is not provided, or set to -1, read until EOF and return all read
1653 bytes. If the EOF was received and the internal buffer is empty, return
1654 an empty bytes object.
1655
1656 If n is zero, return empty bytes object immediately.
1657
1658 If n is positive, this function try to read `n` bytes, and may return
1659 less or equal bytes than requested, but at least one byte. If EOF was
1660 received before any byte is read, this function returns empty byte
1661 object.
1662
1663 Returned value is not limited with limit, configured at stream
1664 creation.
1665
1666 If stream was paused, this function will automatically resume it if
1667 needed.
1668 """
1669 _ensure_can_read(self._mode)
1670
1671 if self._exception is not None:
1672 raise self._exception
1673
1674 if n == 0:
1675 return b''
1676
1677 if n < 0:
1678 # This used to just loop creating a new waiter hoping to
1679 # collect everything in self._buffer, but that would
1680 # deadlock if the subprocess sends more than self.limit
1681 # bytes. So just call self.read(self._limit) until EOF.
1682 blocks = []
1683 while True:
1684 block = await self.read(self._limit)
1685 if not block:
1686 break
1687 blocks.append(block)
1688 return b''.join(blocks)
1689
1690 if not self._buffer and not self._eof:
1691 await self._wait_for_data('read')
1692
1693 # This will work right even if buffer is less than n bytes
1694 data = bytes(self._buffer[:n])
1695 del self._buffer[:n]
1696
1697 self._maybe_resume_transport()
1698 return data
1699
1700 async def readexactly(self, n):
1701 """Read exactly `n` bytes.
1702
1703 Raise an IncompleteReadError if EOF is reached before `n` bytes can be
1704 read. The IncompleteReadError.partial attribute of the exception will
1705 contain the partial read bytes.
1706
1707 if n is zero, return empty bytes object.
1708
1709 Returned value is not limited with limit, configured at stream
1710 creation.
1711
1712 If stream was paused, this function will automatically resume it if
1713 needed.
1714 """
1715 _ensure_can_read(self._mode)
1716 if n < 0:
1717 raise ValueError('readexactly size can not be less than zero')
1718
1719 if self._exception is not None:
1720 raise self._exception
1721
1722 if n == 0:
1723 return b''
1724
1725 while len(self._buffer) < n:
1726 if self._eof:
1727 incomplete = bytes(self._buffer)
1728 self._buffer.clear()
1729 raise exceptions.IncompleteReadError(incomplete, n)
1730
1731 await self._wait_for_data('readexactly')
1732
1733 if len(self._buffer) == n:
1734 data = bytes(self._buffer)
1735 self._buffer.clear()
1736 else:
1737 data = bytes(self._buffer[:n])
1738 del self._buffer[:n]
1739 self._maybe_resume_transport()
1740 return data
1741
1742 def __aiter__(self):
1743 _ensure_can_read(self._mode)
1744 return self
1745
1746 async def __anext__(self):
1747 val = await self.readline()
1748 if val == b'':
1749 raise StopAsyncIteration
1750 return val
1751
1752 async def __aenter__(self):
1753 return self
1754
1755 async def __aexit__(self, exc_type, exc_val, exc_tb):
1756 await self.close()