blob: 05774e953009f7c85cb7d1e8d320873efd194613 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
# Public names exported by a star-import of this module.
__all__ = [
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
    'IncompleteReadError',
    'LimitOverrunError',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
Yury Selivanovb057c522014-02-18 12:15:06 -05009import socket
10
# The UNIX-domain helpers are only defined (and therefore only exported)
# when the socket module exposes AF_UNIX.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']
13
Victor Stinnerf951d282014-06-29 00:46:45 +020014from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020017from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
# Default value of the `limit` argument used throughout this module
# (65536 bytes, i.e. 64 KiB).
_DEFAULT_LIMIT = 1 << 16
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossuma849be92014-01-30 16:05:28 -080023
class IncompleteReadError(EOFError):
    """The stream ended before the requested data was complete.

    Attributes:

    - partial: the bytes that were read before the end of the stream
    - expected: total number of bytes that were wanted (None if unknown)
    """

    def __init__(self, partial, expected):
        message = ("%d bytes read on a total of %r expected bytes"
                   % (len(partial), expected))
        super().__init__(message)
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # Rebuild from the constructor arguments: ``self.args`` only holds
        # the formatted message, which __init__ cannot be called with.
        ctor_args = (self.partial, self.expected)
        return type(self), ctor_args
39
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
class LimitOverrunError(Exception):
    """The buffer limit was reached while searching for a separator.

    Attributes:
    - consumed: total number of to-be-consumed bytes.
    """

    def __init__(self, message, consumed):
        self.consumed = consumed
        super().__init__(message)

    def __reduce__(self):
        # ``self.args`` carries only the message, so `consumed` has to be
        # passed back to the constructor explicitly.
        message = self.args[0]
        return type(self), (message, self.consumed)
53
Yury Selivanovd9d0e862016-01-11 12:28:19 -050054
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect via create_connection() and return a (reader, writer) pair.

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    Every argument that create_connection() accepts, apart from
    protocol_factory, may be given; usually that means positional host
    and port followed by optional keyword arguments.

    Two further keyword-only arguments are handled here: loop (the event
    loop instance to use) and limit (the buffer limit handed to the
    StreamReader).

    (To use customized StreamReader and/or StreamReaderProtocol classes,
    simply copy this code -- it is nothing more than a convenience.)
    """
    event_loop = events.get_event_loop() if loop is None else loop
    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand out the single protocol instance built above.
        return stream_protocol

    transport, _ = yield from event_loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083
84
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server that calls back once per connected client.

    `client_connected_cb` is called with two arguments, client_reader
    (a StreamReader object) and client_writer (a StreamWriter object).
    It can be either a plain callback function or a coroutine; when it
    is a coroutine, it is automatically converted into a Task.

    All other arguments are the usual arguments to loop.create_server()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Two further keyword-only arguments are handled here: loop (the event
    loop instance to use) and limit (the buffer limit handed to each
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a Server
    object which can be used to stop the service.
    """
    event_loop = events.get_event_loop() if loop is None else loop

    def protocol_factory():
        # Each call builds a fresh reader/protocol pair.
        return StreamReaderProtocol(
            StreamReader(limit=limit, loop=event_loop),
            client_connected_cb,
            loop=event_loop)

    server = yield from event_loop.create_server(
        protocol_factory, host, port, **kwds)
    return server
119
120
if hasattr(socket, 'AF_UNIX'):
    # This platform supports UNIX Domain Sockets, so provide the
    # path-based counterparts of open_connection() and start_server().

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but for UNIX Domain Sockets.

        Connects through loop.create_unix_connection() using `path` and
        returns a (StreamReader, StreamWriter) pair.
        """
        event_loop = events.get_event_loop() if loop is None else loop
        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            # Always hand out the single protocol instance built above.
            return stream_protocol

        transport, _ = yield from event_loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, event_loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but for UNIX Domain Sockets.

        Returns whatever loop.create_unix_server() returns.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        def protocol_factory():
            # Each call builds a fresh reader/protocol pair.
            return StreamReaderProtocol(
                StreamReader(limit=limit, loop=event_loop),
                client_connected_cb,
                loop=event_loop)

        server = yield from event_loop.create_unix_server(
            protocol_factory, path, **kwds)
        return server
151
152
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    The protocol methods pause_writing(), resume_writing() and
    connection_lost() are implemented here.  A subclass that overrides
    any of them must call the super method.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        # Future that _drain_helper() waits on while writing is paused.
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        """Mark writing as paused; must not already be paused."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Mark writing as resumed and release a pending drain waiter."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending = self._drain_waiter
        if pending is None:
            return
        self._drain_waiter = None
        if not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Record the lost connection and wake a paused writer, if any.

        The pending drain waiter is completed normally when `exc` is
        None and with `exc` as its exception otherwise.
        """
        self._connection_lost = True
        pending = self._drain_waiter
        # Only a paused writer can be blocked on the drain waiter.
        if self._paused and pending is not None:
            self._drain_waiter = None
            if not pending.done():
                if exc is None:
                    pending.set_result(None)
                else:
                    pending.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is no longer paused.

        Raises ConnectionResetError if the connection has been lost.
        Returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            assert previous is None or previous.cancelled()
            fresh = self._loop.create_future()
            self._drain_waiter = fresh
            yield from fresh
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800217
218
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Adapter that feeds Protocol events into a StreamReader.

    (StreamReader is deliberately not a Protocol subclass itself: it has
    other potential uses, and keeping the two apart prevents users of
    the StreamReader from accidentally calling protocol methods that
    are inappropriate for them.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._client_connected_cb = client_connected_cb
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._over_ssl = False

    def connection_made(self, transport):
        """Attach the transport and, if configured, invoke the callback."""
        reader = self._stream_reader
        reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

        callback = self._client_connected_cb
        if callback is None:
            return

        writer = StreamWriter(transport, self, reader, self._loop)
        self._stream_writer = writer
        outcome = callback(reader, writer)
        # A coroutine returned by the callback is scheduled as a task.
        if coroutines.iscoroutine(outcome):
            self._loop.create_task(outcome)

    def connection_lost(self, exc):
        """Propagate EOF or `exc` to the reader, then drop references."""
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        """Forward received bytes to the reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader; return False over SSL, else True."""
        self._stream_reader.feed_eof()
        # Returning False over SSL prevents a warning in
        # SSLProtocol.eof_received:
        # "returning true from eof_received()
        # has no effect when using ssl"
        return not self._over_ssl
Guido van Rossum355491d2013-10-18 15:17:11 -0700268
Guido van Rossum355491d2013-10-18 15:17:11 -0700269
class StreamWriter:
    """Write-side wrapper around a Transport.

    write(), writelines(), write_eof(), can_write_eof(),
    get_extra_info() and close() are forwarded to the transport.  On
    top of that the class provides drain(), a coroutine to wait on for
    flow control, and a `transport` property that references the
    wrapped Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._loop = loop
        self._reader = reader
        self._protocol = protocol
        self._transport = transport

    def __repr__(self):
        parts = [self.__class__.__name__]
        parts.append('transport=%r' % self._transport)
        if self._reader is not None:
            parts.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(parts)

    @property
    def transport(self):
        """The wrapped transport object."""
        return self._transport

    def write(self, data):
        """Pass `data` to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass `data` to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Call the transport's write_eof() and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the transport's can_write_eof() result."""
        return self._transport.can_write_eof()

    def close(self):
        """Call the transport's close() and return its result."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Return the transport's get_extra_info(name, default) result."""
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()

        If the associated reader holds an exception, it is raised here.
        """
        reader = self._reader
        if reader is not None:
            pending_error = reader.exception()
            if pending_error is not None:
                raise pending_error

        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); yield from drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            yield
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339
340
class StreamReader:
    """Buffer of incoming bytes with coroutine-based read methods.

    Data enters through feed_data(), feed_eof() and set_exception() and
    leaves through read(), readline(), readuntil() and readexactly().
    When a transport has been set, it is asked to pause reading once the
    buffer grows beyond twice the limit, and to resume once the buffer
    has shrunk back to the limit or below.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        """Create an empty reader.

        `limit` must be positive, otherwise ValueError is raised.  When
        `loop` is None, events.get_event_loop() is used.
        """
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False  # Whether we asked the transport to pause.

    def __repr__(self):
        # Only state that is set / differs from the defaults is listed.
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store `exc` and pass it to the pending waiter, if any.

        A cancelled waiter is dropped without being touched.
        """
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach `transport`; may only be done once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake up any waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append `data` to the buffer and wake up any waiting reader.

        Empty `data` is ignored.  Must not be called after feed_eof().
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        # Ask the transport to stop delivering data once the buffer holds
        # more than twice the limit.
        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.

        `func_name` is only used in the error message raised when another
        coroutine is already waiting.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            yield from self._waiter
        finally:
            # Always clear the waiter, including on cancellation or error.
            self._waiter = None

    @coroutine
    def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            # EOF before a newline: return whatever was read.
            return e.partial
        except LimitOverrunError as e:
            # Discard the over-long line (or the whole buffer when no
            # newline follows the consumed part) before reporting it.
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        A ValueError is raised when ``separator`` is empty, and an
        exception stored with set_exception() is re-raised.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        A negative n raises ValueError.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(incomplete, n)

            yield from self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            # The whole buffer is wanted: take it without slicing.
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        # The reader is its own asynchronous iterator.
        return self

    @coroutine
    def __anext__(self):
        # Yield one line per iteration; an empty result from readline()
        # ends the iteration.
        val = yield from self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val