blob: 6b5e96aea2c9caa559393f015ff1124a7eb41b0b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
# Public names exported by this module.  The UNIX domain socket helpers are
# appended separately, and only when the socket module provides AF_UNIX.
__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
           'open_connection', 'start_server',
           'IncompleteReadError',
           ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
import socket
import sys
9
if hasattr(socket, 'AF_UNIX'):
    # The UNIX domain socket helpers are only defined (and therefore only
    # exported) when the socket module provides AF_UNIX.
    __all__.extend(['open_unix_connection', 'start_unix_server'])
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Victor Stinner71080fc2015-07-25 02:23:21 +020014from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
16from . import futures
17from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Default for the ``limit`` argument of StreamReader and of the
# open_*/start_* helpers: 2**16 bytes (64 KiB).
_DEFAULT_LIMIT = 2**16
23
Guido van Rossuma849be92014-01-30 16:05:28 -080024
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes
    """
    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The inherited BaseException.__reduce__ rebuilds the instance from
        # self.args, which only holds the formatted message and so does not
        # match the two-argument constructor.  Return the real constructor
        # arguments so that copy and pickle round-trips work.
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Wrap loop.create_connection() and return a (reader, writer) pair.

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance, both bound to the new connection.

    host, port and any extra keyword arguments are forwarded to
    create_connection(); the protocol factory is supplied by this
    function.

    Two keyword-only arguments are consumed here: loop (the event loop
    to use, defaulting to events.get_event_loop()) and limit (the buffer
    limit handed to the StreamReader).

    (To use customized StreamReader and/or StreamReaderProtocol classes,
    copy this code -- it is only a convenience.)
    """
    loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the protocol that was built above.
        return stream_protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067
68
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    `client_connected_cb` is called with two arguments for every new
    connection: client_reader (a StreamReader) and client_writer (a
    StreamWriter).  It may be a plain callback or a coroutine function;
    when the call returns a coroutine object, that object is scheduled
    as a Task on the loop.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied by this
    function.

    Two keyword-only arguments are consumed here: loop (the event loop
    to use, defaulting to events.get_event_loop()) and limit (the buffer
    limit handed to each StreamReader).

    The return value is whatever loop.create_server() returns, i.e. a
    Server object which can be used to stop the service.
    """
    loop = events.get_event_loop() if loop is None else loop

    def factory():
        # Every accepted connection gets its own reader/protocol pair.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(factory, host, port, **kwds)
    return server
103
104
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection(); the result is a
        (StreamReader, StreamWriter) pair.
        """
        loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(limit=limit, loop=loop)
        stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

        def protocol_factory():
            # Always hand back the protocol that was built above.
            return stream_protocol

        transport, _ = yield from loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        loop = events.get_event_loop() if loop is None else loop

        def factory():
            # Every accepted connection gets its own reader/protocol pair.
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb,
                                        loop=loop)

        server = yield from loop.create_unix_server(factory, path, **kwds)
        return server
136
137
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False            # True between pause/resume_writing
        self._drain_waiter = None       # Future awaited by _drain_helper()
        self._connection_lost = False

    def pause_writing(self):
        """Record that writing is paused."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing may continue and release a pending drain."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending, self._drain_waiter = self._drain_waiter, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Remember that the connection is gone and wake a paused drain.

        The pending drain waiter, if any, completes normally when exc is
        None and receives exc otherwise.
        """
        self._connection_lost = True
        pending = self._drain_waiter
        # Wake up the writer only if it is currently paused and waiting.
        if not self._paused or pending is None:
            return
        self._drain_waiter = None
        if pending.done():
            return
        if exc is not None:
            pending.set_exception(exc)
        else:
            pending.set_result(None)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is resumed (or the connection is lost).

        Raises ConnectionResetError if the connection was already lost and
        returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            assert previous is None or previous.cancelled()
            fresh = futures.Future(loop=self._loop)
            self._drain_waiter = fresh
            yield from fresh
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800202
203
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)

    Transport events are forwarded to the wrapped StreamReader.  When a
    client_connected_cb is given, it is called with a
    (StreamReader, StreamWriter) pair as soon as a connection is made.
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        # Updated in connection_made(); see eof_received().
        self._over_ssl = False

    def connection_made(self, transport):
        """Attach the transport to the reader and notify the callback."""
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            # A coroutine callback is run as a task on the loop.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        """Signal EOF (or the error) to the reader, then wake any drain."""
        if exc is None:
            self._stream_reader.feed_eof()
        else:
            self._stream_reader.set_exception(exc)
        super().connection_lost(exc)

    def data_received(self, data):
        """Append the received bytes to the reader's buffer."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader.

        Returns True to ask the transport to stay open for writing,
        except over SSL where False is returned.
        """
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # SSL transports cannot stay half-open; they log
            # "returning true from eof_received() has no effect when
            # using ssl" if we ask them to, so don't.
            return False
        return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700243
