blob: 0345a3d3918cc33787b3905abb3bf8b4fe824852 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Yury Selivanovd9d0e862016-01-11 12:28:19 -05006 'LimitOverrunError',
Guido van Rossum1540b162013-11-19 11:43:38 -08007 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
import socket
import sys

if hasattr(socket, 'AF_UNIX'):
    __all__.extend(['open_unix_connection', 'start_unix_server'])

from . import coroutines
from . import compat
from . import events
from . import protocols
from .coroutines import coroutine
from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Default buffer limit in bytes (2 ** 16 == 65536).  It is the default value
# of the ``limit`` argument of StreamReader and of the open_*/start_* helpers.
_DEFAULT_LIMIT = 2 ** 16
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default exception reduction re-creates the object from
        # ``self.args``, which only holds the formatted message, while
        # __init__() requires (partial, expected).  Return the real
        # constructor arguments so copy/pickle round-trips work.
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # ``self.args`` only carries the message; without this override
        # copy/pickle would call __init__() with a single argument and
        # fail with TypeError because ``consumed`` is missing.
        return type(self), (self.args[0], self.consumed)
48
49
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect with loop.create_connection() and return (reader, writer).

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance wrapping the new transport.

    host, port and every extra keyword argument are forwarded unchanged
    to create_connection(); only the protocol factory is supplied here.

    Keyword-only arguments consumed by this wrapper:

    - loop: the event loop to use (events.get_event_loop() when None).
    - limit: the buffer limit handed to the StreamReader.

    (To use customized StreamReader and/or StreamReaderProtocol
    classes, copy this code -- it is only a convenience wrapper.)
    """
    event_loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand back the protocol that is tied to stream_reader.
        return stream_protocol

    transport, _unused_protocol = yield from event_loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078
79
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback per connected client.

    `client_connected_cb` is called with two arguments, client_reader
    (a StreamReader) and client_writer (a StreamWriter).  It may be a
    plain function or a coroutine function; when calling it produces a
    coroutine, that coroutine is scheduled as a Task on the loop.

    host, port and every extra keyword argument are forwarded unchanged
    to loop.create_server(); only the protocol factory is supplied here.

    Keyword-only arguments consumed by this wrapper:

    - loop: the event loop to use (events.get_event_loop() when None).
    - limit: the buffer limit handed to each StreamReader.

    The return value is whatever loop.create_server() returns, i.e. a
    Server object which can be used to stop the service.
    """
    event_loop = loop if loop is not None else events.get_event_loop()

    def build_protocol():
        # One fresh reader/protocol pair for each accepted connection.
        client_reader = StreamReader(limit=limit, loop=event_loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=event_loop)

    server = yield from event_loop.create_server(
        build_protocol, host, port, **kwds)
    return server
114
115
if hasattr(socket, 'AF_UNIX'):
    # Only defined where the socket module exposes AF_UNIX, i.e. where
    # UNIX Domain Sockets are supported.

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but over a UNIX Domain Socket at `path`."""
        event_loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            return stream_protocol

        transport, _unused_protocol = (
            yield from event_loop.create_unix_connection(
                protocol_factory, path, **kwds))

        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, event_loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listening on a UNIX Domain Socket."""
        event_loop = loop if loop is not None else events.get_event_loop()

        def build_protocol():
            # One fresh reader/protocol pair for each accepted connection.
            client_reader = StreamReader(limit=limit, loop=event_loop)
            return StreamReaderProtocol(client_reader, client_connected_cb,
                                        loop=event_loop)

        server = yield from event_loop.create_unix_server(
            build_protocol, path, **kwds)
        return server
