blob: a82cc79acaa3fb371a01d8367456be75cc997086 [file] [log] [blame]
"""Stream-related things.

High-level ``(reader, writer)`` helpers built on top of the event loop's
transport/protocol API.
"""

__all__ = [
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server',
    'IncompleteReadError',
    'LimitOverrunError',
]

import socket

# The UNIX-domain-socket helpers are only defined (and exported) on
# platforms that provide AF_UNIX.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']

from . import coroutines
from . import compat
from . import events
from . import protocols
from .coroutines import coroutine
from .log import logger


# Default StreamReader buffer limit: 64 KiB.
_DEFAULT_LIMIT = 64 * 1024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
class IncompleteReadError(EOFError):
    """Raised when EOF is reached before a read could be completed.

    Attributes:

    - partial: the bytes read before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # BaseException's default pickling re-creates the exception from
        # self.args, which only holds the formatted message.  __init__
        # needs (partial, expected), so without this unpickling would
        # fail with TypeError.
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # self.args only holds (message,); __init__ also requires
        # `consumed`, so the default exception pickling cannot rebuild it.
        return type(self), (self.args[0], self.consumed)
48
49
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a ``(reader, writer)`` pair.

    Thin wrapper around ``loop.create_connection()``: ``host``, ``port``
    and any extra keyword arguments are forwarded to it unchanged (it takes
    everything create_connection() does except ``protocol_factory``).  The
    reader is a StreamReader and the writer a StreamWriter, both tied to
    the same StreamReaderProtocol.

    Extra keyword-only arguments:

    - loop: event loop to use (defaults to ``events.get_event_loop()``).
    - limit: buffer limit handed to the StreamReader.

    (Need a customised StreamReader or StreamReaderProtocol?  Just copy
    this function -- it is nothing more than a convenience wrapper.)
    """
    event_loop = events.get_event_loop() if loop is None else loop
    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)
    transport, _unused = yield from event_loop.create_connection(
        lambda: stream_protocol, host, port, **kwds)
    stream_writer = StreamWriter(transport, stream_protocol, stream_reader,
                                 event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078
79
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, calling back once per connected client.

    ``client_connected_cb(client_reader, client_writer)`` receives a
    StreamReader and a StreamWriter for each new client.  It may be a
    plain function or a coroutine function; a coroutine it returns is
    scheduled as a Task.

    ``host``, ``port`` and any extra keyword arguments are forwarded to
    ``loop.create_server()`` (everything but ``protocol_factory``).

    Extra keyword-only arguments:

    - loop: event loop to use (defaults to ``events.get_event_loop()``).
    - limit: buffer limit handed to each StreamReader.

    Returns whatever ``loop.create_server()`` returns, i.e. a Server
    object that can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def _make_protocol():
        # Each accepted connection gets its own reader/protocol pair.
        stream_reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(stream_reader, client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(_make_protocol, host, port, **kwds)
    return server
114
115
if hasattr(socket, 'AF_UNIX'):
    # UNIX domain sockets are available on this platform: provide the
    # path-based counterparts of open_connection() / start_server().

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but over a UNIX domain socket."""
        event_loop = events.get_event_loop() if loop is None else loop
        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)
        transport, _unused = yield from event_loop.create_unix_connection(
            lambda: stream_protocol, path, **kwds)
        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, event_loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listens on a UNIX domain socket."""
        if loop is None:
            loop = events.get_event_loop()

        def _make_protocol():
            # Each accepted client gets its own reader/protocol pair.
            stream_reader = StreamReader(limit=limit, loop=loop)
            return StreamReaderProtocol(stream_reader, client_connected_cb,
                                        loop=loop)

        server = yield from loop.create_unix_server(_make_protocol, path,
                                                    **kwds)
        return server
146
147
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # True between pause_writing() and resume_writing().
        self._paused = False
        # Future that a blocked _drain_helper() is waiting on, if any.
        self._drain_waiter = None
        # Set once connection_lost() has been called.
        self._connection_lost = False

    def pause_writing(self):
        """Mark writing as paused so that _drain_helper() blocks."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Mark writing as resumed and wake a blocked _drain_helper()."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Record the lost connection and release a blocked drain.

        If writing is paused and a drain waiter is pending, it is completed
        with None when ``exc`` is None, or failed with ``exc`` otherwise.
        """
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is resumed (backs StreamWriter.drain()).

        Raises ConnectionResetError if the connection was already lost;
        returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        # Any previous waiter must be gone or cancelled; a second
        # concurrent wait on a paused writer would trip this assert.
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800212
213
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (Kept separate instead of making StreamReader itself a Protocol
    subclass: a StreamReader has other potential uses, and this keeps
    users of a StreamReader from accidentally calling protocol methods
    on it.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        ssl_context = transport.get_extra_info('sslcontext')
        self._over_ssl = ssl_context is not None

        callback = self._client_connected_cb
        if callback is None:
            return
        # Server side: build the writer and hand both streams to the user
        # callback; a returned coroutine is scheduled as a task.
        self._stream_writer = StreamWriter(transport, self,
                                           self._stream_reader,
                                           self._loop)
        result = callback(self._stream_reader, self._stream_writer)
        if coroutines.iscoroutine(result):
            self._loop.create_task(result)

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        # Over SSL, returning True from eof_received() has no effect and
        # SSLProtocol.eof_received warns about it, so report False there.
        return not self._over_ssl
Guido van Rossum355491d2013-10-18 15:17:11 -0700263
Guido van Rossum355491d2013-10-18 15:17:11 -0700264
class StreamWriter:
    """Wraps a Transport.

    Exposes write(), writelines(), [can_]write_eof(), get_extra_info()
    and close(), all delegated to the transport.  Adds the drain()
    coroutine, which waits for flow control, and a ``transport``
    property that references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        # drain() calls reader.exception(), so only a StreamReader (or
        # None) is accepted as reader.
        assert reader is None or isinstance(reader, StreamReader)
        self._transport = transport
        self._protocol = protocol
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        fields = ['transport=%r' % self._transport]
        if self._reader is not None:
            fields.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join([self.__class__.__name__] + fields)

    @property
    def transport(self):
        """The wrapped Transport."""
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()
        """
        reader = self._reader
        if reader is not None:
            exc = reader.exception()
            if exc is not None:
                raise exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop once so connection_lost() may be
            # called; otherwise a `write(...); yield from drain()` loop
            # would never observe the closed socket.
            yield
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334
335
class StreamReader:
    """Buffered reader fed by a protocol via feed_data() / feed_eof().

    The read coroutines (read(), readline(), readuntil(), readexactly())
    consume the internal buffer and wait for more data when needed.  Only
    one coroutine may wait for incoming data at a time (see
    _wait_for_data()).
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        # True while the transport is paused by feed_data().
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and deliver it to a pending reader, if any.

        read(), readuntil() and readexactly() (and therefore readline())
        raise the stored exception on entry; a coroutine currently blocked
        in _wait_for_data() is woken with exc unless its future was
        cancelled.
        """
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used to pause/resume reading (once only)."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume a transport paused by feed_data() once the buffer has
        # shrunk back to at most self._limit bytes.
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake up a waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake up a waiting reader.

        Empty data is ignored.  When the buffer grows beyond twice the
        limit, the transport is asked to pause reading.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.

        Raises RuntimeError if another coroutine is already waiting;
        func_name is only used in that error message.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read a chunk of data from the stream until newline (b'\n') is found.

        On success, return the chunk ending with the newline. If only a
        partial line can be read due to EOF, return the incomplete line
        without the terminating newline. When EOF was reached while no
        bytes were read, an empty bytes object is returned.

        If the limit is reached, ValueError will be raised. In that case, if
        a newline was found, the complete line including the newline is
        removed from the internal buffer. Otherwise, the internal buffer is
        cleared. The limit is compared against the part of the line without
        the newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            # Drop the over-long line (including its newline, if present)
            # so the next read starts after it.
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset. The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        An empty separator raises ValueError.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, whose length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this requires
        #   analyzing the bytes of buffer that match a partial separator.
        #   This is slow and/or requires an FSM. For this case our
        #   implementation is not optimal, since it requires rescanning
        #   data that is known not to belong to the separator. In the
        #   real world, the separator will not be long enough to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or is negative, read until EOF and return all
        read bytes. If the EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns an empty
        bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        If n is zero, return empty bytes object. If n is negative, raise
        ValueError.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(incomplete, n)

            yield from self._wait_for_data('readexactly')

        # Fast path: take the whole buffer without slicing when it holds
        # exactly n bytes.
        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    if compat.PY35:
        # Asynchronous iteration yields lines (via readline()) until an
        # empty read signals EOF. Before 3.5.2, __aiter__ is a coroutine
        # here; the PY352 branch below replaces it with a plain method.
        @coroutine
        def __aiter__(self):
            return self

        @coroutine
        def __anext__(self):
            val = yield from self.readline()
            if val == b'':
                raise StopAsyncIteration
            return val

    if compat.PY352:
        # In Python 3.5.2 and greater, __aiter__ should return
        # the asynchronous iterator directly.
        def __aiter__(self):
            return self
695 return self