Guido van Rossum355491d2013-10-18 15:17:11 -0700244
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close(), each forwarded to the transport.  It
    adds drain(), a coroutine to wait on for write flow control, and a
    transport property which references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__]
        parts.append('transport=%r' % self._transport)
        if self._reader is not None:
            parts.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(parts)

    @property
    def transport(self):
        """The wrapped transport."""
        return self._transport

    def write(self, data):
        """Forward data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Forward to the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Forward to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Forward to the transport's close()."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Forward to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()

        An exception stored on the associated reader is re-raised here.
        """
        reader = self._reader
        if reader is not None:
            pending_exc = reader.exception()
            if pending_exc is not None:
                raise pending_exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); yield from drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            yield
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
315
class StreamReader:
    """Buffered reader of bytes fed in by a protocol.

    Data arrives through feed_data(), feed_eof() and set_exception()
    (called by StreamReaderProtocol in this module) and is consumed with
    the read(), readline() and readexactly() coroutines.

    `limit` bounds the length of a line returned by readline().  It also
    drives read flow control: the transport is paused once more than
    2*limit bytes are buffered and resumed when the buffer is back at or
    below limit.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        # A non-positive limit cannot work: with 0, read(-1) would call
        # read(0) and report EOF at once; with a negative value the
        # transport would be paused and never resumed.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and deliver it to a coroutine waiting for data."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read() or readline() function waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Remember the transport used for read flow control."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake a waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake a waiting reader.

        Pauses the transport when more than 2*limit bytes are buffered.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2*self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        func_name is only used in the error message raised when another
        coroutine is already waiting.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        self._waiter = futures.Future(loop=self._loop)
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read up to and including the next b'\\n'.

        If EOF is reached before a newline, the data read so far is
        returned (possibly b'').  Raises the stored exception, if any, or
        ValueError when the line grows beyond the limit; in the latter
        case the bytes already taken from the buffer are dropped.
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                ichar = self._buffer.find(b'\n')
                if ichar < 0:
                    # No newline yet: take everything buffered so far.
                    line.extend(self._buffer)
                    self._buffer.clear()
                else:
                    ichar += 1
                    line.extend(self._buffer[:ichar])
                    del self._buffer[:ichar]
                    not_enough = False

                if len(line) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if self._eof:
                break

            if not_enough:
                yield from self._wait_for_data('readline')

        self._maybe_resume_transport()
        return bytes(line)

    @coroutine
    def read(self, n=-1):
        """Read up to n bytes; with n < 0, read until EOF.

        Returns b'' when n is 0, or when EOF was reached and the buffer
        is empty.  For n > 0 this waits for data only if the buffer is
        empty, then returns at most n of the buffered bytes.  Raises the
        stored exception, if any.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        # From here on n > 0.
        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        if len(self._buffer) <= n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError, carrying the bytes read so far, if
        EOF is reached first.  Returns b'' when n <= 0.  Raises the
        stored exception, if any.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        blocks = []
        while n > 0:
            block = yield from self.read(n)
            if not block:
                partial = b''.join(blocks)
                raise IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b''.join(blocks)

    if compat.PY35:
        if sys.version_info >= (3, 5, 2):
            # Since Python 3.5.2, __aiter__ must return the asynchronous
            # iterator itself; returning an awaitable is deprecated.
            def __aiter__(self):
                return self
        else:
            @coroutine
            def __aiter__(self):
                return self

        @coroutine
        def __anext__(self):
            # Iterate line by line; an empty read means EOF.
            val = yield from self.readline()
            if val == b'':
                raise StopAsyncIteration
            return val