blob: 3c80bb8892590550acda19dc54b040093dbe1114 [file] [log] [blame]
# Names exported unconditionally; UNIX-socket helpers are appended further
# down when the platform supports them.
__all__ = (
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
Yury Selivanovb057c522014-02-18 12:15:06 -05005import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07006import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -04007import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07008import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -05009
# The UNIX domain socket helpers only exist where the socket module
# exposes AF_UNIX, so they are exported under the same condition.
if hasattr(socket, 'AF_UNIX'):
    __all__ = __all__ + ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080012
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070016from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Yury Selivanov6758e6e2019-09-29 21:59:55 -070019from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Default StreamReader buffer limit, in bytes.
_DEFAULT_LIMIT = 64 * 1024  # 64 KiB
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a (reader, writer) pair.

    This is a thin wrapper around loop.create_connection().  The reader
    is a StreamReader instance and the writer is a StreamWriter instance.

    host, port and any extra keyword arguments are forwarded unchanged to
    create_connection(); the protocol factory is supplied here.

    Keyword-only arguments handled by this function:

    loop  -- event loop to use (deprecated; emits DeprecationWarning when
             given).  Defaults to events.get_event_loop().
    limit -- buffer limit passed to the StreamReader.

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    copy this code -- it is only a convenience.)
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()

    stream_reader = StreamReader(limit=limit, loop=loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return stream_protocol

    transport, _ = await loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056
57
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and call back for every client connection.

    client_connected_cb is called with two arguments, (client_reader,
    client_writer): a StreamReader and a StreamWriter.  It may be a plain
    callable or a coroutine function; a returned coroutine is scheduled
    as a Task.

    host, port and any extra keyword arguments are forwarded unchanged to
    loop.create_server(); the protocol factory is supplied here.

    Keyword-only arguments handled by this function:

    loop  -- event loop to use (deprecated; emits DeprecationWarning when
             given).  Defaults to events.get_event_loop().
    limit -- buffer limit passed to each StreamReader.

    Returns whatever loop.create_server() returns, i.e. a Server object
    which can be used to stop the service.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()

    def factory():
        # A fresh reader/protocol pair is built for every connection.
        per_client_reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(per_client_reader, client_connected_cb,
                                    loop=loop)

    server = await loop.create_server(factory, host, port, **kwds)
    return server
Guido van Rossum1540b162013-11-19 11:43:38 -080095
96
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection().  Returns a (reader, writer) pair.
        """
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()

        stream_reader = StreamReader(limit=limit, loop=loop)
        stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

        def protocol_factory():
            # Always hand back the single protocol created above.
            return stream_protocol

        transport, _ = await loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, loop)
        return stream_reader, stream_writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()

        def factory():
            # A fresh reader/protocol pair is built for every connection.
            per_client_reader = StreamReader(limit=limit, loop=loop)
            return StreamReaderProtocol(per_client_reader,
                                        client_connected_cb, loop=loop)

        server = await loop.create_unix_server(factory, path, **kwds)
        return server
Yury Selivanovb057c522014-02-18 12:15:06 -0500133
134
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.

    Any number of coroutines may wait in _drain_helper() at the same
    time; all of them are woken together when writing resumes or the
    connection is lost.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # Futures of every coroutine currently blocked in _drain_helper().
        # A single slot is not enough: several tasks may call drain() on
        # the same writer while the transport is paused.
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        """Record that the transport asked us to stop writing."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing may continue and wake all drain() waiters."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Each waiter removes itself from the list when it wakes up, so
        # the list is only read here, never mutated.
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Mark the connection as lost and release paused drain() waiters.

        Waiters get `exc` as their exception when it is not None,
        otherwise they complete normally.
        """
        self._connection_lost = True
        # Wake up the writers if currently paused.
        if not self._paused:
            return

        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing is resumed.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError when the connection has already been lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Also runs on cancellation, so no stale future is left behind.
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        """Return the awaitable used by wait_closed(); subclasses override."""
        raise NotImplementedError
201
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800202
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        """Bind the protocol to `stream_reader` (which may be None).

        client_connected_cb, when given, is called from connection_made()
        with (reader, writer).
        """
        super().__init__(loop=loop)
        if stream_reader is not None:
            # Only a weak reference is kept to the reader.
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        # Task running a coroutine returned by client_connected_cb, if any.
        self._task = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    @property
    def _stream_reader(self):
        """The StreamReader, or None if unset or already collected."""
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def connection_made(self, transport):
        """Attach `transport` and, for servers, invoke the client callback."""
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               reader,
                                               self._loop)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                # Hold the task until the connection is lost: the asyncio
                # docs advise saving a reference to a created task so it
                # cannot disappear mid-execution.
                self._task = self._loop.create_task(res)
            self._strong_reader = None

    def connection_lost(self, exc):
        """Propagate EOF or `exc` to the reader and resolve the close waiter."""
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._stream_writer = None
        self._task = None
        self._transport = None

    def data_received(self, data):
        """Forward received bytes to the reader, if it still exists."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader.

        Returns False over SSL and True otherwise.
        """
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        """Return the future that completes in connection_lost()."""
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ failed before _closed was assigned; nothing to do.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
308
Guido van Rossum355491d2013-10-18 15:17:11 -0700309
class StreamWriter:
    """Wraps a Transport.

    write(), writelines(), write_eof(), can_write_eof(), close(),
    is_closing() and get_extra_info() delegate to the transport.
    drain() and wait_closed() are coroutines that cooperate with the
    protocol for flow control and shutdown.  The wrapped Transport is
    available through the `transport` property.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        # An already-completed future.
        completed = loop.create_future()
        self._complete_fut = completed
        completed.set_result(None)

    def __repr__(self):
        parts = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            parts.append(f'reader={self._reader!r}')
        joined = ' '.join(parts)
        return f'<{joined}>'

    @property
    def transport(self):
        """The underlying transport object."""
        return self._transport

    def write(self, data):
        """Pass `data` to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass `data` to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Delegate to the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Delegate to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Delegate to the transport's close()."""
        return self._transport.close()

    def is_closing(self):
        """Delegate to the transport's is_closing()."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's close waiter."""
        close_waiter = self._protocol._get_close_waiter(self)
        await close_waiter

    def get_extra_info(self, name, default=None):
        """Delegate to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        reader = self._reader
        if reader is not None:
            pending_error = reader.exception()
            if pending_error is not None:
                raise pending_error
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() call
            # Raise connection closing error if any,
            # ConnectionResetError otherwise
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await sleep(0)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
389
390class StreamReader:
391
Yury Selivanov6758e6e2019-09-29 21:59:55 -0700392 _source_traceback = None
393
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
    """Set up an empty reader.

    limit -- positive buffer limit; ValueError is raised otherwise.
    loop  -- event loop to use; defaults to events.get_event_loop().
    """
    # The line length limit is a security feature;
    # it also doubles as half the buffer limit.
    if limit <= 0:
        raise ValueError('Limit cannot be <= 0')

    self._limit = limit
    self._loop = events.get_event_loop() if loop is None else loop
    self._buffer = bytearray()
    self._eof = False    # Whether we're done.
    self._waiter = None  # A future used by _wait_for_data()
    self._exception = None
    self._transport = None
    self._paused = False
    if self._loop.get_debug():
        # Capture the caller's stack when the loop is in debug mode.
        caller_frame = sys._getframe(1)
        self._source_traceback = format_helpers.extract_stack(caller_frame)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415
def __repr__(self):
    """Return '<StreamReader ...>' listing only the non-default state."""
    parts = ['StreamReader']
    if self._buffer:
        parts.append(f'{len(self._buffer)} bytes')
    if self._eof:
        parts.append('eof')
    if self._limit != _DEFAULT_LIMIT:
        parts.append(f'limit={self._limit}')
    if self._waiter:
        parts.append(f'waiter={self._waiter!r}')
    if self._exception:
        parts.append(f'exception={self._exception!r}')
    if self._transport:
        parts.append(f'transport={self._transport!r}')
    if self._paused:
        parts.append('paused')
    joined = ' '.join(parts)
    return f'<{joined}>'
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200433
def exception(self):
    """Return the stored exception (None when none has been set)."""
    stored = self._exception
    return stored
436
def set_exception(self, exc):
    """Store `exc` and deliver it to the pending waiter, if there is one."""
    self._exception = exc

    # Detach the waiter first; a cancelled waiter is simply dropped.
    pending, self._waiter = self._waiter, None
    if pending is not None and not pending.cancelled():
        pending.set_exception(exc)
445
def _wakeup_waiter(self):
    """Wakeup read*() functions waiting for data or EOF."""
    # Detach the waiter first; a cancelled waiter is simply dropped.
    pending, self._waiter = self._waiter, None
    if pending is not None and not pending.cancelled():
        pending.set_result(None)
453
def set_transport(self, transport):
    """Attach `transport`; asserts that no transport is attached yet."""
    assert self._transport is None, 'Transport already set'
    self._transport = transport
457
def _maybe_resume_transport(self):
    """Resume a paused transport once the buffer is within the limit."""
    if not self._paused:
        return
    if len(self._buffer) > self._limit:
        return
    self._paused = False
    self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462
def feed_eof(self):
    """Mark the stream as finished and wake a pending reader."""
    self._eof = True
    self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
def at_eof(self):
    """Return True if 'feed_eof' was called and the buffer is empty."""
    return self._eof and not self._buffer
470
def feed_data(self, data):
    """Append `data` to the buffer and wake a pending reader.

    Empty data is ignored.  When the buffer grows beyond twice the limit,
    the transport (if any) is asked to pause reading.
    """
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
        return

    self._buffer.extend(data)
    self._wakeup_waiter()

    transport = self._transport
    if transport is None or self._paused:
        return
    if len(self._buffer) <= 2 * self._limit:
        return

    try:
        transport.pause_reading()
    except NotImplementedError:
        # The transport can't be paused.
        # We'll just have to buffer all data.
        # Forget the transport so we don't keep trying.
        self._transport = None
    else:
        self._paused = True
492
async def _wait_for_data(self, func_name):
    """Wait until feed_data() or feed_eof() is called.

    If stream was paused, automatically resume it.

    func_name is only used in the RuntimeError message raised when
    another coroutine is already waiting.
    """
    # StreamReader uses a future to link the protocol feed_data() method
    # to a read coroutine. Running two read coroutines at the same time
    # would have unexpected behaviour: it would not be possible to know
    # which coroutine would get the next data.
    if self._waiter is not None:
        raise RuntimeError(
            f'{func_name}() called while another coroutine is '
            f'already waiting for incoming data')

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting for data while paused would deadlock, so prevent it.
    # This is essential for readexactly(n) when n > self._limit.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    waiter = self._loop.create_future()
    self._waiter = waiter
    try:
        await waiter
    finally:
        self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100520
async def readline(self):
    """Read chunk of data from the stream until newline (b'\\n') is found.

    On success, return the chunk, which ends with the newline.  If EOF
    cuts the line short, return the partial line without a terminating
    newline; if EOF is reached with no bytes read, that is an empty
    bytes object.

    If the limit is exceeded, ValueError is raised.  In that case, if a
    newline was found, the complete line including the newline is removed
    from the internal buffer; otherwise the internal buffer is cleared.
    The limit is compared against the part of the line without newline.

    If stream was paused, this function will automatically resume it if
    needed.
    """
    newline = b'\n'
    try:
        return await self.readuntil(newline)
    except exceptions.IncompleteReadError as incomplete:
        return incomplete.partial
    except exceptions.LimitOverrunError as overrun:
        consumed = overrun.consumed
        if self._buffer.startswith(newline, consumed):
            # Drop the over-long line together with its newline.
            del self._buffer[:consumed + len(newline)]
        else:
            self._buffer.clear()
        self._maybe_resume_transport()
        raise ValueError(overrun.args[0])
551
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200552 async def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400553 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500554
Yury Selivanovb4617912016-05-16 16:32:38 -0400555 On success, the data and separator will be removed from the
556 internal buffer (consumed). Returned data will include the
557 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500558
Yury Selivanovb4617912016-05-16 16:32:38 -0400559 Configured stream limit is used to check result. Limit sets the
560 maximal length of data that can be returned, not counting the
561 separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500562
Yury Selivanovb4617912016-05-16 16:32:38 -0400563 If an EOF occurs and the complete separator is still not found,
564 an IncompleteReadError exception will be raised, and the internal
565 buffer will be reset. The IncompleteReadError.partial attribute
566 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500567
Yury Selivanovb4617912016-05-16 16:32:38 -0400568 If the data cannot be read because of over limit, a
569 LimitOverrunError exception will be raised, and the data
570 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500571 """
572 seplen = len(separator)
573 if seplen == 0:
574 raise ValueError('Separator should be at least one-byte string')
575
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 if self._exception is not None:
577 raise self._exception
578
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500579 # Consume whole buffer except last bytes, which length is
580 # one less than seplen. Let's check corner cases with
581 # separator='SEPARATOR':
582 # * we have received almost complete separator (without last
583 # byte). i.e buffer='some textSEPARATO'. In this case we
584 # can safely consume len(separator) - 1 bytes.
585 # * last byte of buffer is first byte of separator, i.e.
586 # buffer='abcdefghijklmnopqrS'. We may safely consume
587 # everything except that last byte, but this require to
588 # analyze bytes of buffer that match partial separator.
589 # This is slow and/or require FSM. For this case our
590 # implementation is not optimal, since require rescanning
591 # of data that is known to not belong to separator. In
592 # real world, separator will not be so long to notice
593 # performance problems. Even when reading MIME-encoded
594 # messages :)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700595
Yury Selivanovb4617912016-05-16 16:32:38 -0400596 # `offset` is the number of bytes from the beginning of the buffer
597 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500598 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700599
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500600 # Loop until we find `separator` in the buffer, exceed the buffer size,
601 # or an EOF has happened.
602 while True:
603 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700604
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500605 # Check if we now have enough data in the buffer for `separator` to
606 # fit.
607 if buflen - offset >= seplen:
608 isep = self._buffer.find(separator, offset)
609
610 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400611 # `separator` is in the buffer. `isep` will be used later
612 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500613 break
614
615 # see upper comment for explanation.
616 offset = buflen + 1 - seplen
617 if offset > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700618 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400619 'Separator is not found, and chunk exceed the limit',
620 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500621
622 # Complete message (with full separator) may be present in buffer
623 # even when EOF flag is set. This may happen when the last chunk
624 # adds data which makes separator be found. That's why we check for
625 # EOF *after* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700626 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500627 chunk = bytes(self._buffer)
628 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700629 raise exceptions.IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700630
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500631 # _wait_for_data() will resume reading if stream was paused.
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200632 await self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500634 if isep > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700635 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400636 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500637
638 chunk = self._buffer[:isep + seplen]
639 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700640 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500641 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700642
async def read(self, n=-1):
    """Read at most `n` bytes from the stream.

    When `n` is omitted or negative, read until EOF and return every
    byte that was read.  If EOF was already received and the internal
    buffer is empty, an empty bytes object is returned.

    When `n` is zero, an empty bytes object is returned immediately.

    When `n` is positive, try to read `n` bytes; the result may be
    shorter than requested but holds at least one byte, unless EOF was
    received before any byte could be read, in which case an empty
    bytes object is returned.

    The size of the returned value is not capped by the limit
    configured at stream creation.

    If the stream was paused, it is automatically resumed when needed.
    """
    # A stored exception takes precedence over any read attempt.
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # Collecting everything by repeatedly waiting on self._buffer
        # would deadlock if the peer (e.g. a subprocess) sends more
        # than self._limit bytes, so instead pull limit-sized blocks
        # through the positive-`n` path until an empty block signals
        # EOF.
        pieces = []
        block = await self.read(self._limit)
        while block:
            pieces.append(block)
            block = await self.read(self._limit)
        return b''.join(pieces)

    # Only wait when there is nothing buffered and EOF is not yet set.
    if not (self._buffer or self._eof):
        await self._wait_for_data('read')

    # Slicing is safe even when fewer than `n` bytes are buffered.
    data = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700692
async def readexactly(self, n):
    """Read exactly `n` bytes from the stream.

    If EOF is reached before `n` bytes are available, the buffered
    bytes are drained and an IncompleteReadError is raised; its
    `partial` attribute carries the bytes that were read.

    When `n` is zero, an empty bytes object is returned.  A negative
    `n` raises ValueError.

    The size of the returned value is not capped by the limit
    configured at stream creation.

    If the stream was paused, it is automatically resumed when needed.
    """
    if n < 0:
        raise ValueError('readexactly size can not be less than zero')

    # A stored exception takes precedence over any read attempt.
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    # Keep waiting until enough bytes are buffered; give up at EOF.
    while len(self._buffer) < n:
        if not self._eof:
            await self._wait_for_data('readexactly')
            continue
        # EOF with too little data: hand over what we have and fail.
        partial = bytes(self._buffer)
        self._buffer.clear()
        raise exceptions.IncompleteReadError(partial, n)

    # No awaits below, so binding the buffer to a local is safe.
    buffered = self._buffer
    if len(buffered) != n:
        data = bytes(buffered[:n])
        del buffered[:n]
    else:
        # Exact fit: take the whole buffer without an extra slice copy.
        data = bytes(buffered)
        buffered.clear()
    self._maybe_resume_transport()
    return data
Yury Selivanovd08c3632015-05-13 15:15:56 -0400733
def __aiter__(self):
    """Return the reader itself as its own asynchronous iterator."""
    return self
Yury Selivanovd08c3632015-05-13 15:15:56 -0400736
async def __anext__(self):
    """Return the next line from `readline()`.

    Raises StopAsyncIteration once `readline()` yields an empty bytes
    object.
    """
    line = await self.readline()
    if line != b'':
        return line
    raise StopAsyncIteration