# blob: b7b0485aa0bf0cf6da4b16b0864dc7286709033c [file] [log] [blame]
"""Stream-related things."""

__all__ = [
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
    'IncompleteReadError',
    'LimitOverrunError',
]

import socket

# The UNIX domain socket helpers are only defined (further down) when the
# socket module exposes AF_UNIX, so they are only exported in that case.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']

from . import coroutines
from . import compat
from . import events
from . import futures
from . import protocols
from .coroutines import coroutine
from .log import logger


# Default value of the ``limit`` argument accepted by the stream helpers
# and by StreamReader: 2**16 bytes.
_DEFAULT_LIMIT = 1 << 16
24
Guido van Rossuma849be92014-01-30 16:05:28 -080025
class IncompleteReadError(EOFError):
    """Raised when the end of stream is reached before a read completes.

    Attributes:

    - partial: bytes read before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """

    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default exception reduction rebuilds the object as
        # ``cls(*self.args)``.  ``args`` only holds the formatted message,
        # which does not match the two-argument __init__ above, so
        # pickle.loads() / copy.copy() would fail with TypeError.  Rebuild
        # from the original constructor arguments instead.
        return type(self), (self.partial, self.expected)
38
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039
class LimitOverrunError(Exception):
    """Reached buffer limit while looking for the separator.

    Attributes:

    - message: error message
    - consumed: total number of bytes that should be consumed
    """

    def __init__(self, message, consumed):
        super().__init__(message)
        self.message = message
        self.consumed = consumed

    def __reduce__(self):
        # The default exception reduction rebuilds the object as
        # ``cls(*self.args)``; ``args`` only contains the message, so the
        # required ``consumed`` argument would be missing and
        # pickle.loads() / copy.copy() would fail with TypeError.
        return type(self), (self.args[0], self.consumed)
51
52
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Wrap loop.create_connection() and return a (reader, writer) pair.

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    host, port and any extra keyword arguments are forwarded unchanged
    to create_connection(); the protocol factory is supplied by this
    function.

    Two keyword-only arguments are consumed here: loop (the event loop
    to use; events.get_event_loop() is called when it is None) and limit
    (the buffer limit handed to the StreamReader).

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    copy this code -- it is only a convenience wrapper.)
    """
    event_loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return stream_protocol

    transport, _ = yield from event_loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081
82
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    `client_connected_cb` is called with two arguments, client_reader
    (a StreamReader) and client_writer (a StreamWriter).  It can be a
    plain callback function or a coroutine; if it is a coroutine, it
    will be automatically converted into a Task.

    host, port and any extra keyword arguments are forwarded unchanged
    to loop.create_server(); the protocol factory is supplied by this
    function.

    Two keyword-only arguments are consumed here: loop (the event loop
    to use; events.get_event_loop() is called when it is None) and limit
    (the buffer limit handed to each StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    event_loop = loop if loop is not None else events.get_event_loop()

    def make_protocol():
        # A fresh reader/protocol pair is built for every call.
        client_reader = StreamReader(limit=limit, loop=event_loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=event_loop)

    server = yield from event_loop.create_server(
        make_protocol, host, port, **kwds)
    return server
117
118
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection(); a (reader, writer) pair is
        returned.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            # Always hand back the single protocol created above.
            return stream_protocol

        transport, _ = yield from event_loop.create_unix_connection(
            protocol_factory, path, **kwds)

        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, event_loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        event_loop = loop if loop is not None else events.get_event_loop()

        def make_protocol():
            # A fresh reader/protocol pair is built for every call.
            client_reader = StreamReader(limit=limit, loop=event_loop)
            return StreamReaderProtocol(client_reader, client_connected_cb,
                                        loop=event_loop)

        server = yield from event_loop.create_unix_server(
            make_protocol, path, **kwds)
        return server
150
151
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        # True between pause_writing() and resume_writing().
        self._paused = False
        # Future a drain() caller is blocked on while paused, or None.
        self._drain_waiter = None
        # Set by connection_lost(); makes later drains fail.
        self._connection_lost = False

    def pause_writing(self):
        """Record that writing is paused."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing resumed and release a pending drain."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending = self._drain_waiter
        if pending is None:
            return
        self._drain_waiter = None
        if not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Flag the connection as lost and wake up a paused drain.

        The pending drain waiter (if any) gets a None result when `exc`
        is None, otherwise it gets `exc` set as its exception.
        """
        self._connection_lost = True

        pending = self._drain_waiter
        # Only a paused writer can have somebody blocked on the waiter.
        if not self._paused or pending is None:
            return

        self._drain_waiter = None
        if pending.done():
            return

        if exc is not None:
            pending.set_exception(exc)
        else:
            pending.set_result(None)

    @coroutine
    def _drain_helper(self):
        """Block while writing is paused.

        Raises ConnectionResetError if the connection was already lost;
        returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            # NOTE(review): this allows only one live waiter at a time;
            # it looks like two coroutines draining concurrently while
            # paused would trip this assert -- confirm with callers.
            assert previous is None or previous.cancelled()
            waiter = futures.Future(loop=self._loop)
            self._drain_waiter = waiter
            yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800216
