blob: 30b751e9891b38d86ccb62f153ac754c03552c8a [file] [log] [blame]
"""Stream-related things.

Provides StreamReader/StreamWriter wrappers around transports, the
StreamReaderProtocol adapter between the two, and the open_connection()/
start_server() convenience coroutines (plus UNIX-socket variants where the
platform supports them).
"""

__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
           'open_connection', 'start_server',
           'IncompleteReadError',
           'LimitOverrunError',
           ]

import socket

# The UNIX-socket helpers are only defined (and therefore only exported)
# when the platform's socket module provides AF_UNIX.
if hasattr(socket, 'AF_UNIX'):
    __all__.extend(['open_unix_connection', 'start_unix_server'])

from . import coroutines
from . import compat  # NOTE(review): not referenced in the visible module body — confirm before removing
from . import events
from . import protocols
from .coroutines import coroutine
from .log import logger


# Default StreamReader limit (64 KiB). readuntil()/readline() use it as the
# maximum chunk length, and feed_data() pauses the transport once the
# buffer grows beyond twice this value.
_DEFAULT_LIMIT = 2 ** 16
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        # ``expected`` is None when the total size is unknown (for example
        # readuntil() hitting EOF before the separator).  Render that as
        # "undefined" rather than the confusing "None expected bytes".
        r_expected = 'undefined' if expected is None else repr(expected)
        super().__init__("%d bytes read on a total of %s expected bytes"
                         % (len(partial), r_expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # Rebuild from the constructor arguments so pickling/copying keeps
        # both attributes. The default reduce would call __init__ with only
        # the formatted message.
        return type(self), (self.partial, self.expected)
40
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
class LimitOverrunError(Exception):
    """Raised when the buffer limit is hit while searching for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """

    def __init__(self, message, consumed):
        self.consumed = consumed
        super().__init__(message)

    def __reduce__(self):
        # Recreate from (message, consumed) so the extra attribute survives
        # pickling and copying.
        cls = type(self)
        ctor_args = (self.args[0], self.consumed)
        return cls, ctor_args
54
Yury Selivanovd9d0e862016-01-11 12:28:19 -050055
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect to (host, port) and return a (reader, writer) pair.

    A thin wrapper around loop.create_connection(). *reader* is a
    StreamReader and *writer* is a StreamWriter; both share one
    StreamReaderProtocol. Every extra keyword argument is forwarded
    unchanged to create_connection(). The usual arguments apply, except
    protocol_factory, which is supplied here.

    Keyword-only extras:
      loop  -- the event loop to use (defaults to events.get_event_loop()).
      limit -- the buffer limit handed to the StreamReader.

    Nothing here is special. To use custom StreamReader or
    StreamReaderProtocol classes, copy this code.
    """
    loop = events.get_event_loop() if loop is None else loop
    stream_reader = StreamReader(limit=limit, loop=loop)
    reader_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # create_connection() wants a factory. Always hand back the single
        # protocol built above so it can be paired with the writer.
        return reader_protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, reader_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084
85
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    *client_connected_cb* receives two arguments, (client_reader,
    client_writer): a StreamReader and a StreamWriter. It may be a plain
    function or a coroutine function. A coroutine it returns is scheduled
    as a Task automatically.

    The remaining arguments are those of loop.create_server(), except
    protocol_factory. The most common are positional host and port,
    followed by optional keyword arguments. The keyword-only extras are
    *loop* (the event loop to use) and *limit* (the StreamReader buffer
    limit).

    Returns whatever loop.create_server() returns, i.e. a Server object
    that can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        # Build a fresh reader/protocol pair for every accepted connection.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb, loop=loop)

    server = yield from loop.create_server(factory, host, port, **kwds)
    return server
120
121
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform, so provide the
    # path-based counterparts of open_connection() and start_server().

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        Connects to the socket at *path* through
        loop.create_unix_connection() and returns a
        (StreamReader, StreamWriter) pair.
        """
        loop = events.get_event_loop() if loop is None else loop
        unix_reader = StreamReader(limit=limit, loop=loop)
        unix_protocol = StreamReaderProtocol(unix_reader, loop=loop)
        transport, _ = yield from loop.create_unix_connection(
            lambda: unix_protocol, path, **kwds)
        unix_writer = StreamWriter(transport, unix_protocol,
                                   unix_reader, loop)
        return unix_reader, unix_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        Listens on *path* through loop.create_unix_server(). Each accepted
        connection gets its own StreamReader and StreamReaderProtocol.
        Returns the server object produced by create_unix_server().
        """
        loop = events.get_event_loop() if loop is None else loop

        def make_protocol():
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb, loop=loop)

        unix_server = yield from loop.create_unix_server(
            make_protocol, path, **kwds)
        return unix_server
152
153
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # True between pause_writing() and resume_writing().
        self._paused = False
        # Future awaited by _drain_helper() while writing is paused.
        self._drain_waiter = None
        # Set by connection_lost(); makes _drain_helper() raise.
        self._connection_lost = False

    def pause_writing(self):
        """Protocol callback: mark writing as paused.

        While paused, _drain_helper() blocks until resume_writing() or
        connection_lost() is called.
        """
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Protocol callback: clear the paused flag and wake a writer
        blocked in _drain_helper(), if any."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Record the lost connection and release a paused writer.

        A waiter blocked in _drain_helper() is completed normally when
        *exc* is None, otherwise it is failed with *exc*.
        """
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Wait while writing is paused.

        Raises ConnectionResetError if connection_lost() has already been
        called. Returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        # Only one drain may be pending at a time. A previous waiter may
        # only linger if its drain() was cancelled.
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800218
219
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Adapter that routes protocol callbacks into a StreamReader.

    This is a separate helper rather than making StreamReader a Protocol
    subclass itself. StreamReader has other potential uses, and keeping
    them apart stops users of a StreamReader from accidentally calling
    protocol-only methods.
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False

    def connection_made(self, transport):
        reader = self._stream_reader
        reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        callback = self._client_connected_cb
        if callback is None:
            return
        # Server side: build the writer and hand both ends to the user.
        self._stream_writer = StreamWriter(transport, self, reader,
                                           self._loop)
        res = callback(reader, self._stream_writer)
        if coroutines.iscoroutine(res):
            self._loop.create_task(res)

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        super().connection_lost(exc)
        # Drop references so the reader/writer are not kept alive.
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        reader = self._stream_reader
        reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        # Over SSL, returning True only triggers a warning in
        # SSLProtocol.eof_received ("returning true from eof_received()
        # has no effect when using ssl"), so report False in that case.
        return not self._over_ssl
Guido van Rossum355491d2013-10-18 15:17:11 -0700269
Guido van Rossum355491d2013-10-18 15:17:11 -0700270
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain(), a coroutine that
    waits until it is appropriate to resume writing (flow control).
    It also adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        # Must provide a _drain_helper() coroutine (used by drain()),
        # e.g. a StreamReaderProtocol.
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, 'transport=%r' % self._transport]
        if self._reader is not None:
            info.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(info)

    @property
    def transport(self):
        """The underlying transport object."""
        return self._transport

    def write(self, data):
        """Write *data* to the transport."""
        self._transport.write(data)

    def writelines(self, data):
        """Write an iterable of data chunks to the transport."""
        self._transport.writelines(data)

    def write_eof(self):
        """Forward to transport.write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Forward to transport.can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Close the underlying transport."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Forward to transport.get_extra_info(name, default)."""
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Wait until it is appropriate to resume writing to the stream.

        The intended use is to write

          w.write(data)
          yield from w.drain()

        Raises the reader's pending exception, if one is set, and
        propagates errors from the protocol's _drain_helper(). For
        example, FlowControlMixin raises ConnectionResetError once the
        connection is lost.
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport is not None:
            if self._transport.is_closing():
                # Yield to the event loop so connection_lost() may be
                # called. Without this, _drain_helper() would return
                # immediately, and code that calls
                #     write(...); yield from drain()
                # in a loop would never call connection_lost(), so it
                # would not see an error when the socket is closed.
                yield
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340
341
class StreamReader:
    """Buffered byte stream that a protocol feeds and coroutines read.

    Data arrives through feed_data() and feed_eof(), typically from
    StreamReaderProtocol. It is consumed with read(), readline(),
    readuntil() and readexactly(). Only one coroutine may wait for
    incoming data at a time; see _wait_for_data().
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False  # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        # True while the transport has been asked to pause reading.
        self._paused = False

    def __repr__(self):
        """Return a debug representation listing the non-default state."""
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store *exc* and fail the pending read waiter, if any, with it.

        Read methods check this stored exception before reading.
        """
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used for pause/resume reading (once only)."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume reading once the buffer holds at most `limit` bytes."""
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake any waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append *data* to the buffer and wake any waiting reader.

        When the buffer grows beyond twice the limit, the transport is
        asked to pause reading. _maybe_resume_transport() and
        _wait_for_data() resume it later.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.

        *func_name* is only used in the RuntimeError message raised when
        another coroutine is already waiting.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            yield from self._waiter
        finally:
            # Cleared even if the wait is cancelled or fails, so a later
            # read does not see a stale waiter.
            self._waiter = None

    @coroutine
    def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            # e.consumed is where the separator was found (or how far the
            # search got). Drop the over-long line, including its newline
            # when present, so the next read starts fresh.
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset. The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        Raises ValueError for an empty separator, and re-raises any
        exception stored by set_exception().
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may return
        fewer bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns an empty bytes
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        If n is zero, return empty bytes object. A negative n raises
        ValueError.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(incomplete, n)

            yield from self._wait_for_data('readexactly')

        # Fast path: when the buffer holds exactly n bytes, take it whole.
        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        """Support ``async for line in reader``."""
        return self

    @coroutine
    def __anext__(self):
        """Return the next line via readline(); stop when it returns b''."""
        val = yield from self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val
693 return val