146
147
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    Implements the protocol methods pause_writing(), resume_writing()
    and connection_lost().  A subclass that overrides any of them must
    call the super method.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        # Future that a paused _drain_helper() call is blocked on, if any.
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        """Mark writing as paused; _drain_helper() blocks from now on."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Mark writing as resumed and release a blocked drain, if any."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending = self._drain_waiter
        if pending is None:
            return
        self._drain_waiter = None
        if not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Record the lost connection and wake up a paused writer.

        The blocked drain completes normally when `exc` is None and
        fails with `exc` otherwise.
        """
        self._connection_lost = True
        pending = self._drain_waiter
        # Only a paused writer can be blocked on the drain future.
        if self._paused and pending is not None:
            self._drain_waiter = None
            if not pending.done():
                if exc is None:
                    pending.set_result(None)
                else:
                    pending.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is resumed.

        Returns immediately when writing is not paused and raises
        ConnectionResetError once the connection has been lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            # Only one drain may wait at a time, unless the earlier
            # one was cancelled.
            assert previous is None or previous.cancelled()
            drain_future = self._loop.create_future()
            self._drain_waiter = drain_future
            yield from drain_future
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800212
213
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        """Bind the protocol to `stream_reader`.

        When `client_connected_cb` is given, connection_made() creates a
        StreamWriter and calls the callback with (reader, writer).
        """
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        # Updated in connection_made(); consulted by eof_received().
        self._over_ssl = False

    def connection_made(self, transport):
        """Attach `transport` to the reader and notify the client callback."""
        self._stream_reader.set_transport(transport)
        # SSL transports expose their context through get_extra_info().
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            # A coroutine callback is scheduled as a Task on the loop.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        """Propagate EOF (or `exc`) to the reader and drop references."""
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        """Forward received bytes to the reader's buffer."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader.

        Returns True to keep the transport open for writing, except over
        SSL, where half-closed connections are not supported.
        """
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Returning true has no effect when using SSL and only makes
            # the SSL layer log a warning, so report False instead.
            return False
        return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700256
Guido van Rossum355491d2013-10-18 15:17:11 -0700257
class StreamWriter:
    """Wraps a Transport.

    write(), writelines(), write_eof(), can_write_eof(),
    get_extra_info() and close() are delegated to the transport.  On
    top of that it offers the drain() coroutine for flow control and a
    transport property that references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        pieces = [self.__class__.__name__]
        pieces.append('transport=%r' % self._transport)
        if self._reader is not None:
            pieces.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(pieces)

    @property
    def transport(self):
        """The wrapped transport."""
        return self._transport

    def write(self, data):
        """Pass `data` to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass `data` to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Delegate to the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Delegate to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Delegate to the transport's close()."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Delegate to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()
        """
        reader = self._reader
        if reader is not None:
            pending_error = reader.exception()
            if pending_error is not None:
                raise pending_error

        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); yield from drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            yield

        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327
328
class StreamReader:
    """Buffered reader fed by feed_data()/feed_eof().

    Incoming bytes are appended to an internal bytearray and consumed by
    the read(), readline(), readuntil() and readexactly() coroutines.
    When a transport is attached, it is paused once the buffer grows
    beyond twice the limit and resumed after the buffer has been drained.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception set by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store `exc` and fail the pending read waiter with it, if any."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used for pausing/resuming reading."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake up a pending reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append `data` to the buffer and wake up a pending reader.

        Pauses the transport when the buffer exceeds twice the limit.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read chunk of data from the stream until newline (b'\\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception  will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        blocks = []
        while n > 0:
            block = yield from self.read(n)
            if not block:
                partial = b''.join(blocks)
                raise IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        assert n == 0

        return b''.join(blocks)

    # NOTE(review): compat.PY35 is defined outside this block; presumably
    # it is true on Python 3.5 and later -- confirm against the compat module.
    if compat.PY35:
        if sys.version_info >= (3, 5, 2):
            # Since Python 3.5.2 the asynchronous iteration protocol
            # expects __aiter__ to return the asynchronous iterator
            # directly.  Returning an awaitable is deprecated there and
            # is a TypeError from Python 3.7 on.
            def __aiter__(self):
                return self
        else:
            # Python 3.5.0 and 3.5.1 await the result of __aiter__.
            @coroutine
            def __aiter__(self):
                return self

        @coroutine
        def __anext__(self):
            # An empty line means EOF: stop the asynchronous iteration.
            val = yield from self.readline()
            if val == b'':
                raise StopAsyncIteration
            return val