Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = (
Andrew Svetlov23b4b692019-05-27 22:56:22 +03002 'Stream', 'StreamMode',
3 'open_connection', 'start_server',
4 'connect', 'connect_read_pipe', 'connect_write_pipe',
5 'StreamServer')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006
Andrew Svetlov23b4b692019-05-27 22:56:22 +03007import enum
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07009import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -040010import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070011import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -050012
Guido van Rossume3e786c2014-02-18 10:24:30 -080013if hasattr(socket, 'AF_UNIX'):
Andrew Svetlov23b4b692019-05-27 22:56:22 +030014 __all__ += ('open_unix_connection', 'start_unix_server',
15 'connect_unix',
16 'UnixStreamServer')
Guido van Rossume3e786c2014-02-18 10:24:30 -080017
Victor Stinnerf951d282014-06-29 00:46:45 +020018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070020from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070021from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020023from .log import logger
Andrew Svetlov23b4b692019-05-27 22:56:22 +030024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Victor Stinner9551f772018-05-29 16:02:07 +020027_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
Guido van Rossuma849be92014-01-30 16:05:28 -080029
Andrew Svetlov23b4b692019-05-27 22:56:22 +030030class StreamMode(enum.Flag):
31 READ = enum.auto()
32 WRITE = enum.auto()
33 READWRITE = READ | WRITE
34
35
36def _ensure_can_read(mode):
37 if not mode & StreamMode.READ:
38 raise RuntimeError("The stream is write-only")
39
40
41def _ensure_can_write(mode):
42 if not mode & StreamMode.WRITE:
43 raise RuntimeError("The stream is read-only")
44
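# Quick illustration of how the StreamMode flags above compose (a sketch,
# not executed as part of this module):
#
#     mode = StreamMode.READWRITE
#     assert mode & StreamMode.READ        # readable
#     assert mode & StreamMode.WRITE       # writable
#     _ensure_can_read(mode)               # passes; raises for a write-only mode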
45
46class _ContextManagerHelper:
47 __slots__ = ('_awaitable', '_result')
48
49 def __init__(self, awaitable):
50 self._awaitable = awaitable
51 self._result = None
52
53 def __await__(self):
54 return self._awaitable.__await__()
55
56 async def __aenter__(self):
57 ret = await self._awaitable
58 result = await ret.__aenter__()
59 self._result = result
60 return result
61
62 async def __aexit__(self, exc_type, exc_val, exc_tb):
63 return await self._result.__aexit__(exc_type, exc_val, exc_tb)
64
65
66def connect(host=None, port=None, *,
67 limit=_DEFAULT_LIMIT,
68 ssl=None, family=0, proto=0,
69 flags=0, sock=None, local_addr=None,
70 server_hostname=None,
71 ssl_handshake_timeout=None,
72 happy_eyeballs_delay=None, interleave=None):
73 # Design note:
74 # Don't use decorator approach but exilicit non-async
75 # function to fail fast and explicitly
76 # if passed arguments don't match the function signature
77 return _ContextManagerHelper(_connect(host, port, limit,
78 ssl, family, proto,
79 flags, sock, local_addr,
80 server_hostname,
81 ssl_handshake_timeout,
82 happy_eyeballs_delay,
83 interleave))
84
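# Minimal usage sketch for connect() (illustrative only; the host/port and
# the read-side readline() call are assumptions, not verified against the
# rest of this module):
#
#     async def example_client():
#         async with connect('127.0.0.1', 8888) as stream:
#             await stream.write(b'ping\n')
#             reply = await stream.readline()  # assumed read API of Stream
#
# connect() can also be awaited directly, ``stream = await connect(...)``;
# both forms are provided by _ContextManagerHelper above.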
85
86async def _connect(host, port,
87 limit,
88 ssl, family, proto,
89 flags, sock, local_addr,
90 server_hostname,
91 ssl_handshake_timeout,
92 happy_eyeballs_delay, interleave):
93 loop = events.get_running_loop()
94 stream = Stream(mode=StreamMode.READWRITE,
95 limit=limit,
96 loop=loop,
97 _asyncio_internal=True)
98 await loop.create_connection(
99 lambda: _StreamProtocol(stream, loop=loop,
100 _asyncio_internal=True),
101 host, port,
102 ssl=ssl, family=family, proto=proto,
103 flags=flags, sock=sock, local_addr=local_addr,
104 server_hostname=server_hostname,
105 ssl_handshake_timeout=ssl_handshake_timeout,
106 happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
107 return stream
108
109
110def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
111 # Design note:
112 # Don't use decorator approach but explicit non-async
113 # function to fail fast and explicitly
114 # if passed arguments don't match the function signature
115 return _ContextManagerHelper(_connect_read_pipe(pipe, limit))
116
117
118async def _connect_read_pipe(pipe, limit):
119 loop = events.get_running_loop()
120 stream = Stream(mode=StreamMode.READ,
121 limit=limit,
122 loop=loop,
123 _asyncio_internal=True)
124 await loop.connect_read_pipe(
125 lambda: _StreamProtocol(stream, loop=loop,
126 _asyncio_internal=True),
127 pipe)
128 return stream
129
130
131def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
132 # Design note:
133 # Don't use decorator approach but explicit non-async
134 # function to fail fast and explicitly
135 # if passed arguments don't match the function signature
136 return _ContextManagerHelper(_connect_write_pipe(pipe, limit))
137
138
139async def _connect_write_pipe(pipe, limit):
140 loop = events.get_running_loop()
141 stream = Stream(mode=StreamMode.WRITE,
142 limit=limit,
143 loop=loop,
144 _asyncio_internal=True)
145 await loop.connect_write_pipe(
146 lambda: _StreamProtocol(stream, loop=loop,
147 _asyncio_internal=True),
148 pipe)
149 return stream
150
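# Sketch of wiring an os.pipe() into connect_read_pipe()/connect_write_pipe()
# (illustrative; fd lifetime handling and cleanup are elided):
#
#     import os
#     rfd, wfd = os.pipe()
#     read_stream = await connect_read_pipe(os.fdopen(rfd, 'rb', buffering=0))
#     write_stream = await connect_write_pipe(os.fdopen(wfd, 'wb', buffering=0))
#     await write_stream.write(b'data')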
151
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200152async def open_connection(host=None, port=None, *,
153 loop=None, limit=_DEFAULT_LIMIT, **kwds):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154 """A wrapper for create_connection() returning a (reader, writer) pair.
155
156 The reader returned is a StreamReader instance; the writer is a
Victor Stinner183e3472014-01-23 17:40:03 +0100157 StreamWriter instance.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158
159 The arguments are all the usual arguments to create_connection()
160 except protocol_factory; most common are positional host and port,
161 with various optional keyword arguments following.
162
163 Additional optional keyword arguments are loop (to set the event loop
164 instance to use) and limit (to set the buffer limit passed to the
165 StreamReader).
166
167 (If you want to customize the StreamReader and/or
168 StreamReaderProtocol classes, just copy the code -- there's
169 really nothing special here except some convenience.)
170 """
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300171 warnings.warn("open_connection() is deprecated since Python 3.8 "
172 "in favor of connect(), and scheduled for removal "
173 "in Python 3.10",
174 DeprecationWarning,
175 stacklevel=2)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176 if loop is None:
177 loop = events.get_event_loop()
Miss Islington (bot)8899b112019-06-04 23:01:01 -0700178 else:
179 warnings.warn("The loop argument is deprecated since Python 3.8, "
180 "and scheduled for removal in Python 3.10.",
181 DeprecationWarning, stacklevel=2)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300182 reader = StreamReader(limit=limit, loop=loop)
183 protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200184 transport, _ = await loop.create_connection(
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185 lambda: protocol, host, port, **kwds)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300186 writer = StreamWriter(transport, protocol, reader, loop)
Guido van Rossum355491d2013-10-18 15:17:11 -0700187 return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700188
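# A sketch of the typical legacy open_connection() usage (host/port are
# placeholders):
#
#     reader, writer = await open_connection('127.0.0.1', 8888)
#     writer.write(b'ping\n')
#     await writer.drain()
#     data = await reader.readline()
#     writer.close()
#     await writer.wait_closed()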
189
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200190async def start_server(client_connected_cb, host=None, port=None, *,
191 loop=None, limit=_DEFAULT_LIMIT, **kwds):
Guido van Rossum1540b162013-11-19 11:43:38 -0800192 """Start a socket server, call back for each client connected.
193
194 The first parameter, `client_connected_cb`, takes two parameters:
195 client_reader, client_writer. client_reader is a StreamReader
196 object, while client_writer is a StreamWriter object. This
197 parameter can either be a plain callback function or a coroutine;
198 if it is a coroutine, it will be automatically converted into a
199 Task.
200
201 The rest of the arguments are all the usual arguments to
202 loop.create_server() except protocol_factory; most common are
203 positional host and port, with various optional keyword arguments
204 following. The return value is the same as loop.create_server().
205
206 Additional optional keyword arguments are loop (to set the event loop
207 instance to use) and limit (to set the buffer limit passed to the
208 StreamReader).
209
210 The return value is the same as loop.create_server(), i.e. a
211 Server object which can be used to stop the service.
212 """
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300213 warnings.warn("start_server() is deprecated since Python 3.8 "
214 "in favor of StreamServer(), and scheduled for removal "
215 "in Python 3.10",
216 DeprecationWarning,
217 stacklevel=2)
Guido van Rossum1540b162013-11-19 11:43:38 -0800218 if loop is None:
219 loop = events.get_event_loop()
Miss Islington (bot)8899b112019-06-04 23:01:01 -0700220 else:
221 warnings.warn("The loop argument is deprecated since Python 3.8, "
222 "and scheduled for removal in Python 3.10.",
223 DeprecationWarning, stacklevel=2)
Guido van Rossum1540b162013-11-19 11:43:38 -0800224
225 def factory():
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300226 reader = StreamReader(limit=limit, loop=loop)
Guido van Rossum1540b162013-11-19 11:43:38 -0800227 protocol = StreamReaderProtocol(reader, client_connected_cb,
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400228 loop=loop,
229 _asyncio_internal=True)
Guido van Rossum1540b162013-11-19 11:43:38 -0800230 return protocol
231
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200232 return await loop.create_server(factory, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -0800233
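# A sketch of the typical legacy start_server() usage (handler name and port
# are placeholders):
#
#     async def handle(reader, writer):
#         data = await reader.readline()
#         writer.write(data)
#         await writer.drain()
#         writer.close()
#
#     server = await start_server(handle, '127.0.0.1', 8888)
#     async with server:
#         await server.serve_forever()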
234
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300235class _BaseStreamServer:
236 # Design notes.
237 # StreamServer and UnixStreamServer are exposed as FINAL classes,
238 # not function factories.
239 # async with serve(host, port) as server:
240 # server.start_serving()
241 # looks ugly.
 242    # The class doesn't provide an API for enumerating connected streams;
 243    # it can be a subject for improvement in Python 3.9.
244
245 _server_impl = None
246
247 def __init__(self, client_connected_cb,
248 /,
249 limit=_DEFAULT_LIMIT,
250 shutdown_timeout=60,
251 _asyncio_internal=False):
252 if not _asyncio_internal:
 253            raise RuntimeError("_BaseStreamServer is a private asyncio class")
254 self._client_connected_cb = client_connected_cb
255 self._limit = limit
256 self._loop = events.get_running_loop()
257 self._streams = {}
258 self._shutdown_timeout = shutdown_timeout
259
260 def __init_subclass__(cls):
261 if not cls.__module__.startswith('asyncio.'):
262 raise TypeError(f"asyncio.{cls.__name__} "
263 "class cannot be inherited from")
264
265 async def bind(self):
266 if self._server_impl is not None:
267 return
268 self._server_impl = await self._bind()
269
270 def is_bound(self):
271 return self._server_impl is not None
272
273 @property
274 def sockets(self):
 275        # may contain multiple values for a socket bound to both the IPv4 and IPv6 families
276 if self._server_impl is None:
277 return ()
278 return self._server_impl.sockets
279
280 def is_serving(self):
281 if self._server_impl is None:
282 return False
283 return self._server_impl.is_serving()
284
285 async def start_serving(self):
286 await self.bind()
287 await self._server_impl.start_serving()
288
289 async def serve_forever(self):
290 await self.start_serving()
291 await self._server_impl.serve_forever()
292
293 async def close(self):
294 if self._server_impl is None:
295 return
296 self._server_impl.close()
297 streams = list(self._streams.keys())
298 active_tasks = list(self._streams.values())
299 if streams:
300 await tasks.wait([stream.close() for stream in streams])
301 await self._server_impl.wait_closed()
302 self._server_impl = None
303 await self._shutdown_active_tasks(active_tasks)
304
305 async def abort(self):
306 if self._server_impl is None:
307 return
308 self._server_impl.close()
309 streams = list(self._streams.keys())
310 active_tasks = list(self._streams.values())
311 if streams:
312 await tasks.wait([stream.abort() for stream in streams])
313 await self._server_impl.wait_closed()
314 self._server_impl = None
315 await self._shutdown_active_tasks(active_tasks)
316
317 async def __aenter__(self):
318 await self.bind()
319 return self
320
321 async def __aexit__(self, exc_type, exc_value, exc_tb):
322 await self.close()
323
324 def _attach(self, stream, task):
325 self._streams[stream] = task
326
327 def _detach(self, stream, task):
328 del self._streams[stream]
329
330 async def _shutdown_active_tasks(self, active_tasks):
331 if not active_tasks:
332 return
333 # NOTE: tasks finished with exception are reported
334 # by the Task.__del__() method.
335 done, pending = await tasks.wait(active_tasks,
336 timeout=self._shutdown_timeout)
337 if not pending:
338 return
339 for task in pending:
340 task.cancel()
341 done, pending = await tasks.wait(pending,
342 timeout=self._shutdown_timeout)
343 for task in pending:
344 self._loop.call_exception_handler({
345 "message": (f'{task!r} ignored cancellation request '
346 f'from a closing {self!r}'),
347 "stream_server": self
348 })
349
350 def __repr__(self):
351 ret = [f'{self.__class__.__name__}']
352 if self.is_serving():
353 ret.append('serving')
354 if self.sockets:
355 ret.append(f'sockets={self.sockets!r}')
356 return '<' + ' '.join(ret) + '>'
357
358 def __del__(self, _warn=warnings.warn):
359 if self._server_impl is not None:
360 _warn(f"unclosed stream server {self!r}",
361 ResourceWarning, source=self)
362 self._server_impl.close()
363
364
365class StreamServer(_BaseStreamServer):
366
367 def __init__(self, client_connected_cb, /, host=None, port=None, *,
368 limit=_DEFAULT_LIMIT,
369 family=socket.AF_UNSPEC,
370 flags=socket.AI_PASSIVE, sock=None, backlog=100,
371 ssl=None, reuse_address=None, reuse_port=None,
372 ssl_handshake_timeout=None,
373 shutdown_timeout=60):
374 super().__init__(client_connected_cb,
375 limit=limit,
376 shutdown_timeout=shutdown_timeout,
377 _asyncio_internal=True)
378 self._host = host
379 self._port = port
380 self._family = family
381 self._flags = flags
382 self._sock = sock
383 self._backlog = backlog
384 self._ssl = ssl
385 self._reuse_address = reuse_address
386 self._reuse_port = reuse_port
387 self._ssl_handshake_timeout = ssl_handshake_timeout
388
389 async def _bind(self):
390 def factory():
391 protocol = _ServerStreamProtocol(self,
392 self._limit,
393 self._client_connected_cb,
394 loop=self._loop,
395 _asyncio_internal=True)
396 return protocol
397 return await self._loop.create_server(
398 factory,
399 self._host,
400 self._port,
401 start_serving=False,
402 family=self._family,
403 flags=self._flags,
404 sock=self._sock,
405 backlog=self._backlog,
406 ssl=self._ssl,
407 reuse_address=self._reuse_address,
408 reuse_port=self._reuse_port,
409 ssl_handshake_timeout=self._ssl_handshake_timeout)
410
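# Usage sketch for StreamServer (illustrative; the handler name, port and
# the stream.readline() call are assumptions):
#
#     async def on_client(stream):
#         data = await stream.readline()   # assumed read API of Stream
#         await stream.write(data)
#         await stream.close()
#
#     async def main():
#         async with StreamServer(on_client, '127.0.0.1', 8888) as server:
#             await server.serve_forever()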
411
Yury Selivanovb057c522014-02-18 12:15:06 -0500412if hasattr(socket, 'AF_UNIX'):
413 # UNIX Domain Sockets are supported on this platform
414
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200415 async def open_unix_connection(path=None, *,
416 loop=None, limit=_DEFAULT_LIMIT, **kwds):
Yury Selivanovb057c522014-02-18 12:15:06 -0500417 """Similar to `open_connection` but works with UNIX Domain Sockets."""
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300418 warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
419 "in favor of connect_unix(), and scheduled for removal "
420 "in Python 3.10",
421 DeprecationWarning,
422 stacklevel=2)
Yury Selivanovb057c522014-02-18 12:15:06 -0500423 if loop is None:
424 loop = events.get_event_loop()
Miss Islington (bot)8899b112019-06-04 23:01:01 -0700425 else:
426 warnings.warn("The loop argument is deprecated since Python 3.8, "
427 "and scheduled for removal in Python 3.10.",
428 DeprecationWarning, stacklevel=2)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300429 reader = StreamReader(limit=limit, loop=loop)
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400430 protocol = StreamReaderProtocol(reader, loop=loop,
431 _asyncio_internal=True)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200432 transport, _ = await loop.create_unix_connection(
Yury Selivanovb057c522014-02-18 12:15:06 -0500433 lambda: protocol, path, **kwds)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300434 writer = StreamWriter(transport, protocol, reader, loop)
Yury Selivanovb057c522014-02-18 12:15:06 -0500435 return reader, writer
436
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300437
438 def connect_unix(path=None, *,
439 limit=_DEFAULT_LIMIT,
440 ssl=None, sock=None,
441 server_hostname=None,
442 ssl_handshake_timeout=None):
443 """Similar to `connect()` but works with UNIX Domain Sockets."""
444 # Design note:
 445        # Don't use a decorator approach but an explicit non-async
446 # function to fail fast and explicitly
447 # if passed arguments don't match the function signature
448 return _ContextManagerHelper(_connect_unix(path,
449 limit,
450 ssl, sock,
451 server_hostname,
452 ssl_handshake_timeout))
453
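    # Usage sketch for connect_unix() (the socket path is a placeholder):
    #
    #     async with connect_unix('/tmp/example.sock') as stream:
    #         await stream.write(b'ping\n')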
454
455 async def _connect_unix(path,
456 limit,
457 ssl, sock,
458 server_hostname,
459 ssl_handshake_timeout):
460 """Similar to `connect()` but works with UNIX Domain Sockets."""
461 loop = events.get_running_loop()
462 stream = Stream(mode=StreamMode.READWRITE,
463 limit=limit,
464 loop=loop,
465 _asyncio_internal=True)
466 await loop.create_unix_connection(
467 lambda: _StreamProtocol(stream,
468 loop=loop,
469 _asyncio_internal=True),
470 path,
471 ssl=ssl,
472 sock=sock,
473 server_hostname=server_hostname,
474 ssl_handshake_timeout=ssl_handshake_timeout)
475 return stream
476
477
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200478 async def start_unix_server(client_connected_cb, path=None, *,
479 loop=None, limit=_DEFAULT_LIMIT, **kwds):
Yury Selivanovb057c522014-02-18 12:15:06 -0500480 """Similar to `start_server` but works with UNIX Domain Sockets."""
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300481 warnings.warn("start_unix_server() is deprecated since Python 3.8 "
482 "in favor of UnixStreamServer(), and scheduled "
483 "for removal in Python 3.10",
484 DeprecationWarning,
485 stacklevel=2)
Yury Selivanovb057c522014-02-18 12:15:06 -0500486 if loop is None:
487 loop = events.get_event_loop()
Miss Islington (bot)8899b112019-06-04 23:01:01 -0700488 else:
489 warnings.warn("The loop argument is deprecated since Python 3.8, "
490 "and scheduled for removal in Python 3.10.",
491 DeprecationWarning, stacklevel=2)
Yury Selivanovb057c522014-02-18 12:15:06 -0500492
493 def factory():
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300494 reader = StreamReader(limit=limit, loop=loop)
Yury Selivanovb057c522014-02-18 12:15:06 -0500495 protocol = StreamReaderProtocol(reader, client_connected_cb,
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400496 loop=loop,
497 _asyncio_internal=True)
Yury Selivanovb057c522014-02-18 12:15:06 -0500498 return protocol
499
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200500 return await loop.create_unix_server(factory, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500501
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300502 class UnixStreamServer(_BaseStreamServer):
503
504 def __init__(self, client_connected_cb, /, path=None, *,
505 limit=_DEFAULT_LIMIT,
506 sock=None,
507 backlog=100,
508 ssl=None,
509 ssl_handshake_timeout=None,
510 shutdown_timeout=60):
511 super().__init__(client_connected_cb,
512 limit=limit,
513 shutdown_timeout=shutdown_timeout,
514 _asyncio_internal=True)
515 self._path = path
516 self._sock = sock
517 self._backlog = backlog
518 self._ssl = ssl
519 self._ssl_handshake_timeout = ssl_handshake_timeout
520
521 async def _bind(self):
522 def factory():
523 protocol = _ServerStreamProtocol(self,
524 self._limit,
525 self._client_connected_cb,
526 loop=self._loop,
527 _asyncio_internal=True)
528 return protocol
529 return await self._loop.create_unix_server(
530 factory,
531 self._path,
532 start_serving=False,
533 sock=self._sock,
534 backlog=self._backlog,
535 ssl=self._ssl,
536 ssl_handshake_timeout=self._ssl_handshake_timeout)
537
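    # Usage sketch for UnixStreamServer, mirroring StreamServer above
    # (the path and handler are placeholders):
    #
    #     async with UnixStreamServer(on_client, '/tmp/example.sock') as server:
    #         await server.serve_forever()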
Yury Selivanovb057c522014-02-18 12:15:06 -0500538
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800539class FlowControlMixin(protocols.Protocol):
540 """Reusable flow control logic for StreamWriter.drain().
541
542 This implements the protocol methods pause_writing(),
John Chen8f5c28b2017-12-01 20:33:40 +0800543 resume_writing() and connection_lost(). If the subclass overrides
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800544 these it must call the super methods.
545
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200546 StreamWriter.drain() must wait for _drain_helper() coroutine.
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800547 """
548
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400549 def __init__(self, loop=None, *, _asyncio_internal=False):
Victor Stinner70db9e42015-01-09 21:32:05 +0100550 if loop is None:
551 self._loop = events.get_event_loop()
552 else:
553 self._loop = loop
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400554 if not _asyncio_internal:
555 # NOTE:
556 # Avoid inheritance from FlowControlMixin
557 # Copy-paste the code to your project
558 # if you need flow control helpers
559 warnings.warn(f"{self.__class__} should be instaniated "
560 "by asyncio internals only, "
561 "please avoid its creation from user code",
562 DeprecationWarning)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800563 self._paused = False
564 self._drain_waiter = None
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200565 self._connection_lost = False
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800566
567 def pause_writing(self):
568 assert not self._paused
569 self._paused = True
Victor Stinneracdb7822014-07-14 18:33:40 +0200570 if self._loop.get_debug():
571 logger.debug("%r pauses writing", self)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800572
573 def resume_writing(self):
574 assert self._paused
575 self._paused = False
Victor Stinneracdb7822014-07-14 18:33:40 +0200576 if self._loop.get_debug():
577 logger.debug("%r resumes writing", self)
578
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800579 waiter = self._drain_waiter
580 if waiter is not None:
581 self._drain_waiter = None
582 if not waiter.done():
583 waiter.set_result(None)
584
585 def connection_lost(self, exc):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200586 self._connection_lost = True
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800587 # Wake up the writer if currently paused.
588 if not self._paused:
589 return
590 waiter = self._drain_waiter
591 if waiter is None:
592 return
593 self._drain_waiter = None
594 if waiter.done():
595 return
596 if exc is None:
597 waiter.set_result(None)
598 else:
599 waiter.set_exception(exc)
600
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200601 async def _drain_helper(self):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200602 if self._connection_lost:
603 raise ConnectionResetError('Connection lost')
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800604 if not self._paused:
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200605 return
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800606 waiter = self._drain_waiter
607 assert waiter is None or waiter.cancelled()
Yury Selivanov7661db62016-05-16 15:38:39 -0400608 waiter = self._loop.create_future()
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800609 self._drain_waiter = waiter
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200610 await waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800611
Andrew Svetlov1cc0ee72019-05-07 16:53:19 -0400612 def _get_close_waiter(self, stream):
613 raise NotImplementedError
614
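# The write/drain pattern that FlowControlMixin enables (a sketch, using the
# legacy writer API below as an example):
#
#     writer.write(data)      # may buffer in the transport
#     await writer.drain()    # suspends only while pause_writing() is in effect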
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800615
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300616# begin legacy stream APIs
617
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800618class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
619 """Helper class to adapt between Protocol and StreamReader.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620
621 (This is a helper class instead of making StreamReader itself a
622 Protocol subclass, because the StreamReader has other potential
623 uses, and to prevent the user of the StreamReader to accidentally
624 call inappropriate methods of the protocol.)
625 """
626
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400627 def __init__(self, stream_reader, client_connected_cb=None, loop=None,
628 *, _asyncio_internal=False):
629 super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300630 self._stream_reader = stream_reader
Guido van Rossum1540b162013-11-19 11:43:38 -0800631 self._stream_writer = None
Guido van Rossum1540b162013-11-19 11:43:38 -0800632 self._client_connected_cb = client_connected_cb
Yury Selivanov3dc51292016-05-20 11:31:40 -0400633 self._over_ssl = False
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200634 self._closed = self._loop.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635
636 def connection_made(self, transport):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300637 self._stream_reader.set_transport(transport)
Yury Selivanov3dc51292016-05-20 11:31:40 -0400638 self._over_ssl = transport.get_extra_info('sslcontext') is not None
Guido van Rossum1540b162013-11-19 11:43:38 -0800639 if self._client_connected_cb is not None:
640 self._stream_writer = StreamWriter(transport, self,
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300641 self._stream_reader,
642 self._loop)
643 res = self._client_connected_cb(self._stream_reader,
Guido van Rossum1540b162013-11-19 11:43:38 -0800644 self._stream_writer)
Victor Stinnerf951d282014-06-29 00:46:45 +0200645 if coroutines.iscoroutine(res):
Victor Stinner896a25a2014-07-08 11:29:25 +0200646 self._loop.create_task(res)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647
648 def connection_lost(self, exc):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300649 if self._stream_reader is not None:
Yury Selivanov32dae3d2016-05-13 15:58:00 -0400650 if exc is None:
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300651 self._stream_reader.feed_eof()
Yury Selivanov32dae3d2016-05-13 15:58:00 -0400652 else:
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300653 self._stream_reader.set_exception(exc)
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200654 if not self._closed.done():
655 if exc is None:
656 self._closed.set_result(None)
657 else:
658 self._closed.set_exception(exc)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800659 super().connection_lost(exc)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300660 self._stream_reader = None
Yury Selivanov32dae3d2016-05-13 15:58:00 -0400661 self._stream_writer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662
663 def data_received(self, data):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300664 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
666 def eof_received(self):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300667 self._stream_reader.feed_eof()
Yury Selivanov3dc51292016-05-20 11:31:40 -0400668 if self._over_ssl:
669 # Prevent a warning in SSLProtocol.eof_received:
670 # "returning true from eof_received()
671 # has no effect when using ssl"
672 return False
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200673 return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700674
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200675 def __del__(self):
676 # Prevent reports about unhandled exceptions.
677 # Better than self._closed._log_traceback = False hack
678 closed = self._closed
679 if closed.done() and not closed.cancelled():
680 closed.exception()
681
Guido van Rossum355491d2013-10-18 15:17:11 -0700682
683class StreamWriter:
684 """Wraps a Transport.
685
686 This exposes write(), writelines(), [can_]write_eof(),
687 get_extra_info() and close(). It adds drain() which returns an
688 optional Future on which you can wait for flow control. It also
Guido van Rossumefef9d32014-01-10 13:26:38 -0800689 adds a transport property which references the Transport
Guido van Rossum355491d2013-10-18 15:17:11 -0700690 directly.
691 """
692
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300693 def __init__(self, transport, protocol, reader, loop):
Guido van Rossum355491d2013-10-18 15:17:11 -0700694 self._transport = transport
695 self._protocol = protocol
Martin Panter7462b6492015-11-02 03:37:02 +0000696 # drain() expects that the reader has an exception() method
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200697 assert reader is None or isinstance(reader, StreamReader)
Guido van Rossum355491d2013-10-18 15:17:11 -0700698 self._reader = reader
699 self._loop = loop
700
Victor Stinneracdb7822014-07-14 18:33:40 +0200701 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500702 info = [self.__class__.__name__, f'transport={self._transport!r}']
Victor Stinneracdb7822014-07-14 18:33:40 +0200703 if self._reader is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500704 info.append(f'reader={self._reader!r}')
705 return '<{}>'.format(' '.join(info))
Victor Stinneracdb7822014-07-14 18:33:40 +0200706
Guido van Rossum355491d2013-10-18 15:17:11 -0700707 @property
708 def transport(self):
709 return self._transport
710
711 def write(self, data):
712 self._transport.write(data)
713
714 def writelines(self, data):
715 self._transport.writelines(data)
716
717 def write_eof(self):
718 return self._transport.write_eof()
719
720 def can_write_eof(self):
721 return self._transport.can_write_eof()
722
Victor Stinner406204c2015-01-15 21:50:19 +0100723 def close(self):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300724 return self._transport.close()
Victor Stinner406204c2015-01-15 21:50:19 +0100725
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200726 def is_closing(self):
727 return self._transport.is_closing()
728
729 async def wait_closed(self):
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300730 await self._protocol._closed
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200731
Guido van Rossum355491d2013-10-18 15:17:11 -0700732 def get_extra_info(self, name, default=None):
733 return self._transport.get_extra_info(name, default)
734
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200735 async def drain(self):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200736 """Flush the write buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700737
738 The intended use is to write
739
740 w.write(data)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200741 await w.drain()
Guido van Rossum355491d2013-10-18 15:17:11 -0700742 """
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200743 if self._reader is not None:
744 exc = self._reader.exception()
745 if exc is not None:
746 raise exc
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200747 if self._transport.is_closing():
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300748 # Yield to the event loop so connection_lost() may be
749 # called. Without this, _drain_helper() would return
750 # immediately, and code that calls
751 # write(...); await drain()
752 # in a loop would never call connection_lost(), so it
753 # would not see an error when the socket is closed.
754 await tasks.sleep(0, loop=self._loop)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200755 await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700756
757
758class StreamReader:
759
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300760 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700761 # The line length limit is a security feature;
762 # it also doubles as half the buffer limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500763
764 if limit <= 0:
765 raise ValueError('Limit cannot be <= 0')
766
Guido van Rossum355491d2013-10-18 15:17:11 -0700767 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700768 if loop is None:
Victor Stinner70db9e42015-01-09 21:32:05 +0100769 self._loop = events.get_event_loop()
770 else:
771 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500772 self._buffer = bytearray()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100773 self._eof = False # Whether we're done.
774 self._waiter = None # A future used by _wait_for_data()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700775 self._exception = None
776 self._transport = None
777 self._paused = False
778
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200779 def __repr__(self):
780 info = ['StreamReader']
781 if self._buffer:
Yury Selivanov6370f342017-12-10 18:36:12 -0500782 info.append(f'{len(self._buffer)} bytes')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200783 if self._eof:
784 info.append('eof')
785 if self._limit != _DEFAULT_LIMIT:
Yury Selivanov6370f342017-12-10 18:36:12 -0500786 info.append(f'limit={self._limit}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200787 if self._waiter:
Yury Selivanov6370f342017-12-10 18:36:12 -0500788 info.append(f'waiter={self._waiter!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200789 if self._exception:
Yury Selivanov6370f342017-12-10 18:36:12 -0500790 info.append(f'exception={self._exception!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200791 if self._transport:
Yury Selivanov6370f342017-12-10 18:36:12 -0500792 info.append(f'transport={self._transport!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200793 if self._paused:
794 info.append('paused')
Yury Selivanov6370f342017-12-10 18:36:12 -0500795 return '<{}>'.format(' '.join(info))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200796
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700797 def exception(self):
798 return self._exception
799
800 def set_exception(self, exc):
801 self._exception = exc
802
Guido van Rossum355491d2013-10-18 15:17:11 -0700803 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700804 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700805 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700806 if not waiter.cancelled():
807 waiter.set_exception(exc)
808
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100809 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500810 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100811 waiter = self._waiter
812 if waiter is not None:
813 self._waiter = None
814 if not waiter.cancelled():
815 waiter.set_result(None)
816
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700817 def set_transport(self, transport):
818 assert self._transport is None, 'Transport already set'
819 self._transport = transport
820
821 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500822 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700823 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700824 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700825
826 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700827 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100828 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700829
Yury Selivanovf0020f52014-02-06 00:14:30 -0500830 def at_eof(self):
831 """Return True if the buffer is empty and 'feed_eof' was called."""
832 return self._eof and not self._buffer
833
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700834 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500835 assert not self._eof, 'feed_data after feed_eof'
836
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700837 if not data:
838 return
839
Yury Selivanove694c972014-02-05 18:11:13 -0500840 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100841 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700842
843 if (self._transport is not None and
Yury Selivanovb4617912016-05-16 16:32:38 -0400844 not self._paused and
845 len(self._buffer) > 2 * self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700846 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700847 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700848 except NotImplementedError:
849 # The transport can't be paused.
850 # We'll just have to buffer all data.
851 # Forget the transport so we don't keep trying.
852 self._transport = None
853 else:
854 self._paused = True
855
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200856 async def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500857 """Wait until feed_data() or feed_eof() is called.
858
859 If stream was paused, automatically resume it.
860 """
Victor Stinner183e3472014-01-23 17:40:03 +0100861 # StreamReader uses a future to link the protocol feed_data() method
862 # to a read coroutine. Running two read coroutines at the same time
 863        # would have an unexpected behaviour. It would not be possible to know
864 # which coroutine would get the next data.
865 if self._waiter is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500866 raise RuntimeError(
867 f'{func_name}() called while another coroutine is '
868 f'already waiting for incoming data')
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100869
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500870 assert not self._eof, '_wait_for_data after EOF'
871
 872        # Waiting for data while paused would cause a deadlock, so prevent it.
Yury Selivanov3e56ff02016-10-05 18:01:12 -0400873        # This is essential for readexactly(n) for the case when n > self._limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500874 if self._paused:
875 self._paused = False
876 self._transport.resume_reading()
877
Yury Selivanov7661db62016-05-16 15:38:39 -0400878 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100879 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200880 await self._waiter
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100881 finally:
882 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100883
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200884 async def readline(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500885 """Read chunk of data from the stream until newline (b'\n') is found.
886
 887        On success, return a chunk that ends with the newline. If only a
 888        partial line can be read due to EOF, return the incomplete line
 889        without the terminating newline. If EOF is reached before any bytes
 890        are read, return an empty bytes object.
 891
 892        If the limit is reached, ValueError will be raised. In that case, if
 893        the newline was found, the complete line including it is removed from
 894        the internal buffer; otherwise the buffer is cleared. The limit is
 895        compared against the part of the line without the newline.
896
897 If stream was paused, this function will automatically resume it if
898 needed.
899 """
900 sep = b'\n'
901 seplen = len(sep)
902 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200903 line = await self.readuntil(sep)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700904 except exceptions.IncompleteReadError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500905 return e.partial
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700906 except exceptions.LimitOverrunError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500907 if self._buffer.startswith(sep, e.consumed):
908 del self._buffer[:e.consumed + seplen]
909 else:
910 self._buffer.clear()
911 self._maybe_resume_transport()
912 raise ValueError(e.args[0])
913 return line
914
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200915 async def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400916 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500917
Yury Selivanovb4617912016-05-16 16:32:38 -0400918 On success, the data and separator will be removed from the
919 internal buffer (consumed). Returned data will include the
920 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500921
Yury Selivanovb4617912016-05-16 16:32:38 -0400922        The configured stream limit is used to check the result. The limit
 923        sets the maximal length of data that can be returned, not counting
 924        the separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500925
Yury Selivanovb4617912016-05-16 16:32:38 -0400926 If an EOF occurs and the complete separator is still not found,
927 an IncompleteReadError exception will be raised, and the internal
928 buffer will be reset. The IncompleteReadError.partial attribute
929 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500930
Yury Selivanovb4617912016-05-16 16:32:38 -0400931        If the data cannot be read because the limit is exceeded, a
932 LimitOverrunError exception will be raised, and the data
933 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500934 """
935 seplen = len(separator)
936 if seplen == 0:
937 raise ValueError('Separator should be at least one-byte string')
938
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700939 if self._exception is not None:
940 raise self._exception
941
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500942 # Consume whole buffer except last bytes, which length is
943 # one less than seplen. Let's check corner cases with
944 # separator='SEPARATOR':
945 # * we have received almost complete separator (without last
946 # byte). i.e buffer='some textSEPARATO'. In this case we
947 # can safely consume len(separator) - 1 bytes.
948 # * last byte of buffer is first byte of separator, i.e.
949 # buffer='abcdefghijklmnopqrS'. We may safely consume
950 # everything except that last byte, but this require to
 951        # everything except that last byte, but this requires analyzing
 952        # the bytes of the buffer that match a partial separator.
 953        # This is slow and/or requires an FSM. For this case our
 954        # implementation is not optimal, since it requires rescanning
 955        # data that is known not to belong to the separator. In the
 956        # real world, the separator will not be long enough to cause
 957        # performance problems. Even when reading MIME-encoded
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700958
Yury Selivanovb4617912016-05-16 16:32:38 -0400959 # `offset` is the number of bytes from the beginning of the buffer
960 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500961 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700962
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500963 # Loop until we find `separator` in the buffer, exceed the buffer size,
964 # or an EOF has happened.
965 while True:
966 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700967
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500968 # Check if we now have enough data in the buffer for `separator` to
969 # fit.
970 if buflen - offset >= seplen:
971 isep = self._buffer.find(separator, offset)
972
973 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400974 # `separator` is in the buffer. `isep` will be used later
975 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500976 break
977
978 # see upper comment for explanation.
979 offset = buflen + 1 - seplen
980 if offset > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700981 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400982                    'Separator is not found, and chunk exceeds the limit',
983 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500984
985 # Complete message (with full separator) may be present in buffer
986 # even when EOF flag is set. This may happen when the last chunk
 987        # adds data which causes the separator to be found. That's why we
 988        # check for EOF *after* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700989 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500990 chunk = bytes(self._buffer)
991 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700992 raise exceptions.IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700993
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500994 # _wait_for_data() will resume reading if stream was paused.
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200995 await self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700996
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500997 if isep > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700998 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400999 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001000
1001 chunk = self._buffer[:isep + seplen]
1002 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001003 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001004 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001005
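    # Sketch: framing messages with readuntil() above (the b'\r\n' separator
    # is just an example):
    #
    #     try:
    #         frame = await reader.readuntil(b'\r\n')
    #     except exceptions.IncompleteReadError as e:
    #         frame = e.partial    # EOF reached before the separator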
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001006 async def read(self, n=-1):
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001007 """Read up to `n` bytes from the stream.
1008
1009 If n is not provided, or set to -1, read until EOF and return all read
1010 bytes. If the EOF was received and the internal buffer is empty, return
1011 an empty bytes object.
1012
Martin Panter0be894b2016-09-07 12:03:06 +00001013        If n is zero, return an empty bytes object immediately.
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001014
 1015        If n is positive, this function tries to read `n` bytes, and may
 1016        return fewer bytes than requested, but at least one byte. If EOF is
 1017        received before any byte is read, this function returns an empty
 1018        bytes object.
1019
Yury Selivanovb4617912016-05-16 16:32:38 -04001020        The returned value is not limited by the limit configured at stream
 1021        creation.
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001022
1023 If stream was paused, this function will automatically resume it if
1024 needed.
1025 """
1026
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001027 if self._exception is not None:
1028 raise self._exception
1029
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001030 if n == 0:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001031 return b''
1032
1033 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -07001034 # This used to just loop creating a new waiter hoping to
1035 # collect everything in self._buffer, but that would
1036 # deadlock if the subprocess sends more than self.limit
1037 # bytes. So just call self.read(self._limit) until EOF.
1038 blocks = []
1039 while True:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001040 block = await self.read(self._limit)
Guido van Rossumbf88ffb2014-05-12 10:04:37 -07001041 if not block:
1042 break
1043 blocks.append(block)
1044 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001045
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001046 if not self._buffer and not self._eof:
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001047 await self._wait_for_data('read')
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001048
1049 # This will work right even if buffer is less than n bytes
1050 data = bytes(self._buffer[:n])
1051 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001052
Yury Selivanove694c972014-02-05 18:11:13 -05001053 self._maybe_resume_transport()
1054 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001055
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001056 async def readexactly(self, n):
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001057 """Read exactly `n` bytes.
1058
Yury Selivanovb4617912016-05-16 16:32:38 -04001059 Raise an IncompleteReadError if EOF is reached before `n` bytes can be
1060 read. The IncompleteReadError.partial attribute of the exception will
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001061 contain the partial read bytes.
1062
 1063        If n is zero, return an empty bytes object.
1064
Yury Selivanovb4617912016-05-16 16:32:38 -04001065        The returned value is not limited by the limit configured at stream
 1066        creation.
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001067
1068 If stream was paused, this function will automatically resume it if
1069 needed.
1070 """
Yury Selivanovdddc7812015-12-11 11:32:59 -05001071 if n < 0:
1072 raise ValueError('readexactly size can not be less than zero')
1073
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001074 if self._exception is not None:
1075 raise self._exception
1076
Yury Selivanovd9d0e862016-01-11 12:28:19 -05001077 if n == 0:
1078 return b''
1079
Yury Selivanov3e56ff02016-10-05 18:01:12 -04001080 while len(self._buffer) < n:
1081 if self._eof:
1082 incomplete = bytes(self._buffer)
1083 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -07001084 raise exceptions.IncompleteReadError(incomplete, n)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001085
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001086 await self._wait_for_data('readexactly')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001087
Yury Selivanov3e56ff02016-10-05 18:01:12 -04001088 if len(self._buffer) == n:
1089 data = bytes(self._buffer)
1090 self._buffer.clear()
1091 else:
1092 data = bytes(self._buffer[:n])
1093 del self._buffer[:n]
1094 self._maybe_resume_transport()
1095 return data
Yury Selivanovd08c3632015-05-13 15:15:56 -04001096
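    # Sketch: reading a length-prefixed frame with readexactly() above
    # (the 4-byte big-endian length header is an assumed wire format):
    #
    #     header = await reader.readexactly(4)
    #     body = await reader.readexactly(int.from_bytes(header, 'big'))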
Yury Selivanovfaa135a2017-10-06 02:08:57 -04001097 def __aiter__(self):
1098 return self
Yury Selivanovd08c3632015-05-13 15:15:56 -04001099
Andrew Svetlov5f841b52017-12-09 00:23:48 +02001100 async def __anext__(self):
1101 val = await self.readline()
Yury Selivanovfaa135a2017-10-06 02:08:57 -04001102 if val == b'':
1103 raise StopAsyncIteration
1104 return val
Andrew Svetlov23b4b692019-05-27 22:56:22 +03001105
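# StreamReader supports async iteration via __aiter__/__anext__ above (sketch;
# 'handle' is a placeholder):
#
#     async for line in reader:
#         handle(line)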
1106
1107# end legacy stream APIs
1108
1109
1110class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
1111 """Helper class to adapt between Protocol and StreamReader.
1112
1113 (This is a helper class instead of making StreamReader itself a
1114 Protocol subclass, because the StreamReader has other potential
1115 uses, and to prevent the user of the StreamReader to accidentally
1116 call inappropriate methods of the protocol.)
1117 """
1118
1119 _stream = None # initialized in derived classes
1120
1121 def __init__(self, loop=None,
1122 *, _asyncio_internal=False):
1123 super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
1124 self._transport = None
1125 self._over_ssl = False
1126 self._closed = self._loop.create_future()
1127
1128 def connection_made(self, transport):
1129 self._transport = transport
1130 self._over_ssl = transport.get_extra_info('sslcontext') is not None
1131
1132 def connection_lost(self, exc):
1133 stream = self._stream
1134 if stream is not None:
1135 if exc is None:
1136 stream.feed_eof()
1137 else:
1138 stream.set_exception(exc)
1139 if not self._closed.done():
1140 if exc is None:
1141 self._closed.set_result(None)
1142 else:
1143 self._closed.set_exception(exc)
1144 super().connection_lost(exc)
1145 self._transport = None
1146
1147 def data_received(self, data):
1148 stream = self._stream
1149 if stream is not None:
1150 stream.feed_data(data)
1151
1152 def eof_received(self):
1153 stream = self._stream
1154 if stream is not None:
1155 stream.feed_eof()
1156 if self._over_ssl:
1157 # Prevent a warning in SSLProtocol.eof_received:
1158 # "returning true from eof_received()
1159 # has no effect when using ssl"
1160 return False
1161 return True
1162
1163 def _get_close_waiter(self, stream):
1164 return self._closed
1165
1166 def __del__(self):
1167 # Prevent reports about unhandled exceptions.
1168 # Better than self._closed._log_traceback = False hack
1169 closed = self._get_close_waiter(self._stream)
1170 if closed.done() and not closed.cancelled():
1171 closed.exception()
1172
1173
1174class _StreamProtocol(_BaseStreamProtocol):
1175 _source_traceback = None
1176
1177 def __init__(self, stream, loop=None,
1178 *, _asyncio_internal=False):
1179 super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
1180 self._source_traceback = stream._source_traceback
1181 self._stream_wr = weakref.ref(stream, self._on_gc)
1182 self._reject_connection = False
1183
1184 def _on_gc(self, wr):
1185 transport = self._transport
1186 if transport is not None:
1187 # connection_made was called
1188 context = {
1189 'message': ('An open stream object is being garbage '
1190 'collected; call "stream.close()" explicitly.')
1191 }
1192 if self._source_traceback:
1193 context['source_traceback'] = self._source_traceback
1194 self._loop.call_exception_handler(context)
1195 transport.abort()
1196 else:
1197 self._reject_connection = True
1198 self._stream_wr = None
1199
1200 @property
1201 def _stream(self):
1202 if self._stream_wr is None:
1203 return None
1204 return self._stream_wr()
1205
1206 def connection_made(self, transport):
1207 if self._reject_connection:
1208 context = {
1209 'message': ('An open stream was garbage collected prior to '
1210 'establishing network connection; '
1211 'call "stream.close()" explicitly.')
1212 }
1213 if self._source_traceback:
1214 context['source_traceback'] = self._source_traceback
1215 self._loop.call_exception_handler(context)
1216 transport.abort()
1217 return
1218 super().connection_made(transport)
1219 stream = self._stream
1220 if stream is None:
1221 return
1222 stream.set_transport(transport)
1223 stream._protocol = self
1224
1225 def connection_lost(self, exc):
1226 super().connection_lost(exc)
1227 self._stream_wr = None
1228
1229
1230class _ServerStreamProtocol(_BaseStreamProtocol):
1231 def __init__(self, server, limit, client_connected_cb, loop=None,
1232 *, _asyncio_internal=False):
1233 super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
1234 assert self._closed
1235 self._client_connected_cb = client_connected_cb
1236 self._limit = limit
1237 self._server = server
1238 self._task = None
1239
1240 def connection_made(self, transport):
1241 super().connection_made(transport)
1242 stream = Stream(mode=StreamMode.READWRITE,
1243 transport=transport,
1244 protocol=self,
1245 limit=self._limit,
1246 loop=self._loop,
1247 is_server_side=True,
1248 _asyncio_internal=True)
1249 self._stream = stream
1250 # If self._client_connected_cb(self._stream) fails
1251 # the exception is logged by transport
1252 self._task = self._loop.create_task(
1253 self._client_connected_cb(self._stream))
1254 self._server._attach(stream, self._task)
1255
1256 def connection_lost(self, exc):
1257 super().connection_lost(exc)
1258 self._server._detach(self._stream, self._task)
1259 self._stream = None
1260
1261
1262class _OptionalAwait:
 1263    # The class doesn't create a coroutine object
 1264    # if it is not awaited.
 1265    # It prevents the "coroutine is never awaited" warning.
1266
1267 __slots___ = ('_method',)
1268
1269 def __init__(self, method):
1270 self._method = method
1271
1272 def __await__(self):
1273 return self._method().__await__()
1274
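# _OptionalAwait is what lets methods such as Stream.write() below be used
# both ways (sketch):
#
#     stream.write(data)           # fire-and-forget; no "never awaited" warning
#     await stream.write(data)     # additionally waits for drain()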
1275
1276class Stream:
1277 """Wraps a Transport.
1278
1279 This exposes write(), writelines(), [can_]write_eof(),
1280 get_extra_info() and close(). It adds drain() which returns an
1281 optional Future on which you can wait for flow control. It also
1282 adds a transport property which references the Transport
1283 directly.
1284 """
1285
1286 _source_traceback = None
1287
1288 def __init__(self, mode, *,
1289 transport=None,
1290 protocol=None,
1291 loop=None,
1292 limit=_DEFAULT_LIMIT,
1293 is_server_side=False,
1294 _asyncio_internal=False):
1295 if not _asyncio_internal:
Miss Islington (bot)4cbe7a32019-06-27 04:58:34 -07001296 raise RuntimeError(f"{self.__class__} should be instantiated "
1297 "by asyncio internals only")
Andrew Svetlov23b4b692019-05-27 22:56:22 +03001298 self._mode = mode
1299 self._transport = transport
1300 self._protocol = protocol
1301 self._is_server_side = is_server_side
1302
1303 # The line length limit is a security feature;
1304 # it also doubles as half the buffer limit.
1305
1306 if limit <= 0:
1307 raise ValueError('Limit cannot be <= 0')
1308
1309 self._limit = limit
1310 if loop is None:
1311 self._loop = events.get_event_loop()
1312 else:
1313 self._loop = loop
1314 self._buffer = bytearray()
1315 self._eof = False # Whether we're done.
1316 self._waiter = None # A future used by _wait_for_data()
1317 self._exception = None
1318 self._paused = False
1319 self._complete_fut = self._loop.create_future()
1320 self._complete_fut.set_result(None)
1321
1322 if self._loop.get_debug():
1323 self._source_traceback = format_helpers.extract_stack(
1324 sys._getframe(1))
1325
1326 def __repr__(self):
1327 info = [self.__class__.__name__]
1328 info.append(f'mode={self._mode}')
1329 if self._buffer:
1330 info.append(f'{len(self._buffer)} bytes')
1331 if self._eof:
1332 info.append('eof')
1333 if self._limit != _DEFAULT_LIMIT:
1334 info.append(f'limit={self._limit}')
1335 if self._waiter:
1336 info.append(f'waiter={self._waiter!r}')
1337 if self._exception:
1338 info.append(f'exception={self._exception!r}')
1339 if self._transport:
1340 info.append(f'transport={self._transport!r}')
1341 if self._paused:
1342 info.append('paused')
1343 return '<{}>'.format(' '.join(info))
1344
1345 @property
1346 def mode(self):
1347 return self._mode
1348
1349 def is_server_side(self):
1350 return self._is_server_side
1351
1352 @property
1353 def transport(self):
1354 return self._transport
1355
1356 def write(self, data):
1357 _ensure_can_write(self._mode)
1358 self._transport.write(data)
1359 return self._fast_drain()
1360
1361 def writelines(self, data):
1362 _ensure_can_write(self._mode)
1363 self._transport.writelines(data)
1364 return self._fast_drain()
1365
1366 def _fast_drain(self):
1367 # The helper tries to use a fast path: return the already existing
1368 # completed future object if the underlying transport is not paused
1369 # and waiting for the write-resume signal is not needed
1370 exc = self.exception()
1371 if exc is not None:
1372 fut = self._loop.create_future()
1373 fut.set_exception(exc)
1374 return fut
1375 if not self._transport.is_closing():
1376 if self._protocol._connection_lost:
1377 fut = self._loop.create_future()
1378 fut.set_exception(ConnectionResetError('Connection lost'))
1379 return fut
1380 if not self._protocol._paused:
1381 # fast path, the stream is not paused
1382 # no need to wait for resume signal
1383 return self._complete_fut
1384 return _OptionalAwait(self.drain)
1385
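    # Example (sketch only, payload names are placeholders): because write()
    # returns the result of _fast_drain(), callers may either await it for
    # flow control or ignore the return value when no backpressure is needed:
    #
    #     stream.write(b'small payload')      # typically the completed-future fast path
    #     await stream.write(large_payload)   # awaits drain() if the transport is paused
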
1386 def write_eof(self):
1387 _ensure_can_write(self._mode)
1388 return self._transport.write_eof()
1389
1390 def can_write_eof(self):
1391 if not self._mode & StreamMode.WRITE:
1392 return False
1393 return self._transport.can_write_eof()
1394
1395 def close(self):
1396 self._transport.close()
1397 return _OptionalAwait(self.wait_closed)
1398
1399 def is_closing(self):
1400 return self._transport.is_closing()
1401
1402 async def abort(self):
1403 self._transport.abort()
1404 await self.wait_closed()
1405
1406 async def wait_closed(self):
1407 await self._protocol._get_close_waiter(self)
1408
1409 def get_extra_info(self, name, default=None):
1410 return self._transport.get_extra_info(name, default)
1411
1412 async def drain(self):
1413 """Flush the write buffer.
1414
1415 The intended use is to write
1416
1417 w.write(data)
1418 await w.drain()
1419 """
1420 _ensure_can_write(self._mode)
1421 exc = self.exception()
1422 if exc is not None:
1423 raise exc
1424 if self._transport.is_closing():
1425 # Wait for the protocol.connection_lost() call;
1426 # raise the connection-closing error if any,
1427 # ConnectionResetError otherwise
1428 await tasks.sleep(0)
1429 await self._protocol._drain_helper()
1430
1431 async def sendfile(self, file, offset=0, count=None, *, fallback=True):
1432 await self.drain() # check for stream mode and exceptions
1433 return await self._loop.sendfile(self._transport, file,
1434 offset, count, fallback=fallback)
1435
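    # Example (sketch only, the file name is a placeholder): sendfile() drains
    # the stream first, then delegates to loop.sendfile(), which may use a
    # zero-copy syscall and, with fallback=True, falls back to a plain
    # read/send loop:
    #
    #     with open('payload.bin', 'rb') as f:
    #         sent = await stream.sendfile(f)
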
1436 async def start_tls(self, sslcontext, *,
1437 server_hostname=None,
1438 ssl_handshake_timeout=None):
1439 await self.drain() # check for stream mode and exceptions
1440 transport = await self._loop.start_tls(
1441 self._transport, self._protocol, sslcontext,
1442 server_side=self._is_server_side,
1443 server_hostname=server_hostname,
1444 ssl_handshake_timeout=ssl_handshake_timeout)
1445 self._transport = transport
1446 self._protocol._transport = transport
1447 self._protocol._over_ssl = True
1448
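    # Example (sketch only): upgrading an established client-side stream to
    # TLS; the ssl context creation and hostname are assumptions of the sketch:
    #
    #     import ssl
    #     ctx = ssl.create_default_context()
    #     await stream.start_tls(ctx, server_hostname='example.com')
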
1449 def exception(self):
1450 return self._exception
1451
1452 def set_exception(self, exc):
1453 self._exception = exc
1454
1455 waiter = self._waiter
1456 if waiter is not None:
1457 self._waiter = None
1458 if not waiter.cancelled():
1459 waiter.set_exception(exc)
1460
1461 def _wakeup_waiter(self):
1462 """Wakeup read*() functions waiting for data or EOF."""
1463 waiter = self._waiter
1464 if waiter is not None:
1465 self._waiter = None
1466 if not waiter.cancelled():
1467 waiter.set_result(None)
1468
1469 def set_transport(self, transport):
1470 if transport is self._transport:
1471 return
1472 assert self._transport is None, 'Transport already set'
1473 self._transport = transport
1474
1475 def _maybe_resume_transport(self):
1476 if self._paused and len(self._buffer) <= self._limit:
1477 self._paused = False
1478 self._transport.resume_reading()
1479
1480 def feed_eof(self):
1481 self._eof = True
1482 self._wakeup_waiter()
1483
1484 def at_eof(self):
1485 """Return True if the buffer is empty and 'feed_eof' was called."""
1486 return self._eof and not self._buffer
1487
1488 def feed_data(self, data):
1489 _ensure_can_read(self._mode)
1490 assert not self._eof, 'feed_data after feed_eof'
1491
1492 if not data:
1493 return
1494
1495 self._buffer.extend(data)
1496 self._wakeup_waiter()
1497
1498 if (self._transport is not None and
1499 not self._paused and
1500 len(self._buffer) > 2 * self._limit):
1501 try:
1502 self._transport.pause_reading()
1503 except NotImplementedError:
1504 # The transport can't be paused.
1505 # We'll just have to buffer all data.
1506 # Forget the transport so we don't keep trying.
1507 self._transport = None
1508 else:
1509 self._paused = True
1510
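    # Flow-control note (illustrative numbers): with the default limit of
    # 64 KiB, feed_data() pauses the transport once more than 128 KiB
    # (2 * limit) is buffered, and _maybe_resume_transport() resumes it when
    # the buffer drops back to 64 KiB or less.
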
1511 async def _wait_for_data(self, func_name):
1512 """Wait until feed_data() or feed_eof() is called.
1513
1514 If stream was paused, automatically resume it.
1515 """
1516 # Stream uses a future to link the protocol feed_data() method
1517 # to a read coroutine. Running two read coroutines at the same time
1518 # would have unexpected behaviour: it would not be possible to know
1519 # which coroutine would get the next data.
1520 if self._waiter is not None:
1521 raise RuntimeError(
1522 f'{func_name}() called while another coroutine is '
1523 f'already waiting for incoming data')
1524
1525 assert not self._eof, '_wait_for_data after EOF'
1526
1527 # Waiting for data while paused would deadlock, so prevent it.
1528 # This is essential for readexactly(n) in the case when n > self._limit.
1529 if self._paused:
1530 self._paused = False
1531 self._transport.resume_reading()
1532
1533 self._waiter = self._loop.create_future()
1534 try:
1535 await self._waiter
1536 finally:
1537 self._waiter = None
1538
1539 async def readline(self):
1540 """Read chunk of data from the stream until newline (b'\n') is found.
1541
1542 On success, return chunk that ends with newline. If only partial
1543 line can be read due to EOF, return incomplete line without
1544 terminating newline. When EOF was reached while no bytes read, empty
1545 bytes object is returned.
1546
1547 If limit is reached, ValueError will be raised. In that case, if
1548 newline was found, complete line including newline will be removed
1549 from internal buffer. Else, internal buffer will be cleared. Limit is
1550 compared against part of the line without newline.
1551
1552 If stream was paused, this function will automatically resume it if
1553 needed.
1554 """
1555 _ensure_can_read(self._mode)
1556 sep = b'\n'
1557 seplen = len(sep)
1558 try:
1559 line = await self.readuntil(sep)
1560 except exceptions.IncompleteReadError as e:
1561 return e.partial
1562 except exceptions.LimitOverrunError as e:
1563 if self._buffer.startswith(sep, e.consumed):
1564 del self._buffer[:e.consumed + seplen]
1565 else:
1566 self._buffer.clear()
1567 self._maybe_resume_transport()
1568 raise ValueError(e.args[0])
1569 return line
1570
1571 async def readuntil(self, separator=b'\n'):
1572 """Read data from the stream until ``separator`` is found.
1573
1574 On success, the data and separator will be removed from the
1575 internal buffer (consumed). Returned data will include the
1576 separator at the end.
1577
1578 The configured stream limit is used to check the result. The limit
1579 sets the maximal length of data that can be returned, not counting
1580 the separator.
1581
1582 If an EOF occurs and the complete separator is still not found,
1583 an IncompleteReadError exception is raised, and the internal
1584 buffer is reset. The IncompleteReadError.partial attribute
1585 may contain a portion of the separator.
1586
1587 If the data cannot be read because the limit is exceeded, a
1588 LimitOverrunError exception is raised, and the data is left
1589 in the internal buffer, so it can be read again.
1590 """
1591 _ensure_can_read(self._mode)
1592 seplen = len(separator)
1593 if seplen == 0:
1594 raise ValueError('Separator should be at least one-byte string')
1595
1596 if self._exception is not None:
1597 raise self._exception
1598
1599 # Consume the whole buffer except the last bytes, whose length is
1600 # one less than seplen. Let's check corner cases with
1601 # separator='SEPARATOR':
1602 # * we have received an almost complete separator (without the last
1603 # byte), i.e. buffer='some textSEPARATO'. In this case we
1604 # can safely consume len(separator) - 1 bytes.
1605 # * the last byte of the buffer is the first byte of the separator,
1606 # i.e. buffer='abcdefghijklmnopqrS'. We could safely consume
1607 # everything except that last byte, but that would require
1608 # analyzing the bytes of the buffer that match a partial separator.
1609 # This is slow and/or requires an FSM. For this case our
1610 # implementation is not optimal, since it requires rescanning
1611 # data that is known not to belong to the separator. In the
1612 # real world, the separator will not be long enough to cause
1613 # performance problems. Even when reading MIME-encoded
1614 # messages :)
1615
1616 # `offset` is the number of bytes from the beginning of the buffer
1617 # where there is no occurrence of `separator`.
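        # Worked example (illustrative): with separator=b'SEPARATOR' (seplen=9)
        # and buffer=b'some textSEPARATO' (buflen=17), find() reports no match,
        # so offset becomes 17 + 1 - 9 == 9 and the next search starts at the
        # 'S' of the partial separator instead of rescanning b'some text'.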
1618 offset = 0
1619
1620 # Loop until we find `separator` in the buffer, exceed the buffer size,
1621 # or an EOF has happened.
1622 while True:
1623 buflen = len(self._buffer)
1624
1625 # Check if we now have enough data in the buffer for `separator` to
1626 # fit.
1627 if buflen - offset >= seplen:
1628 isep = self._buffer.find(separator, offset)
1629
1630 if isep != -1:
1631 # `separator` is in the buffer. `isep` will be used later
1632 # to retrieve the data.
1633 break
1634
1635 # see upper comment for explanation.
1636 offset = buflen + 1 - seplen
1637 if offset > self._limit:
1638 raise exceptions.LimitOverrunError(
1639 'Separator is not found, and chunk exceeds the limit',
1640 offset)
1641
1642 # A complete message (with the full separator) may be present in the
1643 # buffer even when the EOF flag is set. This may happen when the last
1644 # chunk adds data which makes the separator be found. That's why we
1645 # check for EOF *after* inspecting the buffer.
1646 if self._eof:
1647 chunk = bytes(self._buffer)
1648 self._buffer.clear()
1649 raise exceptions.IncompleteReadError(chunk, None)
1650
1651 # _wait_for_data() will resume reading if stream was paused.
1652 await self._wait_for_data('readuntil')
1653
1654 if isep > self._limit:
1655 raise exceptions.LimitOverrunError(
1656 'Separator is found, but chunk is longer than limit', isep)
1657
1658 chunk = self._buffer[:isep + seplen]
1659 del self._buffer[:isep + seplen]
1660 self._maybe_resume_transport()
1661 return bytes(chunk)
1662
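    # Example (sketch only): readuntil() accepts multi-byte separators, e.g.
    # an HTTP-style header block terminated by a blank line (the block must
    # fit within the configured limit):
    #
    #     header = await stream.readuntil(b'\r\n\r\n')
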
1663 async def read(self, n=-1):
1664 """Read up to `n` bytes from the stream.
1665
1666 If n is not provided, or set to -1, read until EOF and return all read
1667 bytes. If the EOF was received and the internal buffer is empty, return
1668 an empty bytes object.
1669
1670 If n is zero, return an empty bytes object immediately.
1671
1672 If n is positive, this function tries to read `n` bytes, and may
1673 return fewer bytes than requested, but at least one byte. If EOF is
1674 received before any byte is read, this function returns an empty
1675 bytes object.
1676
1677 The returned value is not limited by the limit configured at stream
1678 creation.
1679
1680 If the stream was paused, this function will automatically resume it
1681 if needed.
1682 """
1683 _ensure_can_read(self._mode)
1684
1685 if self._exception is not None:
1686 raise self._exception
1687
1688 if n == 0:
1689 return b''
1690
1691 if n < 0:
1692 # This used to just loop creating a new waiter hoping to
1693 # collect everything in self._buffer, but that would
1694 # deadlock if the subprocess sends more than self.limit
1695 # bytes. So just call self.read(self._limit) until EOF.
1696 blocks = []
1697 while True:
1698 block = await self.read(self._limit)
1699 if not block:
1700 break
1701 blocks.append(block)
1702 return b''.join(blocks)
1703
1704 if not self._buffer and not self._eof:
1705 await self._wait_for_data('read')
1706
1707 # This works correctly even if the buffer holds fewer than n bytes
1708 data = bytes(self._buffer[:n])
1709 del self._buffer[:n]
1710
1711 self._maybe_resume_transport()
1712 return data
1713
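    # Example (sketch only): read() with the default n=-1 drains the stream to
    # EOF by looping over read(self._limit) internally, so a short response
    # body can be collected in one call:
    #
    #     body = await stream.read()    # b'' if EOF arrives with an empty buffer
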
1714 async def readexactly(self, n):
1715 """Read exactly `n` bytes.
1716
1717 Raise an IncompleteReadError if EOF is reached before `n` bytes can be
1718 read. The IncompleteReadError.partial attribute of the exception will
1719 contain the partial read bytes.
1720
1721 If n is zero, return an empty bytes object.
1722
1723 The returned value is not limited by the limit configured at stream
1724 creation.
1725
1726 If the stream was paused, this function will automatically resume it
1727 if needed.
1728 """
1729 _ensure_can_read(self._mode)
1730 if n < 0:
1731 raise ValueError('readexactly size can not be less than zero')
1732
1733 if self._exception is not None:
1734 raise self._exception
1735
1736 if n == 0:
1737 return b''
1738
1739 while len(self._buffer) < n:
1740 if self._eof:
1741 incomplete = bytes(self._buffer)
1742 self._buffer.clear()
1743 raise exceptions.IncompleteReadError(incomplete, n)
1744
1745 await self._wait_for_data('readexactly')
1746
1747 if len(self._buffer) == n:
1748 data = bytes(self._buffer)
1749 self._buffer.clear()
1750 else:
1751 data = bytes(self._buffer[:n])
1752 del self._buffer[:n]
1753 self._maybe_resume_transport()
1754 return data
1755
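    # Example (sketch only): readexactly() suits length-prefixed framing; the
    # 4-byte big-endian length prefix below is an assumption of the sketch,
    # not a protocol defined by this module:
    #
    #     import struct
    #     size, = struct.unpack('!I', await stream.readexactly(4))
    #     frame = await stream.readexactly(size)
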
1756 def __aiter__(self):
1757 _ensure_can_read(self._mode)
1758 return self
1759
1760 async def __anext__(self):
1761 val = await self.readline()
1762 if val == b'':
1763 raise StopAsyncIteration
1764 return val
1765
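    # Example (sketch only, process() is a placeholder): __aiter__/__anext__
    # make the stream usable with async for, yielding newline-terminated
    # chunks until readline() returns b'':
    #
    #     async for line in stream:
    #         process(line)
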
1766 async def __aenter__(self):
1767 return self
1768
1769 async def __aexit__(self, exc_type, exc_val, exc_tb):
1770 await self.close()
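
# Example (sketch only, host, port and request bytes are placeholders):
# because connect() returns a _ContextManagerHelper and Stream implements
# __aenter__/__aexit__, a client connection can be managed with async with;
# leaving the block awaits close():
#
#     async def fetch():
#         async with connect('example.com', 80) as stream:
#             await stream.write(b'GET / HTTP/1.0\r\n\r\n')
#             return await stream.read()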