blob: 146a33818d952a0a1e944db4e5d04d809213adc9 [file] [log] [blame]
# Public API of this module.  The UNIX-socket helpers are added to this
# tuple separately, only on platforms that provide AF_UNIX.
__all__ = (
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
Yury Selivanovb057c522014-02-18 12:15:06 -05005import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07006import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -04007import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07008import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -05009
if hasattr(socket, 'AF_UNIX'):
    # Advertise the UNIX Domain Socket helpers only where they exist.
    __all__ = __all__ + ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080012
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070016from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Andrew Svetlov5f841b52017-12-09 00:23:48 +020019from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Default StreamReader limit in bytes (64 KiB).
_DEFAULT_LIMIT = 64 * 1024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a (reader, writer) pair.

    This wraps loop.create_connection(): ``reader`` is a StreamReader and
    ``writer`` is a StreamWriter bound to the new transport.

    Every argument of create_connection() except protocol_factory is
    accepted; host and port are usually positional, and the remaining
    options are passed through as keyword arguments.

    Extra keyword-only arguments:
      loop  -- event loop to use (events.get_event_loop() when None).
      limit -- buffer limit handed to the StreamReader.

    (To customize StreamReader and/or StreamReaderProtocol, copy this
    function -- it contains nothing beyond convenience glue.)
    """
    if loop is None:
        loop = events.get_event_loop()
    stream_reader = StreamReader(limit=limit, loop=loop,
                                 _asyncio_internal=True)
    reader_protocol = StreamReaderProtocol(stream_reader, loop=loop,
                                           _asyncio_internal=True)

    def protocol_factory():
        # The protocol is built up front so it can be returned alongside
        # the reader; the factory simply hands it to the loop.
        return reader_protocol

    transport, _unused = await loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, reader_protocol, stream_reader,
                                 loop, _asyncio_internal=True)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070055
56
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, invoking a callback for each client.

    ``client_connected_cb`` is called with two arguments,
    (client_reader, client_writer): a StreamReader and a StreamWriter.
    It may be a plain function or a coroutine function; a coroutine it
    returns is scheduled as a Task.

    All other arguments are those of loop.create_server() except
    protocol_factory; host and port are usually positional, followed by
    optional keyword arguments.

    Extra keyword-only arguments:
      loop  -- event loop to use (events.get_event_loop() when None).
      limit -- buffer limit handed to each StreamReader.

    Returns whatever loop.create_server() returns, i.e. a Server object
    which can be used to stop the service.
    """
    loop = events.get_event_loop() if loop is None else loop

    def factory():
        # A fresh reader/protocol pair for every accepted connection.
        return StreamReaderProtocol(
            StreamReader(limit=limit, loop=loop, _asyncio_internal=True),
            client_connected_cb,
            loop=loop,
            _asyncio_internal=True)

    return await loop.create_server(factory, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -080092
93
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but for UNIX Domain Sockets."""
        loop = events.get_event_loop() if loop is None else loop
        stream_reader = StreamReader(limit=limit, loop=loop,
                                     _asyncio_internal=True)
        reader_protocol = StreamReaderProtocol(stream_reader, loop=loop,
                                               _asyncio_internal=True)

        def protocol_factory():
            return reader_protocol

        transport, _unused = await loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(transport, reader_protocol,
                                     stream_reader, loop,
                                     _asyncio_internal=True)
        return stream_reader, stream_writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but for UNIX Domain Sockets."""
        loop = events.get_event_loop() if loop is None else loop

        def factory():
            # Each accepted connection gets its own reader/protocol pair.
            return StreamReaderProtocol(
                StreamReader(limit=limit, loop=loop, _asyncio_internal=True),
                client_connected_cb,
                loop=loop,
                _asyncio_internal=True)

        return await loop.create_unix_server(factory, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500127
128
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.

    Any number of coroutines may be blocked in _drain_helper() at once
    (for example, several un-awaited write() calls while the transport
    is paused each start a drain); resume_writing() and connection_lost()
    wake every one of them.
    """

    def __init__(self, loop=None, *, _asyncio_internal=False):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        if not _asyncio_internal:
            # NOTE:
            # Avoid inheritance from FlowControlMixin
            # Copy-paste the code to your project
            # if you need flow control helpers
            # stacklevel=2 attributes the warning to the caller of
            # __init__ rather than to this line.
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning, stacklevel=2)
        self._paused = False
        # Futures of the coroutines currently blocked in _drain_helper().
        # Each coroutine removes its own future when it wakes up.
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        """Transport buffer went over the high-water mark."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Transport buffer drained; wake up every pending drain()."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Record the loss and wake pending drain() calls.

        Waiters are woken with *exc* if it is not None, else normally.
        """
        self._connection_lost = True
        # Wake up the writers if currently paused.
        if not self._paused:
            return
        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing may continue.

        Raises ConnectionResetError if the connection was already lost,
        or the exception passed to connection_lost() while waiting.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Also runs on cancellation, so no stale futures are left.
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        """Return the future completed when the connection closes.

        Must be provided by subclasses.
        """
        raise NotImplementedError
204
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800205
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)

    The protocol keeps only a weak reference to its StreamReader.  If the
    reader is garbage collected while the connection is open, the
    transport is aborted and the loop's exception handler is called.
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        if stream_reader is not None:
            # Weak reference: _on_reader_gc() fires when the reader dies.
            self._stream_reader_wr = weakref.ref(stream_reader,
                                                 self._on_reader_gc)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Resolved by connection_lost(); handed out by _get_close_waiter().
        self._closed = self._loop.create_future()

    def _on_reader_gc(self, wr):
        # Weakref callback: the StreamReader was garbage collected.
        transport = self._transport
        if transport is not None:
            # connection_made was called
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        else:
            # Not connected yet: connection_made() will refuse the transport.
            self._reject_connection = True
        self._stream_reader_wr = None

    @property
    def _stream_reader(self):
        # The StreamReader, or None if unset or already collected.
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def connection_made(self, transport):
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               reader,
                                               self._loop,
                                               _asyncio_internal=True)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)
            # The writer/callback now reference the reader; drop our pin.
            self._strong_reader = None

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._stream_writer = None
        self._transport = None

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ failed before the future was created.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
330
Guido van Rossum355491d2013-10-18 15:17:11 -0700331
def _swallow_unhandled_exception(task):
    """Done-callback that marks *task*'s exception as retrieved.

    Attached to the drain() tasks that StreamWriter.write()/writelines()
    create.  If such a call is not awaited and drain() fails, this
    prevents the "exception was never retrieved" report.  A cancelled
    task (e.g. cancelled at loop shutdown) has no exception to retrieve;
    Task.exception() would raise CancelledError, so it is skipped.
    """
    if not task.cancelled():
        task.exception()
337
338
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds the drain() coroutine, which
    waits until flow control allows more writing.  It also adds a
    transport property which references the Transport directly.

    write() and writelines() return an awaitable:
      * an already-completed future when no waiting is needed;
      * an already-failed future for a known error;
      * otherwise a Task running drain().
    Callers may therefore either ignore the result or await it.
    """

    def __init__(self, transport, protocol, reader, loop,
                 *, _asyncio_internal=False):
        if not _asyncio_internal:
            # stacklevel=2 points the warning at the code creating us.
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning, stacklevel=2)
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        # Shared, already-completed future for the fast path of
        # _fast_drain().
        self._complete_fut = self._loop.create_future()
        self._complete_fut.set_result(None)

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)
        return self._fast_drain()

    def writelines(self, data):
        self._transport.writelines(data)
        return self._fast_drain()

    def _failed_future(self, exc):
        """Return a future that has already failed with *exc*.

        The exception is marked as retrieved.  Callers that ignore the
        result of write()/writelines() then get no "Future exception was
        never retrieved" log entry, matching what
        _swallow_unhandled_exception does for the drain() task path.
        Awaiting the future still raises *exc*.
        """
        fut = self._loop.create_future()
        fut.set_exception(exc)
        fut.exception()
        return fut

    def _fast_drain(self):
        # The helper tries to use fast-path to return already existing complete future
        # object if underlying transport is not paused and actual waiting for writing
        # resume is not needed
        if self._reader is not None:
            # this branch will be simplified after merging reader with writer
            exc = self._reader.exception()
            if exc is not None:
                return self._failed_future(exc)
        if not self._transport.is_closing():
            if self._protocol._connection_lost:
                return self._failed_future(
                    ConnectionResetError('Connection lost'))
            if not self._protocol._paused:
                # fast path, the stream is not paused
                # no need to wait for resume signal
                return self._complete_fut
        ret = self._loop.create_task(self.drain())
        ret.add_done_callback(_swallow_unhandled_exception)
        return ret

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        self._transport.close()
        return self._protocol._get_close_waiter(self)

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() call
            # Raise connection closing error if any,
            # ConnectionResetError otherwise
            fut = self._protocol._get_close_waiter(self)
            await fut
            raise ConnectionResetError('Connection lost')
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446
447
448class StreamReader:
449
Andrew Svetlova5d1eb82018-09-12 11:43:04 -0700450 _source_traceback = None
451
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400452 def __init__(self, limit=_DEFAULT_LIMIT, loop=None,
453 *, _asyncio_internal=False):
454 if not _asyncio_internal:
455 warnings.warn(f"{self.__class__} should be instaniated "
456 "by asyncio internals only, "
457 "please avoid its creation from user code",
458 DeprecationWarning)
459
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 # The line length limit is a security feature;
461 # it also doubles as half the buffer limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500462
463 if limit <= 0:
464 raise ValueError('Limit cannot be <= 0')
465
Guido van Rossum355491d2013-10-18 15:17:11 -0700466 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700467 if loop is None:
Victor Stinner70db9e42015-01-09 21:32:05 +0100468 self._loop = events.get_event_loop()
469 else:
470 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500471 self._buffer = bytearray()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100472 self._eof = False # Whether we're done.
473 self._waiter = None # A future used by _wait_for_data()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 self._exception = None
475 self._transport = None
476 self._paused = False
Andrew Svetlova5d1eb82018-09-12 11:43:04 -0700477 if self._loop.get_debug():
478 self._source_traceback = format_helpers.extract_stack(
479 sys._getframe(1))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200481 def __repr__(self):
482 info = ['StreamReader']
483 if self._buffer:
Yury Selivanov6370f342017-12-10 18:36:12 -0500484 info.append(f'{len(self._buffer)} bytes')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200485 if self._eof:
486 info.append('eof')
487 if self._limit != _DEFAULT_LIMIT:
Yury Selivanov6370f342017-12-10 18:36:12 -0500488 info.append(f'limit={self._limit}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200489 if self._waiter:
Yury Selivanov6370f342017-12-10 18:36:12 -0500490 info.append(f'waiter={self._waiter!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200491 if self._exception:
Yury Selivanov6370f342017-12-10 18:36:12 -0500492 info.append(f'exception={self._exception!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200493 if self._transport:
Yury Selivanov6370f342017-12-10 18:36:12 -0500494 info.append(f'transport={self._transport!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200495 if self._paused:
496 info.append('paused')
Yury Selivanov6370f342017-12-10 18:36:12 -0500497 return '<{}>'.format(' '.join(info))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200498
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700499 def exception(self):
500 return self._exception
501
502 def set_exception(self, exc):
503 self._exception = exc
504
Guido van Rossum355491d2013-10-18 15:17:11 -0700505 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700507 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508 if not waiter.cancelled():
509 waiter.set_exception(exc)
510
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100511 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500512 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100513 waiter = self._waiter
514 if waiter is not None:
515 self._waiter = None
516 if not waiter.cancelled():
517 waiter.set_result(None)
518
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519 def set_transport(self, transport):
520 assert self._transport is None, 'Transport already set'
521 self._transport = transport
522
523 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500524 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700526 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527
528 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700529 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100530 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531
Yury Selivanovf0020f52014-02-06 00:14:30 -0500532 def at_eof(self):
533 """Return True if the buffer is empty and 'feed_eof' was called."""
534 return self._eof and not self._buffer
535
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500537 assert not self._eof, 'feed_data after feed_eof'
538
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 if not data:
540 return
541
Yury Selivanove694c972014-02-05 18:11:13 -0500542 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100543 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700544
545 if (self._transport is not None and
Yury Selivanovb4617912016-05-16 16:32:38 -0400546 not self._paused and
547 len(self._buffer) > 2 * self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700548 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700549 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 except NotImplementedError:
551 # The transport can't be paused.
552 # We'll just have to buffer all data.
553 # Forget the transport so we don't keep trying.
554 self._transport = None
555 else:
556 self._paused = True
557
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200558 async def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500559 """Wait until feed_data() or feed_eof() is called.
560
561 If stream was paused, automatically resume it.
562 """
Victor Stinner183e3472014-01-23 17:40:03 +0100563 # StreamReader uses a future to link the protocol feed_data() method
564 # to a read coroutine. Running two read coroutines at the same time
565 # would have an unexpected behaviour. It would not possible to know
566 # which coroutine would get the next data.
567 if self._waiter is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500568 raise RuntimeError(
569 f'{func_name}() called while another coroutine is '
570 f'already waiting for incoming data')
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100571
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500572 assert not self._eof, '_wait_for_data after EOF'
573
574 # Waiting for data while paused will make deadlock, so prevent it.
Yury Selivanov3e56ff02016-10-05 18:01:12 -0400575 # This is essential for readexactly(n) for case when n > self._limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500576 if self._paused:
577 self._paused = False
578 self._transport.resume_reading()
579
Yury Selivanov7661db62016-05-16 15:38:39 -0400580 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100581 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200582 await self._waiter
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100583 finally:
584 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100585
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200586 async def readline(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500587 """Read chunk of data from the stream until newline (b'\n') is found.
588
589 On success, return chunk that ends with newline. If only partial
590 line can be read due to EOF, return incomplete line without
591 terminating newline. When EOF was reached while no bytes read, empty
592 bytes object is returned.
593
594 If limit is reached, ValueError will be raised. In that case, if
595 newline was found, complete line including newline will be removed
596 from internal buffer. Else, internal buffer will be cleared. Limit is
597 compared against part of the line without newline.
598
599 If stream was paused, this function will automatically resume it if
600 needed.
601 """
602 sep = b'\n'
603 seplen = len(sep)
604 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200605 line = await self.readuntil(sep)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700606 except exceptions.IncompleteReadError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500607 return e.partial
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700608 except exceptions.LimitOverrunError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500609 if self._buffer.startswith(sep, e.consumed):
610 del self._buffer[:e.consumed + seplen]
611 else:
612 self._buffer.clear()
613 self._maybe_resume_transport()
614 raise ValueError(e.args[0])
615 return line
616
async def readuntil(self, separator=b'\n'):
    """Read from the stream until *separator* is found.

    On success the returned bytes end with *separator*. Both the data and
    the separator are removed (consumed) from the internal buffer.

    The configured stream limit caps how much data may precede the
    separator; the separator itself is not counted.

    Raises:
        ValueError: *separator* is empty.
        The stored stream exception, if one has been set.
        LimitOverrunError: the limit was exceeded, either while searching
            or because the separator was found too far in. The buffered
            data is left untouched so it can be read again.
        IncompleteReadError: EOF arrived before a complete separator was
            seen. The buffer is emptied, and the exception's ``partial``
            carries what was buffered (possibly part of the separator).
    """
    sep_len = len(separator)
    if not sep_len:
        raise ValueError('Separator should be at least one-byte string')

    if self._exception is not None:
        raise self._exception

    # Everything before `scan_start` is known not to begin a full
    # separator. Each rescan therefore covers only the newly arrived bytes
    # plus the last ``sep_len - 1`` bytes, which might hold a partially
    # received separator. Data that may hold only an even shorter partial
    # match gets rescanned. This is a deliberate trade-off against a full
    # matching state machine, and it is cheap for realistic separators.
    scan_start = 0

    while True:
        buf_len = len(self._buffer)

        if buf_len - scan_start >= sep_len:
            sep_pos = self._buffer.find(separator, scan_start)
            if sep_pos != -1:
                # Found: `sep_pos` is used below to slice the result.
                break

            scan_start = buf_len + 1 - sep_len
            if scan_start > self._limit:
                raise exceptions.LimitOverrunError(
                    'Separator is not found, and chunk exceed the limit',
                    scan_start)

        # EOF is examined only *after* scanning. The last chunk delivered
        # before EOF may be exactly the one that completes the separator.
        if self._eof:
            leftover = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(leftover, None)

        # Wait for more data; _wait_for_data() also resumes reading if the
        # stream was paused.
        await self._wait_for_data('readuntil')

    if sep_pos > self._limit:
        raise exceptions.LimitOverrunError(
            'Separator is found, but chunk is longer than limit', sep_pos)

    end = sep_pos + sep_len
    line = bytes(self._buffer[:end])
    del self._buffer[:end]
    self._maybe_resume_transport()
    return line
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707
async def read(self, n=-1):
    """Read up to *n* bytes from the stream.

    * ``n < 0`` (the default): keep reading until EOF and return
      everything collected; ``b''`` if nothing remains.
    * ``n == 0``: return ``b''`` immediately.
    * ``n > 0``: wait until at least one byte is buffered or EOF is seen,
      then return at most *n* bytes. An empty result means EOF was reached
      with nothing buffered.

    The size of the result is not capped by the stream limit configured at
    creation. If the stream was paused, reading resumes it when needed.

    Raises the stored stream exception, if any, before anything else.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # Drain in limit-sized pieces. Waiting for the whole payload to
        # accumulate in the buffer could deadlock once the peer sends more
        # than ``self._limit`` bytes.
        pieces = []
        piece = await self.read(self._limit)
        while piece:
            pieces.append(piece)
            piece = await self.read(self._limit)
        return b''.join(pieces)

    if not (self._buffer or self._eof):
        await self._wait_for_data('read')

    # Slicing is safe even when fewer than n bytes are buffered.
    data = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700757
async def readexactly(self, n):
    """Read exactly *n* bytes from the stream.

    Returns ``b''`` when *n* is zero. The result size is not capped by the
    stream limit configured at creation. If the stream was paused, reading
    resumes it when needed.

    Raises:
        ValueError: *n* is negative (checked first).
        The stored stream exception, if one has been set.
        IncompleteReadError: EOF arrived before *n* bytes were available.
            The buffer is emptied, and the bytes it held are exposed as the
            exception's ``partial`` attribute.
    """
    if n < 0:
        raise ValueError('readexactly size can not be less than zero')

    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    while len(self._buffer) < n:
        if self._eof:
            partial = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(partial, n)
        await self._wait_for_data('readexactly')

    if len(self._buffer) != n:
        data = bytes(self._buffer[:n])
        del self._buffer[:n]
    else:
        # Exactly n bytes buffered: take all of it without slicing.
        data = bytes(self._buffer)
        self._buffer.clear()

    self._maybe_resume_transport()
    return data
Yury Selivanovd08c3632015-05-13 15:15:56 -0400798
def __aiter__(self):
    """Support ``async for``: the reader is its own asynchronous iterator."""
    return self
Yury Selivanovd08c3632015-05-13 15:15:56 -0400801
async def __anext__(self):
    """Return the next line for ``async for``.

    Lines come from ``self.readline()``. A result equal to ``b''`` marks
    the end of the stream and stops the iteration.
    """
    line = await self.readline()
    if line != b'':
        return line
    raise StopAsyncIteration