blob: 0008d514508a90472f2fbda2ce5f6f49de5ad2df [file] [log] [blame]
"""Stream-related things: StreamReader/StreamWriter and connection helpers."""

__all__ = [
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server',
    'IncompleteReadError', 'LimitOverrunError',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
import socket
import sys
10
# The UNIX-socket helpers are only defined (and therefore only exported)
# when the platform's socket module provides AF_UNIX.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']
13
Victor Stinnerf951d282014-06-29 00:46:45 +020014from . import coroutines
Victor Stinner71080fc2015-07-25 02:23:21 +020015from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import events
17from . import futures
18from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020019from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
22
# Default StreamReader buffer limit: 64 KiB.
_DEFAULT_LIMIT = 1 << 16
24
Guido van Rossuma849be92014-01-30 16:05:28 -080025
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # Default exception pickling re-creates the instance as
        # cls(*self.args), i.e. with only the formatted message, which does
        # not match this __init__ signature and fails on unpickling.
        # Rebuild from the original constructor arguments instead.
        return type(self), (self.partial, self.expected)
38
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039
class LimitOverrunError(Exception):
    """Reached buffer limit while looking for the separator.

    Attributes:
    - message: error message
    - consumed: total number of bytes that should be consumed
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.message = message
        self.consumed = consumed

    def __reduce__(self):
        # Default exception pickling calls cls(*self.args) == cls(message),
        # which is missing `consumed`; pass both constructor arguments.
        return type(self), (self.args[0], self.consumed)
51
52
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect and return a (StreamReader, StreamWriter) pair.

    This is a thin wrapper around loop.create_connection(): host and port
    (positionally) and any extra keyword arguments are forwarded to it
    unchanged, while protocol_factory is supplied internally.

    Two additional keyword arguments are consumed here: ``loop`` selects
    the event loop (events.get_event_loop() when None) and ``limit`` is the
    buffer limit handed to the StreamReader.

    There is nothing special going on -- to use customized StreamReader or
    StreamReaderProtocol classes, copy this code and adapt it.
    """
    loop = events.get_event_loop() if loop is None else loop
    stream_reader = StreamReader(limit=limit, loop=loop)
    reader_protocol = StreamReaderProtocol(stream_reader, loop=loop)
    transport, _proto = yield from loop.create_connection(
        lambda: reader_protocol, host, port, **kwds)
    stream_writer = StreamWriter(transport, reader_protocol, stream_reader,
                                 loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081
82
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, invoking a callback for each connected client.

    ``client_connected_cb(client_reader, client_writer)`` is called with a
    StreamReader and a StreamWriter for every accepted connection.  It may
    be a plain function or a coroutine function; if calling it produces a
    coroutine object, that coroutine is scheduled as a Task.

    host and port (positionally) and any extra keyword arguments are
    forwarded to loop.create_server(); protocol_factory is supplied here.
    ``loop`` selects the event loop (events.get_event_loop() when None) and
    ``limit`` is the buffer limit handed to each StreamReader.

    Returns the result of loop.create_server(), i.e. a Server object that
    can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def _new_protocol():
        # Every accepted connection gets its own reader and protocol.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb, loop=loop)

    server = yield from loop.create_server(_new_protocol, host, port, **kwds)
    return server
117
118
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but over the UNIX domain socket *path*."""
        loop = events.get_event_loop() if loop is None else loop
        stream_reader = StreamReader(limit=limit, loop=loop)
        reader_protocol = StreamReaderProtocol(stream_reader, loop=loop)
        transport, _proto = yield from loop.create_unix_connection(
            lambda: reader_protocol, path, **kwds)
        stream_writer = StreamWriter(transport, reader_protocol,
                                     stream_reader, loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listens on the UNIX domain socket *path*."""
        if loop is None:
            loop = events.get_event_loop()

        def _new_protocol():
            # Every accepted connection gets its own reader and protocol.
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb, loop=loop)

        server = yield from loop.create_unix_server(_new_protocol, path,
                                                    **kwds)
        return server
150
151
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # True between pause_writing() and resume_writing().
        self._paused = False
        # Future awaited by _drain_helper() while writing is paused.
        self._drain_waiter = None
        # Set by connection_lost(); makes later drains fail fast.
        self._connection_lost = False

    def pause_writing(self):
        """Mark writing as paused; later _drain_helper() calls will wait."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Clear the paused flag and wake up a pending drain, if any."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Record the loss and release a writer blocked in _drain_helper().

        The blocked waiter is resolved with None for a clean close, or
        with *exc* when the connection was lost because of an error.
        """
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is resumed.

        Raises ConnectionResetError if the connection was already lost.
        Returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        # Only one drain may be pending at a time (unless it was cancelled).
        assert waiter is None or waiter.cancelled()
        waiter = futures.Future(loop=self._loop)
        self._drain_waiter = waiter
        yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800216
217
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Adapter that forwards Protocol callbacks to a StreamReader.

    The adapter is kept separate from StreamReader instead of making the
    reader a Protocol subclass for two reasons: StreamReader has other
    potential uses, and the split prevents users of a StreamReader from
    accidentally calling protocol methods on it.
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb

    def connection_made(self, transport):
        """Attach *transport* to the reader and run the client callback."""
        reader = self._stream_reader
        reader.set_transport(transport)
        callback = self._client_connected_cb
        if callback is None:
            return
        self._stream_writer = StreamWriter(transport, self, reader,
                                           self._loop)
        outcome = callback(reader, self._stream_writer)
        # A coroutine-function callback returns a coroutine object;
        # schedule it as a Task so that it actually runs.
        if coroutines.iscoroutine(outcome):
            self._loop.create_task(outcome)

    def connection_lost(self, exc):
        """Report EOF (clean close) or *exc* to the reader, then chain up."""
        if exc is not None:
            self._stream_reader.set_exception(exc)
        else:
            self._stream_reader.feed_eof()
        super().connection_lost(exc)

    def data_received(self, data):
        """Push received bytes into the reader's buffer."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader.

        Per the asyncio Protocol.eof_received() contract, returning True
        asks the transport not to close itself, so writing is still
        possible after the peer's EOF.
        """
        self._stream_reader.feed_eof()
        return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700257
Guido van Rossum355491d2013-10-18 15:17:11 -0700258
class StreamWriter:
    """Wraps a Transport.

    write(), writelines(), write_eof(), can_write_eof(), get_extra_info()
    and close() are delegated to the transport.  On top of that, the
    drain() coroutine lets callers wait for flow control, and the
    ``transport`` property exposes the wrapped Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() calls reader.exception(), so only a StreamReader (or
        # None) is accepted.
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__, 'transport=%r' % self._transport]
        if self._reader is not None:
            parts.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(parts)

    @property
    def transport(self):
        """The wrapped Transport."""
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()

        Any exception already recorded on the reader is raised first.
        """
        reader = self._reader
        if reader is not None:
            exc = reader.exception()
            if exc is not None:
                raise exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); yield from drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            yield
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328
329
class StreamReader:
    """Buffered byte reader fed by a protocol through feed_data()/feed_eof().

    The read coroutines (read(), readline(), readuntil(), readexactly())
    consume the internal buffer and wait for more data when needed.  When a
    transport has been attached with set_transport(), feed_data() pauses it
    once more than ``2 * limit`` bytes are buffered; it is resumed when the
    buffer drains back to ``limit`` bytes or fewer, or when a read has to
    wait for more data.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception set with set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Record *exc*; it is raised by current and future reads."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used for pause/resume flow control."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume reading once the buffer has drained down to the limit.
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake up a waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append *data* to the buffer and wake up a waiting reader.

        If the buffer grows beyond twice the limit, reading on the
        transport is paused.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = futures.Future(loop=self._loop)
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read chunk of data from the stream until `separator` is found.

        On success, chunk and its separator will be removed from internal buffer
        (i.e. consumed). Returned chunk will include separator at the end.

        Configured stream limit is used to check result. Limit means maximal
        length of chunk that can be returned, not counting the separator.

        If EOF occurs and complete separator still not found,
        IncompleteReadError(<partial data>, None) will be raised and internal
        buffer becomes empty. This partial data may contain a partial separator.

        If chunk cannot be read due to overlimit, LimitOverrunError will be raised
        and data will be left in internal buffer, so it can be read again, in
        some different way.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may return
        fewer bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be
        read. The `IncompleteReadError.partial` attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        # There used to be "optimized" code here. It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n). Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit). So now we just read()
        # into a local buffer.

        blocks = []
        while n > 0:
            block = yield from self.read(n)
            if not block:
                partial = b''.join(blocks)
                raise IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        assert n == 0

        return b''.join(blocks)

    if compat.PY35:
        if sys.version_info >= (3, 5, 2):
            # Since Python 3.5.2, __aiter__ must return the asynchronous
            # iterator itself; returning an awaitable is deprecated and is
            # rejected by later Python versions.
            def __aiter__(self):
                return self
        else:
            # Python 3.5.0/3.5.1 await the result of __aiter__.
            @coroutine
            def __aiter__(self):
                return self

        @coroutine
        def __anext__(self):
            # Iterate line by line; an empty line means EOF.
            val = yield from self.readline()
            if val == b'':
                raise StopAsyncIteration
            return val
680 return val