217
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        # Created in connection_made(), only when a callback was given.
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb

    def connection_made(self, transport):
        """Attach the transport to the reader and notify the callback.

        When a client_connected_cb was supplied, a StreamWriter is built
        and the callback is invoked with (reader, writer); a coroutine
        result is scheduled with loop.create_task().
        """
        reader = self._stream_reader
        reader.set_transport(transport)

        callback = self._client_connected_cb
        if callback is None:
            return

        writer = StreamWriter(transport, self, reader, self._loop)
        self._stream_writer = writer
        result = callback(reader, writer)
        if coroutines.iscoroutine(result):
            self._loop.create_task(result)

    def connection_lost(self, exc):
        """Propagate EOF or the error to the reader, then drop references."""
        reader = self._stream_reader
        if reader is not None:
            if exc is not None:
                reader.set_exception(exc)
            else:
                reader.feed_eof()
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        """Forward received data to the reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Forward EOF to the reader and return True."""
        self._stream_reader.feed_eof()
        return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700260
Guido van Rossum355491d2013-10-18 15:17:11 -0700261
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close(), all of which delegate to the
    transport.  It adds drain(), a coroutine to wait on for flow
    control, and a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._transport = transport
        self._protocol = protocol
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__]
        parts.append('transport=%r' % self._transport)
        if self._reader is not None:
            parts.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(parts)

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()

        An exception stored on the associated reader is re-raised here.
        """
        reader = self._reader
        if reader is not None:
            error = reader.exception()
            if error is not None:
                raise error

        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); yield from drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            yield

        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331
332
class StreamReader:
    """Buffered reader fed through feed_data() / feed_eof().

    Data is accumulated in an internal bytearray and handed out by the
    read(), readline(), readuntil() and readexactly() coroutines.  When
    a transport is attached, it is paused once the buffer grows beyond
    twice the limit and resumed once the buffer is back within the
    limit.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')
        self._limit = limit

        self._loop = events.get_event_loop() if loop is None else loop

        self._buffer = bytearray()
        self._eof = False        # Whether we're done.
        self._waiter = None      # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        parts = ['StreamReader']
        if self._buffer:
            parts.append('%d bytes' % len(self._buffer))
        if self._eof:
            parts.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            parts.append('l=%d' % self._limit)
        if self._waiter:
            parts.append('w=%r' % self._waiter)
        if self._exception:
            parts.append('e=%r' % self._exception)
        if self._transport:
            parts.append('t=%r' % self._transport)
        if self._paused:
            parts.append('paused')
        return '<%s>' % ' '.join(parts)

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store `exc` and pass it to a pending waiter, if any."""
        self._exception = exc

        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        if not pending.cancelled():
            pending.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        if not pending.cancelled():
            pending.set_result(None)

    def set_transport(self, transport):
        """Attach `transport`; it may only be set once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) > self._limit:
            return
        self._paused = False
        self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake up a pending reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append `data` to the buffer and wake up a pending reader.

        Empty data is ignored.  The transport is paused when the buffer
        exceeds twice the limit.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if self._transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return

        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data()
        # method to a read coroutine.  Running two read coroutines at the
        # same time would have an unexpected behaviour: it would not be
        # possible to know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would deadlock, so prevent it.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = futures.Future(loop=self._loop)
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline.  If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline.  When EOF was reached while no bytes read,
        empty bytes object is returned.

        If limit is reached, ValueError will be raised.  In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer.  Else, internal buffer will be cleared.  Limit
        is compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        newline = b'\n'
        newline_len = len(newline)
        try:
            line = yield from self.readuntil(newline)
        except IncompleteReadError as incomplete:
            return incomplete.partial
        except LimitOverrunError as overrun:
            consumed = overrun.consumed
            if self._buffer.startswith(newline, consumed):
                # Drop the over-long line together with its newline.
                del self._buffer[:consumed + newline_len]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(overrun.args[0])
        else:
            return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read chunk of data from the stream until `separator` is found.

        On success, chunk and its separator will be removed from internal
        buffer (i.e. consumed).  Returned chunk will include separator at
        the end.

        Configured stream limit is used to check result.  Limit means
        maximal length of chunk that can be returned, not counting the
        separator.

        If EOF occurs and complete separator still not found,
        IncompleteReadError(<partial data>, None) will be raised and
        internal buffer becomes empty.  This partial data may contain a
        partial separator.

        If chunk cannot be read due to overlimit, LimitOverrunError will be
        raised and data will be left in internal buffer, so it can be read
        again, in some different way.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        seplen = len(separator)
        if not seplen:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # After an unsuccessful search, everything in the buffer except
        # the last ``seplen - 1`` bytes is known not to start a separator.
        # Corner cases with separator='SEPARATOR':
        # * the buffer ends with an almost complete separator (missing
        #   the last byte), e.g. buffer='some textSEPARATO'.  Only the
        #   trailing len(separator) - 1 bytes need to be rescanned.
        # * the last byte of the buffer is the first byte of the
        #   separator, e.g. buffer='abcdefghijklmnopqrS'.  Everything
        #   except that last byte could be skipped, but detecting this
        #   needs analysis of the partial match (slow and/or an FSM).
        #   Here the implementation is not optimal, since it rescans
        #   data that is known not to belong to the separator.  In the
        #   real world the separator is not long enough for this to be a
        #   noticeable performance problem.

        # `scan_from` is the number of bytes from the beginning of the
        # buffer in which there is no occurrence of `separator`.
        scan_from = 0

        # Loop until we find `separator` in the buffer, exceed the buffer
        # size, or an EOF has happened.
        while True:
            buffered = len(self._buffer)

            # Check if we now have enough data in the buffer for
            # `separator` to fit.
            if buffered - scan_from >= seplen:
                sep_index = self._buffer.find(separator, scan_from)

                if sep_index != -1:
                    # `separator` is in the buffer.  `sep_index` is used
                    # below to retrieve the data.
                    break

                # See the comment above the loop for the explanation.
                scan_from = buffered + 1 - seplen
                if scan_from > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        scan_from)

            # Complete message (with full separator) may be present in
            # buffer even when EOF flag is set.  This may happen when the
            # last chunk adds data which makes separator be found.  That's
            # why we check for EOF *after* inspecting the buffer.
            if self._eof:
                leftover = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(leftover, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if sep_index > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit',
                sep_index)

        end = sep_index + seplen
        result = bytes(self._buffer[:end])
        del self._buffer[:end]
        self._maybe_resume_transport()
        return result

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all
        read bytes.  If the EOF was received and the internal buffer is
        empty, return an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer bytes than requested, but at least one byte.  If EOF
        was received before any byte is read, this function returns empty
        bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            pieces = []
            piece = yield from self.read(self._limit)
            while piece:
                pieces.append(piece)
                piece = yield from self.read(self._limit)
            return b''.join(pieces)

        if not (self._buffer or self._eof):
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an `IncompleteReadError` if EOF is reached before `n` bytes
        can be read.  The `IncompleteReadError.partial` attribute of the
        exception will contain the partial read bytes.

        If n is zero, return empty bytes object.  A negative n raises
        ValueError.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.
        pieces = []
        remaining = n
        while remaining > 0:
            piece = yield from self.read(remaining)
            if not piece:
                partial = b''.join(pieces)
                raise IncompleteReadError(partial, len(partial) + remaining)
            pieces.append(piece)
            remaining -= len(piece)

        assert remaining == 0

        return b''.join(pieces)

    if compat.PY35:
        # NOTE(review): __aiter__ is a coroutine here.  Python 3.5.2+
        # expects __aiter__ to return the asynchronous iterator directly
        # rather than an awaitable -- confirm which versions this file
        # must support before changing it.
        @coroutine
        def __aiter__(self):
            return self

        @coroutine
        def __anext__(self):
            line = yield from self.readline()
            if line == b'':
                # readline() returns b'' only once EOF was reached with
                # nothing left to read.
                raise StopAsyncIteration
            return line