blob: c88a87cd0967c87f22bab48d5b24e6306df661e1 [file] [log] [blame]
"""Stream-related things: StreamReader/StreamWriter wrappers and helpers."""

__all__ = [
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server',
    'IncompleteReadError', 'LimitOverrunError',
]

import socket

# The UNIX-domain helpers are only defined on platforms whose socket
# module provides AF_UNIX, so they are only exported there as well.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']
13
Victor Stinnerf951d282014-06-29 00:46:45 +020014from . import coroutines
Victor Stinner71080fc2015-07-25 02:23:21 +020015from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
Yury Selivanovb4617912016-05-16 16:32:38 -040022_DEFAULT_LIMIT = 2 ** 16
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: bytes read before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # BaseException's default pickling re-creates the instance as
        # cls(*self.args). Here self.args only holds the formatted message,
        # which does not match __init__(partial, expected), so unpickling
        # would raise TypeError. Pickle the constructor arguments instead.
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of bytes to be consumed.
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # self.args is (message,), which does not match
        # __init__(message, consumed); without this, unpickling raises
        # TypeError. Pickle both constructor arguments explicitly.
        return type(self), (self.args[0], self.consumed)
48
49
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect via create_connection() and return a (reader, writer) pair.

    The reader is a StreamReader instance and the writer a StreamWriter
    instance; both share one StreamReaderProtocol.

    host, port and any extra keyword arguments are passed through to
    loop.create_connection() (everything it accepts except
    protocol_factory, which is supplied here).

    Additional keyword-only arguments: ``loop`` selects the event loop
    (default: events.get_event_loop()), and ``limit`` is the buffer
    limit given to the StreamReader.

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    just copy this code -- it is only a convenience wrapper.)
    """
    if loop is None:
        loop = events.get_event_loop()
    stream_reader = StreamReader(limit=limit, loop=loop)
    proto = StreamReaderProtocol(stream_reader, loop=loop)
    transport, _unused = yield from loop.create_connection(
        lambda: proto, host, port, **kwds)
    return stream_reader, StreamWriter(transport, proto, stream_reader, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078
79
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback for each client.

    ``client_connected_cb`` is called with two arguments,
    (client_reader, client_writer): a StreamReader and a StreamWriter.
    It may be a plain function or a coroutine function; if calling it
    returns a coroutine, that coroutine is scheduled as a Task.

    host, port and any extra keyword arguments are passed through to
    loop.create_server() (everything it accepts except protocol_factory,
    which is supplied here).

    Additional keyword-only arguments: ``loop`` selects the event loop
    (default: events.get_event_loop()), and ``limit`` is the buffer
    limit given to each client's StreamReader.

    Returns whatever loop.create_server() returns, i.e. a Server object
    that can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # One fresh reader/protocol pair per accepted connection.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(make_protocol, host, port, **kwds)
    return server
114
115
if hasattr(socket, 'AF_UNIX'):
    # UNIX domain sockets are available on this platform, so define the
    # path-based counterparts of open_connection() and start_server().

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but connects to a UNIX domain socket."""
        if loop is None:
            loop = events.get_event_loop()
        stream_reader = StreamReader(limit=limit, loop=loop)
        proto = StreamReaderProtocol(stream_reader, loop=loop)
        transport, _unused = yield from loop.create_unix_connection(
            lambda: proto, path, **kwds)
        return (stream_reader,
                StreamWriter(transport, proto, stream_reader, loop))

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listens on a UNIX domain socket."""
        if loop is None:
            loop = events.get_event_loop()

        def make_protocol():
            # One fresh reader/protocol pair per accepted connection.
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb,
                                        loop=loop)

        server = yield from loop.create_unix_server(make_protocol, path,
                                                    **kwds)
        return server
146
147
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    Implements the protocol methods pause_writing(), resume_writing() and
    connection_lost(). Subclasses that override any of them must call the
    super() implementation.

    StreamWriter.drain() waits on the _drain_helper() coroutine. That
    coroutine blocks while writing is paused, and it raises
    ConnectionResetError once connection_lost() has been called.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        self._drain_waiter = None  # future awaited by _drain_helper()
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Release a writer blocked in _drain_helper(), if any.
        waiter, self._drain_waiter = self._drain_waiter, None
        if waiter is not None and not waiter.done():
            waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Only a paused writer can be blocked on the drain waiter; wake it
        # so it does not wait forever on a dead connection.
        waiter = self._drain_waiter
        if not self._paused or waiter is None:
            return
        self._drain_waiter = None
        if not waiter.done():
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            assert (self._drain_waiter is None or
                    self._drain_waiter.cancelled())
            self._drain_waiter = waiter = self._loop.create_future()
            yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800212
213
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    Transport callbacks are forwarded to the wrapped StreamReader.
    data_received() feeds it data. eof_received() feeds it EOF.
    connection_lost() feeds it EOF, or sets the exception if there is one.

    (This is a separate helper rather than making StreamReader itself a
    Protocol subclass, because StreamReader has other potential uses, and
    to keep users of a StreamReader from accidentally calling protocol
    methods on it.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False

    def connection_made(self, transport):
        reader = self._stream_reader
        reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

        callback = self._client_connected_cb
        if callback is None:
            return
        # Server side: build a writer for this client, then hand the
        # (reader, writer) pair to the user callback. A coroutine result
        # is scheduled as a task on the loop.
        self._stream_writer = StreamWriter(transport, self, reader,
                                           self._loop)
        res = callback(reader, self._stream_writer)
        if coroutines.iscoroutine(res):
            self._loop.create_task(res)

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is not None:
                reader.set_exception(exc)
            else:
                reader.feed_eof()
        super().connection_lost(exc)
        # Drop references so the reader/writer can be collected.
        self._stream_reader = self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        # Returning True keeps the transport open (half-close), but over
        # SSL it has no effect and SSLProtocol.eof_received warns about
        # it, so report False in that case.
        return not self._over_ssl
Guido van Rossum355491d2013-10-18 15:17:11 -0700263
Guido van Rossum355491d2013-10-18 15:17:11 -0700264
class StreamWriter:
    """Wraps a Transport.

    write(), writelines(), write_eof(), can_write_eof(), get_extra_info()
    and close() delegate to the transport. drain() is a coroutine to
    await for flow control, and the ``transport`` property exposes the
    wrapped Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() calls reader.exception(), so only a StreamReader (or
        # None) is accepted here.
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        fields = [type(self).__name__, 'transport=%r' % self._transport]
        if self._reader is not None:
            fields.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(fields)

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        Intended usage:

            w.write(data)
            yield from w.drain()

        Any exception already recorded on the paired reader is raised
        first.
        """
        reader = self._reader
        if reader is not None:
            exc = reader.exception()
            if exc is not None:
                raise exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield once so the event loop can deliver connection_lost().
            # Otherwise _drain_helper() would return immediately, and a
            # "write(...); yield from drain()" loop would never notice
            # that the socket has been closed.
            yield
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334
335
336class StreamReader:
337
338 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
339 # The line length limit is a security feature;
340 # it also doubles as half the buffer limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500341
342 if limit <= 0:
343 raise ValueError('Limit cannot be <= 0')
344
Guido van Rossum355491d2013-10-18 15:17:11 -0700345 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346 if loop is None:
Victor Stinner70db9e42015-01-09 21:32:05 +0100347 self._loop = events.get_event_loop()
348 else:
349 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500350 self._buffer = bytearray()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100351 self._eof = False # Whether we're done.
352 self._waiter = None # A future used by _wait_for_data()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353 self._exception = None
354 self._transport = None
355 self._paused = False
356
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200357 def __repr__(self):
358 info = ['StreamReader']
359 if self._buffer:
Andrew Svetlovd94c1b92015-09-29 18:36:00 +0300360 info.append('%d bytes' % len(self._buffer))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200361 if self._eof:
362 info.append('eof')
363 if self._limit != _DEFAULT_LIMIT:
364 info.append('l=%d' % self._limit)
365 if self._waiter:
366 info.append('w=%r' % self._waiter)
367 if self._exception:
368 info.append('e=%r' % self._exception)
369 if self._transport:
370 info.append('t=%r' % self._transport)
371 if self._paused:
372 info.append('paused')
373 return '<%s>' % ' '.join(info)
374
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375 def exception(self):
376 return self._exception
377
378 def set_exception(self, exc):
379 self._exception = exc
380
Guido van Rossum355491d2013-10-18 15:17:11 -0700381 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700383 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 if not waiter.cancelled():
385 waiter.set_exception(exc)
386
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100387 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500388 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100389 waiter = self._waiter
390 if waiter is not None:
391 self._waiter = None
392 if not waiter.cancelled():
393 waiter.set_result(None)
394
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 def set_transport(self, transport):
396 assert self._transport is None, 'Transport already set'
397 self._transport = transport
398
399 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500400 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700402 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
404 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700405 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100406 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
Yury Selivanovf0020f52014-02-06 00:14:30 -0500408 def at_eof(self):
409 """Return True if the buffer is empty and 'feed_eof' was called."""
410 return self._eof and not self._buffer
411
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500413 assert not self._eof, 'feed_data after feed_eof'
414
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 if not data:
416 return
417
Yury Selivanove694c972014-02-05 18:11:13 -0500418 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100419 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420
421 if (self._transport is not None and
Yury Selivanovb4617912016-05-16 16:32:38 -0400422 not self._paused and
423 len(self._buffer) > 2 * self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700425 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 except NotImplementedError:
427 # The transport can't be paused.
428 # We'll just have to buffer all data.
429 # Forget the transport so we don't keep trying.
430 self._transport = None
431 else:
432 self._paused = True
433
Victor Stinnerd6dc7bd2015-03-18 11:37:42 +0100434 @coroutine
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100435 def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500436 """Wait until feed_data() or feed_eof() is called.
437
438 If stream was paused, automatically resume it.
439 """
Victor Stinner183e3472014-01-23 17:40:03 +0100440 # StreamReader uses a future to link the protocol feed_data() method
441 # to a read coroutine. Running two read coroutines at the same time
442 # would have an unexpected behaviour. It would not possible to know
443 # which coroutine would get the next data.
444 if self._waiter is not None:
445 raise RuntimeError('%s() called while another coroutine is '
446 'already waiting for incoming data' % func_name)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100447
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500448 assert not self._eof, '_wait_for_data after EOF'
449
450 # Waiting for data while paused will make deadlock, so prevent it.
451 if self._paused:
452 self._paused = False
453 self._transport.resume_reading()
454
Yury Selivanov7661db62016-05-16 15:38:39 -0400455 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100456 try:
457 yield from self._waiter
458 finally:
459 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100460
Victor Stinnerf951d282014-06-29 00:46:45 +0200461 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 def readline(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500463 """Read chunk of data from the stream until newline (b'\n') is found.
464
465 On success, return chunk that ends with newline. If only partial
466 line can be read due to EOF, return incomplete line without
467 terminating newline. When EOF was reached while no bytes read, empty
468 bytes object is returned.
469
470 If limit is reached, ValueError will be raised. In that case, if
471 newline was found, complete line including newline will be removed
472 from internal buffer. Else, internal buffer will be cleared. Limit is
473 compared against part of the line without newline.
474
475 If stream was paused, this function will automatically resume it if
476 needed.
477 """
478 sep = b'\n'
479 seplen = len(sep)
480 try:
481 line = yield from self.readuntil(sep)
482 except IncompleteReadError as e:
483 return e.partial
484 except LimitOverrunError as e:
485 if self._buffer.startswith(sep, e.consumed):
486 del self._buffer[:e.consumed + seplen]
487 else:
488 self._buffer.clear()
489 self._maybe_resume_transport()
490 raise ValueError(e.args[0])
491 return line
492
493 @coroutine
494 def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400495 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500496
Yury Selivanovb4617912016-05-16 16:32:38 -0400497 On success, the data and separator will be removed from the
498 internal buffer (consumed). Returned data will include the
499 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500500
Yury Selivanovb4617912016-05-16 16:32:38 -0400501 Configured stream limit is used to check result. Limit sets the
502 maximal length of data that can be returned, not counting the
503 separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500504
Yury Selivanovb4617912016-05-16 16:32:38 -0400505 If an EOF occurs and the complete separator is still not found,
506 an IncompleteReadError exception will be raised, and the internal
507 buffer will be reset. The IncompleteReadError.partial attribute
508 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500509
Yury Selivanovb4617912016-05-16 16:32:38 -0400510 If the data cannot be read because of over limit, a
511 LimitOverrunError exception will be raised, and the data
512 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500513 """
514 seplen = len(separator)
515 if seplen == 0:
516 raise ValueError('Separator should be at least one-byte string')
517
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 if self._exception is not None:
519 raise self._exception
520
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500521 # Consume whole buffer except last bytes, which length is
522 # one less than seplen. Let's check corner cases with
523 # separator='SEPARATOR':
524 # * we have received almost complete separator (without last
525 # byte). i.e buffer='some textSEPARATO'. In this case we
526 # can safely consume len(separator) - 1 bytes.
527 # * last byte of buffer is first byte of separator, i.e.
528 # buffer='abcdefghijklmnopqrS'. We may safely consume
529 # everything except that last byte, but this require to
530 # analyze bytes of buffer that match partial separator.
531 # This is slow and/or require FSM. For this case our
532 # implementation is not optimal, since require rescanning
533 # of data that is known to not belong to separator. In
534 # real world, separator will not be so long to notice
535 # performance problems. Even when reading MIME-encoded
536 # messages :)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537
Yury Selivanovb4617912016-05-16 16:32:38 -0400538 # `offset` is the number of bytes from the beginning of the buffer
539 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500540 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500542 # Loop until we find `separator` in the buffer, exceed the buffer size,
543 # or an EOF has happened.
544 while True:
545 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500547 # Check if we now have enough data in the buffer for `separator` to
548 # fit.
549 if buflen - offset >= seplen:
550 isep = self._buffer.find(separator, offset)
551
552 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400553 # `separator` is in the buffer. `isep` will be used later
554 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500555 break
556
557 # see upper comment for explanation.
558 offset = buflen + 1 - seplen
559 if offset > self._limit:
Yury Selivanovb4617912016-05-16 16:32:38 -0400560 raise LimitOverrunError(
561 'Separator is not found, and chunk exceed the limit',
562 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500563
564 # Complete message (with full separator) may be present in buffer
565 # even when EOF flag is set. This may happen when the last chunk
566 # adds data which makes separator be found. That's why we check for
567 # EOF *ater* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700568 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500569 chunk = bytes(self._buffer)
570 self._buffer.clear()
571 raise IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500573 # _wait_for_data() will resume reading if stream was paused.
574 yield from self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500576 if isep > self._limit:
Yury Selivanovb4617912016-05-16 16:32:38 -0400577 raise LimitOverrunError(
578 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500579
580 chunk = self._buffer[:isep + seplen]
581 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700582 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500583 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700584
Victor Stinnerf951d282014-06-29 00:46:45 +0200585 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 def read(self, n=-1):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500587 """Read up to `n` bytes from the stream.
588
589 If n is not provided, or set to -1, read until EOF and return all read
590 bytes. If the EOF was received and the internal buffer is empty, return
591 an empty bytes object.
592
593 If n is zero, return empty bytes object immediatelly.
594
595 If n is positive, this function try to read `n` bytes, and may return
596 less or equal bytes than requested, but at least one byte. If EOF was
597 received before any byte is read, this function returns empty byte
598 object.
599
Yury Selivanovb4617912016-05-16 16:32:38 -0400600 Returned value is not limited with limit, configured at stream
601 creation.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500602
603 If stream was paused, this function will automatically resume it if
604 needed.
605 """
606
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607 if self._exception is not None:
608 raise self._exception
609
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500610 if n == 0:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700611 return b''
612
613 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -0700614 # This used to just loop creating a new waiter hoping to
615 # collect everything in self._buffer, but that would
616 # deadlock if the subprocess sends more than self.limit
617 # bytes. So just call self.read(self._limit) until EOF.
618 blocks = []
619 while True:
620 block = yield from self.read(self._limit)
621 if not block:
622 break
623 blocks.append(block)
624 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500626 if not self._buffer and not self._eof:
627 yield from self._wait_for_data('read')
628
629 # This will work right even if buffer is less than n bytes
630 data = bytes(self._buffer[:n])
631 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632
Yury Selivanove694c972014-02-05 18:11:13 -0500633 self._maybe_resume_transport()
634 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635
Victor Stinnerf951d282014-06-29 00:46:45 +0200636 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 def readexactly(self, n):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500638 """Read exactly `n` bytes.
639
Yury Selivanovb4617912016-05-16 16:32:38 -0400640 Raise an IncompleteReadError if EOF is reached before `n` bytes can be
641 read. The IncompleteReadError.partial attribute of the exception will
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500642 contain the partial read bytes.
643
644 if n is zero, return empty bytes object.
645
Yury Selivanovb4617912016-05-16 16:32:38 -0400646 Returned value is not limited with limit, configured at stream
647 creation.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500648
649 If stream was paused, this function will automatically resume it if
650 needed.
651 """
Yury Selivanovdddc7812015-12-11 11:32:59 -0500652 if n < 0:
653 raise ValueError('readexactly size can not be less than zero')
654
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655 if self._exception is not None:
656 raise self._exception
657
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500658 if n == 0:
659 return b''
660
Guido van Rossum38455212014-01-06 16:09:18 -0800661 # There used to be "optimized" code here. It created its own
662 # Future and waited until self._buffer had at least the n
663 # bytes, then called read(n). Unfortunately, this could pause
664 # the transport if the argument was larger than the pause
665 # limit (which is twice self._limit). So now we just read()
666 # into a local buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700667
Guido van Rossum38455212014-01-06 16:09:18 -0800668 blocks = []
669 while n > 0:
670 block = yield from self.read(n)
671 if not block:
Victor Stinner8dffc452014-01-25 15:32:06 +0100672 partial = b''.join(blocks)
673 raise IncompleteReadError(partial, len(partial) + n)
Guido van Rossum38455212014-01-06 16:09:18 -0800674 blocks.append(block)
675 n -= len(block)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700676
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500677 assert n == 0
678
Guido van Rossum38455212014-01-06 16:09:18 -0800679 return b''.join(blocks)
Yury Selivanovd08c3632015-05-13 15:15:56 -0400680
Victor Stinner71080fc2015-07-25 02:23:21 +0200681 if compat.PY35:
Yury Selivanovd08c3632015-05-13 15:15:56 -0400682 @coroutine
683 def __aiter__(self):
684 return self
685
686 @coroutine
687 def __anext__(self):
688 val = yield from self.readline()
689 if val == b'':
690 raise StopAsyncIteration
691 return val
Yury Selivanova6f6edb2016-06-09 15:08:31 -0400692
693 if compat.PY352:
694 # In Python 3.5.2 and greater, __aiter__ should return
695 # the asynchronous iterator directly.
696 def __aiter__(self):
697 return self