blob: 795530e6f69efed66df8a91a383091f65e455a6a [file] [log] [blame]
# Names exported by `from ... import *`; the UNIX-socket helpers are
# appended later only on platforms that provide AF_UNIX.
__all__ = (
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
Yury Selivanovb057c522014-02-18 12:15:06 -05005import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07006import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -04007import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07008import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -05009
if hasattr(socket, 'AF_UNIX'):
    # Export the UNIX-domain helpers only where UNIX sockets exist.
    __all__ = __all__ + ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080012
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070016from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Yury Selivanov6758e6e2019-09-29 21:59:55 -070019from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Default StreamReader buffer/line limit: 64 KiB (2 ** 16 bytes).
_DEFAULT_LIMIT = 64 * 1024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a ``(reader, writer)`` pair.

    The reader is a StreamReader and the writer a StreamWriter; both are
    tied to one StreamReaderProtocol instance.

    Every argument except ``loop`` and ``limit`` is forwarded to
    ``loop.create_connection()`` (the protocol factory is supplied
    here).  The usual call passes ``host`` and ``port`` positionally,
    followed by optional keyword arguments.

    ``loop`` selects the event loop; passing it explicitly emits a
    DeprecationWarning.  ``limit`` is the buffer limit given to the
    StreamReader.

    To customize StreamReader and/or StreamReaderProtocol, copy this
    function -- it is only a convenience wrapper.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()

    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)

    def protocol_factory():
        # create_connection() expects a factory; hand back our instance.
        return protocol

    transport, _ = await loop.create_connection(
        protocol_factory, host, port, **kwds)
    return reader, StreamWriter(transport, protocol, reader, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056
57
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback for every client.

    ``client_connected_cb`` is called with two arguments,
    ``(client_reader, client_writer)`` -- a StreamReader and a
    StreamWriter.  It may be a plain function or a coroutine function;
    if calling it returns a coroutine, that coroutine is scheduled as a
    Task.

    The remaining arguments are forwarded to ``loop.create_server()``
    (the protocol factory is supplied here); most commonly ``host`` and
    ``port`` positionally, plus optional keyword arguments.

    ``loop`` selects the event loop; passing it explicitly emits a
    DeprecationWarning.  ``limit`` is the buffer limit given to each
    client's StreamReader.

    Returns whatever ``loop.create_server()`` returns, i.e. a Server
    object which can be used to stop the service.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()

    def factory():
        # Each accepted connection gets its own reader/protocol pair.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop)

    return await loop.create_server(factory, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -080095
96
if hasattr(socket, 'AF_UNIX'):
    # These helpers are only defined where UNIX domain sockets exist.

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()

        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)

        def protocol_factory():
            # create_unix_connection() expects a factory; reuse our instance.
            return protocol

        transport, _ = await loop.create_unix_connection(
            protocol_factory, path, **kwds)
        return reader, StreamWriter(transport, protocol, reader, loop)

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()

        def factory():
            # Each accepted connection gets its own reader/protocol pair.
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb,
                                        loop=loop)

        return await loop.create_unix_server(factory, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500133
134
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.  Any
    number of coroutines may wait in _drain_helper() concurrently; all
    of them are woken by resume_writing() or connection_lost().
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # One future per coroutine currently blocked in _drain_helper().
        # A single slot would make a second concurrent drain() fail (or,
        # under -O, silently orphan the first waiter so it never wakes).
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Wake every blocked writer.  set_result() only schedules
        # callbacks, so the list is not mutated while we iterate; each
        # waiter removes itself in _drain_helper()'s finally clause.
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writers if currently paused.
        if not self._paused:
            return
        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Block while writing is paused.

        Raises:
            ConnectionResetError: if the connection was already lost.
            The exception passed to connection_lost() if the connection
            is lost while waiting.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        raise NotImplementedError
201
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800202
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)

    The reader is held through a weak reference.  If it is garbage
    collected while the connection is open, the loop's exception handler
    is notified and the transport is aborted.  If it is collected before
    the connection is made, the connection is rejected.
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader,
                                                 self._on_reader_gc)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()
        # Task running a coroutine returned by client_connected_cb.
        self._task = None

    def _on_reader_gc(self, wr):
        """Weakref callback: the StreamReader was garbage collected."""
        transport = self._transport
        if transport is not None:
            # connection_made was called
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        else:
            self._reject_connection = True
        self._stream_reader_wr = None

    @property
    def _stream_reader(self):
        """The StreamReader, or None if it is gone or was never given."""
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def connection_made(self, transport):
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               reader,
                                               self._loop)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                # The event loop keeps only weak references to tasks;
                # hold a strong one so the handler cannot be garbage
                # collected while it is still running.
                self._task = self._loop.create_task(res)
            self._strong_reader = None

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._stream_writer = None
        self._task = None
        self._transport = None

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ failed before the future was created.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
325
Guido van Rossum355491d2013-10-18 15:17:11 -0700326
class StreamWriter:
    """Wraps a Transport.

    write(), writelines(), write_eof(), can_write_eof(), close(),
    is_closing() and get_extra_info() delegate to the transport;
    wait_closed() waits on the protocol's close waiter.  drain() adds
    flow control: await it after writing to wait until the protocol
    allows more data.  The ``transport`` property exposes the underlying
    Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() calls reader.exception(), hence the type restriction.
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        # An already-resolved future; none of the methods below read it.
        finished = self._loop.create_future()
        finished.set_result(None)
        self._complete_fut = finished

    def __repr__(self):
        fields = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            fields.append(f'reader={self._reader!r}')
        return '<' + ' '.join(fields) + '>'

    @property
    def transport(self):
        """The underlying Transport object."""
        return self._transport

    def write(self, data):
        """Pass *data* to ``transport.write()``."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass an iterable of chunks to ``transport.writelines()``."""
        self._transport.writelines(data)

    def write_eof(self):
        """Return the result of ``transport.write_eof()``."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the result of ``transport.can_write_eof()``."""
        return self._transport.can_write_eof()

    def close(self):
        """Close the transport and return what ``transport.close()`` returns."""
        return self._transport.close()

    def is_closing(self):
        """Return the result of ``transport.is_closing()``."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's close waiter for this stream."""
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        """Look up transport-specific information (see Transport)."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        Any exception already recorded on the reader is raised first.
        """
        reader = self._reader
        if reader is not None:
            pending = reader.exception()
            if pending is not None:
                raise pending
        if self._transport.is_closing():
            # Give the loop one iteration so protocol.connection_lost()
            # can run; otherwise a `write(); await drain()` loop would
            # never observe that the socket was closed.  _drain_helper()
            # then raises the closing error (ConnectionResetError if none).
            await sleep(0)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405
