blob: eef2b895f1e53f88525028e4817afa3db9095615 [file] [log] [blame]
# Public API of this module.  The UNIX-domain-socket helpers are added to
# the export list below only on platforms that provide socket.AF_UNIX.
__all__ = ('StreamReader', 'StreamWriter', 'StreamReaderProtocol',
           'open_connection', 'start_server',
           'IncompleteReadError', 'LimitOverrunError')

import socket

if hasattr(socket, 'AF_UNIX'):
    __all__ = __all__ + ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080011
Victor Stinnerf951d282014-06-29 00:46:45 +020012from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020015from .log import logger
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
# Default ``limit`` for StreamReader and the open_*/start_* helpers (64 KiB).
# readuntil()/readline() reject chunks longer than this, and a StreamReader
# asks its transport to pause reading once more than twice this many bytes
# are buffered.
_DEFAULT_LIMIT = 64 * 1024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
Guido van Rossuma849be92014-01-30 16:05:28 -080021
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        # ``expected`` is None when the total size is unknown (readuntil()
        # raises this with None when EOF arrives before the separator).
        # Render that as "undefined" rather than "None expected bytes".
        r_expected = 'undefined' if expected is None else repr(expected)
        super().__init__(f'{len(partial)} bytes read on a total of '
                         f'{r_expected} expected bytes')
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # Rebuild from the constructor arguments so pickling round-trips
        # both attributes.
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class LimitOverrunError(Exception):
    """Raised when the buffer limit is hit while searching for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """

    def __init__(self, message, consumed):
        super().__init__(message)
        # Byte count that readline() uses to find and discard the
        # over-long chunk from the buffer.
        self.consumed = consumed

    def __reduce__(self):
        # Pickle through the constructor so ``consumed`` survives.
        message = self.args[0]
        return self.__class__, (message, self.consumed)
51
Yury Selivanovd9d0e862016-01-11 12:28:19 -050052
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a ``(reader, writer)`` pair.

    Thin wrapper around ``loop.create_connection()``.  *host*, *port* and
    any extra keyword arguments in *kwds* are forwarded to it unchanged;
    only ``protocol_factory`` is supplied by this helper.

    The reader is a StreamReader created with the given buffer *limit*.
    The writer is a StreamWriter bound to the same StreamReaderProtocol.
    When *loop* is omitted, ``events.get_event_loop()`` is used.

    (To customize StreamReader or StreamReaderProtocol, just copy this
    function -- it is only a convenience layer.)
    """
    loop = events.get_event_loop() if loop is None else loop
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)

    def protocol_factory():
        # create_connection() needs a factory; always return the protocol
        # built above so it stays wired to `reader`.
        return protocol

    transport, _unused = await loop.create_connection(
        protocol_factory, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080
81
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback for every client.

    *client_connected_cb* is called as ``client_connected_cb(reader, writer)``
    with a StreamReader and a StreamWriter for each accepted connection.  It
    may be a plain function or a coroutine function.  If calling it returns
    a coroutine object, that coroutine is scheduled as a Task on the loop.

    *host*, *port* and any extra keyword arguments in *kwds* are forwarded
    to ``loop.create_server()``; this helper supplies the protocol factory.
    *loop* selects the event loop (default: ``events.get_event_loop()``).
    *limit* is the buffer limit given to each StreamReader.

    Returns whatever ``loop.create_server()`` returns: a Server object that
    can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def _make_protocol():
        # Each accepted connection gets its own reader/protocol pair.
        reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(reader, client_connected_cb, loop=loop)

    server = await loop.create_server(_make_protocol, host, port, **kwds)
    return server
Guido van Rossum1540b162013-11-19 11:43:38 -0800115
116
if hasattr(socket, 'AF_UNIX'):
    # These helpers are only defined where UNIX domain sockets exist.

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        *path* and *kwds* are forwarded to ``loop.create_unix_connection()``.
        Returns a ``(StreamReader, StreamWriter)`` pair.
        """
        loop = events.get_event_loop() if loop is None else loop
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _unused = await loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        *path* and *kwds* are forwarded to ``loop.create_unix_server()``,
        whose result is returned.
        """
        loop = events.get_event_loop() if loop is None else loop

        def _make_protocol():
            # A fresh reader/protocol pair per accepted connection.
            reader = StreamReader(limit=limit, loop=loop)
            return StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)

        server = await loop.create_unix_server(_make_protocol, path, **kwds)
        return server
Yury Selivanovb057c522014-02-18 12:15:06 -0500145
146
class FlowControlMixin(protocols.Protocol):
    """Write-side flow control shared by protocols used with StreamWriter.

    Implements pause_writing(), resume_writing() and connection_lost().
    Subclasses that override any of these must call the super()
    implementation.

    StreamWriter.drain() awaits the _drain_helper() coroutine, which blocks
    while writing is paused and raises once the connection has been lost.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False          # toggled by pause/resume_writing()
        self._drain_waiter = None     # future a blocked drain() awaits
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Detach the pending waiter (if any) and release the blocked drain().
        waiter, self._drain_waiter = self._drain_waiter, None
        if waiter is not None and not waiter.done():
            waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # _drain_helper() only blocks while paused, so an unpaused
        # protocol has no writer to wake up.
        if not self._paused:
            return
        waiter, self._drain_waiter = self._drain_waiter, None
        if waiter is None or waiter.done():
            return
        # Propagate the connection error (if any) to the waiting drain().
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        # Only one drain() may wait at a time; a previous waiter must be
        # gone or cancelled.
        assert self._drain_waiter is None or self._drain_waiter.cancelled()
        self._drain_waiter = waiter = self._loop.create_future()
        await waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800210
