blob: 080d8a62cde1e219fa0866f8c3e7aa1751d0bc9b [file] [log] [blame]
Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = (
Yury Selivanov6758e6e2019-09-29 21:59:55 -07002 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
3 'open_connection', 'start_server')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
Yury Selivanovb057c522014-02-18 12:15:06 -05005import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07006import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -04007import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07008import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -05009
Guido van Rossume3e786c2014-02-18 10:24:30 -080010if hasattr(socket, 'AF_UNIX'):
Yury Selivanov6758e6e2019-09-29 21:59:55 -070011 __all__ += ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080012
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070016from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Yury Selivanov6758e6e2019-09-29 21:59:55 -070019from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
Victor Stinner9551f772018-05-29 16:02:07 +020022_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    An additional optional keyword argument is limit (to set the buffer
    limit passed to the StreamReader).  The event loop is always the one
    returned by events.get_running_loop(); there is no loop parameter.

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    # The factory always hands back the one protocol built above, so the
    # reader it wraps is the reader returned to the caller.
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
52
async def start_server(client_connected_cb, host=None, port=None, *,
                       limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    An additional optional keyword argument is limit (to set the buffer
    limit passed to the StreamReader).  The event loop is always the one
    returned by events.get_running_loop(); there is no loop parameter.

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    loop = events.get_running_loop()

    def factory():
        # Called once per accepted connection: every client gets its own
        # reader/protocol pair.
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -080085
86
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        `path` and the remaining keyword arguments are forwarded to
        loop.create_unix_connection(); `limit` is the buffer limit passed
        to the StreamReader.  Returns a (reader, writer) pair.
        """
        loop = events.get_running_loop()

        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = await loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        `path` and the remaining keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned; `limit` is
        the buffer limit passed to each StreamReader.
        """
        loop = events.get_running_loop()

        def factory():
            # One fresh reader/protocol pair per accepted connection.
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return await loop.create_unix_server(factory, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500114
115
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.

    Several coroutines may wait in _drain_helper() at the same time;
    each one gets its own future, kept in self._drain_waiters until
    that coroutine finishes waiting.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events._get_event_loop(stacklevel=4)
        else:
            self._loop = loop
        self._paused = False
        # Futures of the _drain_helper() calls currently blocked.
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Waiters remove themselves once they resume, which happens on a
        # later loop iteration, so the list is not mutated while we
        # iterate over it.
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return

        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing is resumed or the connection is lost.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError when the connection has already been lost,
        or the exception passed to connection_lost() if that happens
        while waiting.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Also runs on cancellation, so stale futures never pile up.
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        raise NotImplementedError
182
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800183
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        if stream_reader is not None:
            # Only a weak reference is kept, so the protocol alone does
            # not keep the reader alive.
            self._stream_reader_wr = weakref.ref(stream_reader)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        # Task wrapping a coroutine client_connected_cb, if any.  Held
        # here so the task stays referenced while it runs.
        self._task = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    @property
    def _stream_reader(self):
        """The StreamReader, or None if unset or no longer alive."""
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def connection_made(self, transport):
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               reader,
                                               self._loop)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                # Keep a reference to the task so that it cannot be
                # garbage collected before it finishes.
                self._task = self._loop.create_task(res)
            self._strong_reader = None

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._stream_writer = None
        self._task = None
        self._transport = None

    def data_received(self, data):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            pass  # failed constructor
        else:
            if closed.done() and not closed.cancelled():
                closed.exception()
Andrew Svetlovfe133aa2018-01-25 00:30:30 +0200293
Guido van Rossum355491d2013-10-18 15:17:11 -0700294
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds the drain() coroutine, which
    can be awaited for flow control.  It also adds a transport property
    which references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop
        self._complete_fut = self._loop.create_future()
        self._complete_fut.set_result(None)

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return f"<{' '.join(info)}>"

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's close waiter for this stream."""
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        If the associated reader has an exception set, that exception
        is raised here.
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() call
            # Raise connection closing error if any,
            # ConnectionResetError otherwise
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await sleep(0)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373
374
375class StreamReader:
376
Yury Selivanov6758e6e2019-09-29 21:59:55 -0700377 _source_traceback = None
378
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300379 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380 # The line length limit is a security feature;
381 # it also doubles as half the buffer limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500382
383 if limit <= 0:
384 raise ValueError('Limit cannot be <= 0')
385
Guido van Rossum355491d2013-10-18 15:17:11 -0700386 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387 if loop is None:
Serhiy Storchaka172c0f22021-04-25 13:40:44 +0300388 self._loop = events._get_event_loop()
Victor Stinner70db9e42015-01-09 21:32:05 +0100389 else:
390 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500391 self._buffer = bytearray()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100392 self._eof = False # Whether we're done.
393 self._waiter = None # A future used by _wait_for_data()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 self._exception = None
395 self._transport = None
396 self._paused = False
Yury Selivanov6758e6e2019-09-29 21:59:55 -0700397 if self._loop.get_debug():
398 self._source_traceback = format_helpers.extract_stack(
399 sys._getframe(1))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200401 def __repr__(self):
402 info = ['StreamReader']
403 if self._buffer:
Yury Selivanov6370f342017-12-10 18:36:12 -0500404 info.append(f'{len(self._buffer)} bytes')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200405 if self._eof:
406 info.append('eof')
407 if self._limit != _DEFAULT_LIMIT:
Yury Selivanov6370f342017-12-10 18:36:12 -0500408 info.append(f'limit={self._limit}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200409 if self._waiter:
Yury Selivanov6370f342017-12-10 18:36:12 -0500410 info.append(f'waiter={self._waiter!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200411 if self._exception:
Yury Selivanov6370f342017-12-10 18:36:12 -0500412 info.append(f'exception={self._exception!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200413 if self._transport:
Yury Selivanov6370f342017-12-10 18:36:12 -0500414 info.append(f'transport={self._transport!r}')
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200415 if self._paused:
416 info.append('paused')
Yury Selivanov6370f342017-12-10 18:36:12 -0500417 return '<{}>'.format(' '.join(info))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200418
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 def exception(self):
420 return self._exception
421
422 def set_exception(self, exc):
423 self._exception = exc
424
Guido van Rossum355491d2013-10-18 15:17:11 -0700425 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700427 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428 if not waiter.cancelled():
429 waiter.set_exception(exc)
430
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100431 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500432 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100433 waiter = self._waiter
434 if waiter is not None:
435 self._waiter = None
436 if not waiter.cancelled():
437 waiter.set_result(None)
438
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439 def set_transport(self, transport):
440 assert self._transport is None, 'Transport already set'
441 self._transport = transport
442
443 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500444 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700446 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447
448 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700449 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100450 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451
Yury Selivanovf0020f52014-02-06 00:14:30 -0500452 def at_eof(self):
453 """Return True if the buffer is empty and 'feed_eof' was called."""
454 return self._eof and not self._buffer
455
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500457 assert not self._eof, 'feed_data after feed_eof'
458
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700459 if not data:
460 return
461
Yury Selivanove694c972014-02-05 18:11:13 -0500462 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100463 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700464
465 if (self._transport is not None and
Yury Selivanovb4617912016-05-16 16:32:38 -0400466 not self._paused and
467 len(self._buffer) > 2 * self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700469 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700470 except NotImplementedError:
471 # The transport can't be paused.
472 # We'll just have to buffer all data.
473 # Forget the transport so we don't keep trying.
474 self._transport = None
475 else:
476 self._paused = True
477
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200478 async def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500479 """Wait until feed_data() or feed_eof() is called.
480
481 If stream was paused, automatically resume it.
482 """
Victor Stinner183e3472014-01-23 17:40:03 +0100483 # StreamReader uses a future to link the protocol feed_data() method
484 # to a read coroutine. Running two read coroutines at the same time
485 # would have an unexpected behaviour. It would not possible to know
486 # which coroutine would get the next data.
487 if self._waiter is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500488 raise RuntimeError(
489 f'{func_name}() called while another coroutine is '
490 f'already waiting for incoming data')
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100491
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500492 assert not self._eof, '_wait_for_data after EOF'
493
494 # Waiting for data while paused will make deadlock, so prevent it.
Yury Selivanov3e56ff02016-10-05 18:01:12 -0400495 # This is essential for readexactly(n) for case when n > self._limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500496 if self._paused:
497 self._paused = False
498 self._transport.resume_reading()
499
Yury Selivanov7661db62016-05-16 15:38:39 -0400500 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100501 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200502 await self._waiter
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100503 finally:
504 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100505
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200506 async def readline(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500507 """Read chunk of data from the stream until newline (b'\n') is found.
508
509 On success, return chunk that ends with newline. If only partial
510 line can be read due to EOF, return incomplete line without
511 terminating newline. When EOF was reached while no bytes read, empty
512 bytes object is returned.
513
514 If limit is reached, ValueError will be raised. In that case, if
515 newline was found, complete line including newline will be removed
516 from internal buffer. Else, internal buffer will be cleared. Limit is
517 compared against part of the line without newline.
518
519 If stream was paused, this function will automatically resume it if
520 needed.
521 """
522 sep = b'\n'
523 seplen = len(sep)
524 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200525 line = await self.readuntil(sep)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700526 except exceptions.IncompleteReadError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500527 return e.partial
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700528 except exceptions.LimitOverrunError as e:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500529 if self._buffer.startswith(sep, e.consumed):
530 del self._buffer[:e.consumed + seplen]
531 else:
532 self._buffer.clear()
533 self._maybe_resume_transport()
534 raise ValueError(e.args[0])
535 return line
536
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200537 async def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400538 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500539
Yury Selivanovb4617912016-05-16 16:32:38 -0400540 On success, the data and separator will be removed from the
541 internal buffer (consumed). Returned data will include the
542 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500543
Yury Selivanovb4617912016-05-16 16:32:38 -0400544 Configured stream limit is used to check result. Limit sets the
545 maximal length of data that can be returned, not counting the
546 separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500547
Yury Selivanovb4617912016-05-16 16:32:38 -0400548 If an EOF occurs and the complete separator is still not found,
549 an IncompleteReadError exception will be raised, and the internal
550 buffer will be reset. The IncompleteReadError.partial attribute
551 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500552
Yury Selivanovb4617912016-05-16 16:32:38 -0400553 If the data cannot be read because of over limit, a
554 LimitOverrunError exception will be raised, and the data
555 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500556 """
557 seplen = len(separator)
558 if seplen == 0:
559 raise ValueError('Separator should be at least one-byte string')
560
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561 if self._exception is not None:
562 raise self._exception
563
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500564 # Consume whole buffer except last bytes, which length is
565 # one less than seplen. Let's check corner cases with
566 # separator='SEPARATOR':
567 # * we have received almost complete separator (without last
568 # byte). i.e buffer='some textSEPARATO'. In this case we
569 # can safely consume len(separator) - 1 bytes.
570 # * last byte of buffer is first byte of separator, i.e.
571 # buffer='abcdefghijklmnopqrS'. We may safely consume
572 # everything except that last byte, but this require to
573 # analyze bytes of buffer that match partial separator.
574 # This is slow and/or require FSM. For this case our
575 # implementation is not optimal, since require rescanning
576 # of data that is known to not belong to separator. In
577 # real world, separator will not be so long to notice
578 # performance problems. Even when reading MIME-encoded
579 # messages :)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580
Yury Selivanovb4617912016-05-16 16:32:38 -0400581 # `offset` is the number of bytes from the beginning of the buffer
582 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500583 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700584
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500585 # Loop until we find `separator` in the buffer, exceed the buffer size,
586 # or an EOF has happened.
587 while True:
588 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700589
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500590 # Check if we now have enough data in the buffer for `separator` to
591 # fit.
592 if buflen - offset >= seplen:
593 isep = self._buffer.find(separator, offset)
594
595 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400596 # `separator` is in the buffer. `isep` will be used later
597 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500598 break
599
600 # see upper comment for explanation.
601 offset = buflen + 1 - seplen
602 if offset > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700603 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400604 'Separator is not found, and chunk exceed the limit',
605 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500606
607 # Complete message (with full separator) may be present in buffer
608 # even when EOF flag is set. This may happen when the last chunk
609 # adds data which makes separator be found. That's why we check for
610 # EOF *after* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700611 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500612 chunk = bytes(self._buffer)
613 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700614 raise exceptions.IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500616 # _wait_for_data() will resume reading if stream was paused.
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200617 await self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700618
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500619 if isep > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700620 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400621 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500622
623 chunk = self._buffer[:isep + seplen]
624 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500626 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627
async def read(self, n=-1):
    """Return at most `n` bytes taken from the front of the stream.

    A stored stream exception, if any, is re-raised before anything
    else is done.

    * `n == 0`: an empty bytes object is returned right away.
    * `n < 0` (the default): the stream is drained until EOF by
      repeatedly reading chunks of at most `self._limit` bytes; the
      chunks are joined and returned.  At EOF with nothing buffered the
      result is an empty bytes object.
    * `n > 0`: if nothing is buffered and EOF has not been seen, wait
      once for data, then hand back whatever is available, up to `n`
      bytes.  The result is empty only when EOF arrived before any
      byte could be read.

    The size of the returned value is not capped by the limit
    configured when the stream was created.

    A paused stream is resumed automatically when needed.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # Collecting everything through a single waiter would deadlock
        # once the peer sends more than the limit, so the data is
        # gathered in limit-sized reads until an empty read signals EOF.
        pieces = []
        piece = await self.read(self._limit)
        while piece:
            pieces.append(piece)
            piece = await self.read(self._limit)
        return b''.join(pieces)

    nothing_available = not (self._buffer or self._eof)
    if nothing_available:
        await self._wait_for_data('read')

    # Slicing tolerates a buffer shorter than n: we simply get it all.
    head = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return head
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677
async def readexactly(self, n):
    """Return exactly `n` bytes from the stream.

    A negative `n` raises ValueError.  A stored stream exception, if
    any, is re-raised next.  For `n == 0` an empty bytes object is
    returned.

    If EOF is reached while fewer than `n` bytes are buffered, the
    buffer is emptied and IncompleteReadError is raised; the bytes that
    were collected so far are passed to it as the partial data, along
    with `n` as the expected size.

    The size of the returned value is not capped by the limit
    configured when the stream was created.

    A paused stream is resumed automatically when needed.
    """
    if n < 0:
        raise ValueError('readexactly size can not be less than zero')

    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    # Keep waiting until the buffer holds at least n bytes, giving up
    # as soon as EOF makes that impossible.
    while True:
        if len(self._buffer) >= n:
            break

        if self._eof:
            partial = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(partial, n)

        await self._wait_for_data('readexactly')

    if len(self._buffer) > n:
        # More than requested is buffered: take only the first n bytes.
        payload = bytes(self._buffer[:n])
        del self._buffer[:n]
    else:
        # The buffer holds exactly n bytes: take all of it.
        payload = bytes(self._buffer)
        self._buffer.clear()
    self._maybe_resume_transport()
    return payload
Yury Selivanovd08c3632015-05-13 15:15:56 -0400718
def __aiter__(self):
    """Return the object itself as its own asynchronous iterator."""
    return self
Yury Selivanovd08c3632015-05-13 15:15:56 -0400721
async def __anext__(self):
    """Return the next line from `readline()`.

    An empty bytes result from `readline()` ends the iteration by
    raising StopAsyncIteration.
    """
    line = await self.readline()
    if line != b'':
        return line
    raise StopAsyncIteration