406
407class StreamReader:
408
Yury Selivanov6758e6e2019-09-29 21:59:55 -0700409 _source_traceback = None
410
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300411 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 # The line length limit is a security feature;
413 # it also doubles as half the buffer limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500414
415 if limit <= 0:
416 raise ValueError('Limit cannot be <= 0')
417
Guido van Rossum355491d2013-10-18 15:17:11 -0700418 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 if loop is None:
Victor Stinner70db9e42015-01-09 21:32:05 +0100420 self._loop = events.get_event_loop()
421 else:
422 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500423 self._buffer = bytearray()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100424 self._eof = False # Whether we're done.
425 self._waiter = None # A future used by _wait_for_data()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 self._exception = None
427 self._transport = None
428 self._paused = False
Yury Selivanov6758e6e2019-09-29 21:59:55 -0700429 if self._loop.get_debug():
430 self._source_traceback = format_helpers.extract_stack(
431 sys._getframe(1))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200433 def __repr__(self):
434 info = ['StreamReader']
435 if self._buffer:
Yury Selivanov6370f342017-12-10 18:36:12 -0500436 info.append(f'{len(self._buffer)} bytes')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200437 if self._eof:
438 info.append('eof')
439 if self._limit != _DEFAULT_LIMIT:
Yury Selivanov6370f342017-12-10 18:36:12 -0500440 info.append(f'limit={self._limit}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200441 if self._waiter:
Yury Selivanov6370f342017-12-10 18:36:12 -0500442 info.append(f'waiter={self._waiter!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200443 if self._exception:
Yury Selivanov6370f342017-12-10 18:36:12 -0500444 info.append(f'exception={self._exception!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200445 if self._transport:
Yury Selivanov6370f342017-12-10 18:36:12 -0500446 info.append(f'transport={self._transport!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200447 if self._paused:
448 info.append('paused')
Yury Selivanov6370f342017-12-10 18:36:12 -0500449 return '<{}>'.format(' '.join(info))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200450
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 def exception(self):
452 return self._exception
453
454 def set_exception(self, exc):
455 self._exception = exc
456
Guido van Rossum355491d2013-10-18 15:17:11 -0700457 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700459 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 if not waiter.cancelled():
461 waiter.set_exception(exc)
462
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100463 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500464 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100465 waiter = self._waiter
466 if waiter is not None:
467 self._waiter = None
468 if not waiter.cancelled():
469 waiter.set_result(None)
470
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471 def set_transport(self, transport):
472 assert self._transport is None, 'Transport already set'
473 self._transport = transport
474
475 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500476 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700478 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479
480 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700481 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100482 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483
Yury Selivanovf0020f52014-02-06 00:14:30 -0500484 def at_eof(self):
485 """Return True if the buffer is empty and 'feed_eof' was called."""
486 return self._eof and not self._buffer
487
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500489 assert not self._eof, 'feed_data after feed_eof'
490
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 if not data:
492 return
493
Yury Selivanove694c972014-02-05 18:11:13 -0500494 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100495 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496
497 if (self._transport is not None and
Yury Selivanovb4617912016-05-16 16:32:38 -0400498 not self._paused and
499 len(self._buffer) > 2 * self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700501 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700502 except NotImplementedError:
503 # The transport can't be paused.
504 # We'll just have to buffer all data.
505 # Forget the transport so we don't keep trying.
506 self._transport = None
507 else:
508 self._paused = True
509
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200510 async def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500511 """Wait until feed_data() or feed_eof() is called.
512
513 If stream was paused, automatically resume it.
514 """
Victor Stinner183e3472014-01-23 17:40:03 +0100515 # StreamReader uses a future to link the protocol feed_data() method
516 # to a read coroutine. Running two read coroutines at the same time
517 # would have an unexpected behaviour. It would not possible to know
518 # which coroutine would get the next data.
519 if self._waiter is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500520 raise RuntimeError(
521 f'{func_name}() called while another coroutine is '
522 f'already waiting for incoming data')
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100523
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500524 assert not self._eof, '_wait_for_data after EOF'
525
526 # Waiting for data while paused will make deadlock, so prevent it.
Yury Selivanov3e56ff02016-10-05 18:01:12 -0400527 # This is essential for readexactly(n) for case when n > self._limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500528 if self._paused:
529 self._paused = False
530 self._transport.resume_reading()
531
Yury Selivanov7661db62016-05-16 15:38:39 -0400532 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100533 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200534 await self._waiter
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100535 finally:
536 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100537
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200538 async def readline(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500539 """Read chunk of data from the stream until newline (b'\n') is found.
540
541 On success, return chunk that ends with newline. If only partial
542 line can be read due to EOF, return incomplete line without
543 terminating newline. When EOF was reached while no bytes read, empty
544 bytes object is returned.
545
546 If limit is reached, ValueError will be raised. In that case, if
547 newline was found, complete line including newline will be removed
548 from internal buffer. Else, internal buffer will be cleared. Limit is
549 compared against part of the line without newline.
550
551 If stream was paused, this function will automatically resume it if
552 needed.
553 """
554 sep = b'\n'
555 seplen = len(sep)
556 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200557 line = await self.readuntil(sep)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700558 except exceptions.IncompleteReadError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500559 return e.partial
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700560 except exceptions.LimitOverrunError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500561 if self._buffer.startswith(sep, e.consumed):
562 del self._buffer[:e.consumed + seplen]
563 else:
564 self._buffer.clear()
565 self._maybe_resume_transport()
566 raise ValueError(e.args[0])
567 return line
568
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200569 async def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400570 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500571
Yury Selivanovb4617912016-05-16 16:32:38 -0400572 On success, the data and separator will be removed from the
573 internal buffer (consumed). Returned data will include the
574 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500575
Yury Selivanovb4617912016-05-16 16:32:38 -0400576 Configured stream limit is used to check result. Limit sets the
577 maximal length of data that can be returned, not counting the
578 separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500579
Yury Selivanovb4617912016-05-16 16:32:38 -0400580 If an EOF occurs and the complete separator is still not found,
581 an IncompleteReadError exception will be raised, and the internal
582 buffer will be reset. The IncompleteReadError.partial attribute
583 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500584
Yury Selivanovb4617912016-05-16 16:32:38 -0400585 If the data cannot be read because of over limit, a
586 LimitOverrunError exception will be raised, and the data
587 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500588 """
589 seplen = len(separator)
590 if seplen == 0:
591 raise ValueError('Separator should be at least one-byte string')
592
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700593 if self._exception is not None:
594 raise self._exception
595
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500596 # Consume whole buffer except last bytes, which length is
597 # one less than seplen. Let's check corner cases with
598 # separator='SEPARATOR':
599 # * we have received almost complete separator (without last
600 # byte). i.e buffer='some textSEPARATO'. In this case we
601 # can safely consume len(separator) - 1 bytes.
602 # * last byte of buffer is first byte of separator, i.e.
603 # buffer='abcdefghijklmnopqrS'. We may safely consume
604 # everything except that last byte, but this require to
605 # analyze bytes of buffer that match partial separator.
606 # This is slow and/or require FSM. For this case our
607 # implementation is not optimal, since require rescanning
608 # of data that is known to not belong to separator. In
609 # real world, separator will not be so long to notice
610 # performance problems. Even when reading MIME-encoded
611 # messages :)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700612
Yury Selivanovb4617912016-05-16 16:32:38 -0400613 # `offset` is the number of bytes from the beginning of the buffer
614 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500615 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500617 # Loop until we find `separator` in the buffer, exceed the buffer size,
618 # or an EOF has happened.
619 while True:
620 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500622 # Check if we now have enough data in the buffer for `separator` to
623 # fit.
624 if buflen - offset >= seplen:
625 isep = self._buffer.find(separator, offset)
626
627 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400628 # `separator` is in the buffer. `isep` will be used later
629 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500630 break
631
632 # see upper comment for explanation.
633 offset = buflen + 1 - seplen
634 if offset > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700635 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400636 'Separator is not found, and chunk exceed the limit',
637 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500638
639 # Complete message (with full separator) may be present in buffer
640 # even when EOF flag is set. This may happen when the last chunk
641 # adds data which makes separator be found. That's why we check for
642 # EOF *after* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700643 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500644 chunk = bytes(self._buffer)
645 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700646 raise exceptions.IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500648 # _wait_for_data() will resume reading if stream was paused.
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200649 await self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700650
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500651 if isep > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700652 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400653 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500654
655 chunk = self._buffer[:isep + seplen]
656 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500658 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659
async def read(self, n=-1):
    """Read at most `n` bytes from the stream.

    * ``n == 0``: return ``b''`` immediately.
    * ``n < 0`` (the default is -1): keep reading until EOF and return
      everything collected.  An empty bytes object means EOF was reached
      with nothing left in the internal buffer.
    * ``n > 0``: wait for data only when the internal buffer is empty and
      EOF has not been seen, then return up to `n` bytes taken from the
      front of the buffer.  The result is empty only when EOF arrived
      before any byte was read.

    A pending stream exception is raised before anything is read.  The
    size of the result is not capped by the limit configured at stream
    creation.  If the stream was paused, it is resumed when needed.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # Drain in limit-sized pieces instead of waiting for everything to
        # pile up in the buffer: that approach could deadlock once the
        # peer (e.g. a subprocess) sends more than self._limit bytes.
        pieces = []
        piece = await self.read(self._limit)
        while piece:
            pieces.append(piece)
            piece = await self.read(self._limit)
        return b''.join(pieces)

    if not (self._buffer or self._eof):
        await self._wait_for_data('read')

    # Slicing past the end is harmless: a short buffer just yields fewer
    # bytes than requested.
    chunk = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return chunk
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700709
async def readexactly(self, n):
    """Read and return exactly `n` bytes.

    Waits for more data until the internal buffer holds at least `n`
    bytes.  If EOF is reached first, everything buffered so far is removed
    from the buffer and an IncompleteReadError is raised.  Its ``partial``
    attribute contains those bytes.

    A negative `n` raises ValueError; this check happens before anything
    else.  A pending stream exception is raised next.  A zero `n` returns
    ``b''``.  The requested size is not capped by the limit configured at
    stream creation.  If the stream was paused, it is resumed when needed.
    """
    if n < 0:
        raise ValueError('readexactly size can not be less than zero')

    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    while len(self._buffer) < n:
        if self._eof:
            partial = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(partial, n)
        await self._wait_for_data('readexactly')

    if len(self._buffer) > n:
        # More than requested is buffered: hand out the prefix and keep
        # the rest for later reads.
        result = bytes(self._buffer[:n])
        del self._buffer[:n]
    else:
        # Exactly n bytes are buffered: take them all without slicing.
        result = bytes(self._buffer)
        self._buffer.clear()
    self._maybe_resume_transport()
    return result
Yury Selivanovd08c3632015-05-13 15:15:56 -0400750
def __aiter__(self):
    """Return the reader itself as its asynchronous iterator.

    Paired with ``__anext__``, this lets ``async for`` loop over the
    stream directly; no copy or reset of the reader is made.
    """
    return self
Yury Selivanovd08c3632015-05-13 15:15:56 -0400753
async def __anext__(self):
    """Return the next value produced by ``self.readline()``.

    Iteration ends with StopAsyncIteration as soon as ``readline()``
    returns ``b''``.
    """
    line = await self.readline()
    if line != b'':
        return line
    raise StopAsyncIteration