211
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)

    If *client_connected_cb* is given, connection_made() builds a
    StreamWriter and calls ``client_connected_cb(reader, writer)``.  If that
    call returns a coroutine, it is run as a Task.
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Strong reference to the Task running the client callback's
        # coroutine.  asyncio documents that the event loop keeps only weak
        # references to tasks, so an unreferenced handler task could be
        # garbage-collected while still pending.
        self._task = None

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        # Remembered so eof_received() can avoid an SSL-specific warning.
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                # Keep the task alive for the lifetime of the connection.
                self._task = self._loop.create_task(res)

    def connection_lost(self, exc):
        # Deliver EOF (clean close) or the error to the reader before the
        # flow-control base class wakes any blocked drain().
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None
        self._task = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        # Per the asyncio Protocol docs, a true value leaves closing the
        # transport up to the protocol instead of closing it automatically.
        return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700261
Guido van Rossum355491d2013-10-18 15:17:11 -0700262
class StreamWriter:
    """Thin wrapper around a Transport used for writing to a stream.

    write(), writelines(), write_eof(), can_write_eof(), close() and
    get_extra_info() are forwarded to the transport as-is.  The
    ``transport`` property exposes the wrapped transport directly.
    drain() is a coroutine to await for flow control.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() calls reader.exception(), so only a StreamReader (or no
        # reader at all) is accepted here.
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        fields = [type(self).__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            fields.append(f'reader={self._reader!r}')
        return f"<{' '.join(fields)}>"

    @property
    def transport(self):
        """The wrapped Transport object."""
        return self._transport

    def write(self, data):
        """Forward *data* to ``transport.write()``."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward an iterable of chunks to ``transport.writelines()``."""
        self._transport.writelines(data)

    def write_eof(self):
        """Forward to ``transport.write_eof()`` and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return ``transport.can_write_eof()``."""
        return self._transport.can_write_eof()

    def close(self):
        """Forward to ``transport.close()`` and return its result."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Return ``transport.get_extra_info(name, default)``."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        Intended usage::

            w.write(data)
            await w.drain()

        Raises the reader's stored exception, if there is one.  Otherwise
        waits on the protocol's flow control (_drain_helper()).
        """
        reader = self._reader
        if reader is not None:
            exc = reader.exception()
            if exc is not None:
                raise exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Give the loop one iteration so connection_lost() can run.
            # Otherwise _drain_helper() would return at once, and a
            # ``write(...); await drain()`` loop would never notice that
            # the socket was closed.
            await sleep(0, loop=self._loop)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331
332
class StreamReader:
    """Buffered byte stream fed by a protocol and consumed by coroutines.

    Data arrives through feed_data() and feed_eof(), normally called by
    StreamReaderProtocol.  It is consumed with the read(), readline(),
    readuntil() and readexactly() coroutines, or by ``async for``, which
    yields lines.  Only one coroutine may wait for incoming data at a time;
    _wait_for_data() enforces this.

    ``limit`` caps the chunk that readuntil()/readline() may return.
    Doubled, it is also the amount of buffered data above which the
    transport is asked to pause reading.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None  # Raised by read*() once set_exception() ran.
        self._transport = None
        self._paused = False  # True while we have paused transport reading.

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def exception(self):
        """Return the exception recorded by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Record *exc* and fail the coroutine waiting for data, if any.

        read(), readuntil() and readexactly() raise the recorded exception
        when they are called afterwards.
        """
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used for pause/resume; only allowed once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume reading if we paused it and the buffer is <= limit."""
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake the waiting reader, if any."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append *data* to the buffer and wake the waiting reader.

        Once more than ``2 * limit`` bytes are buffered, ask the transport
        to pause reading.  Empty *data* is ignored.  Must not be called
        after feed_eof() (asserted).
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.

        Raises RuntimeError if another coroutine is already waiting;
        *func_name* is only used in that error message.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            # Discard the over-long line (including its newline when the
            # newline sits right at e.consumed), otherwise everything.
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset. The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        An empty ``separator`` raises ValueError.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, whose length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        If n is zero, return empty bytes object.  A negative n raises
        ValueError.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        # Fast path: take the whole buffer without slicing when it holds
        # exactly n bytes.
        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        """Support ``async for line in reader``."""
        return self

    async def __anext__(self):
        """Return the next line; stop when readline() returns b'' (EOF)."""
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val