blob: c77eb606c2f9f1f8d1e71d08629eeaceaed8a9e8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08006 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
9
Guido van Rossume3e786c2014-02-18 10:24:30 -080010if hasattr(socket, 'AF_UNIX'):
11 __all__.extend(['open_unix_connection', 'start_unix_server'])
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
15from . import futures
16from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020017from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
21_DEFAULT_LIMIT = 2**16
22
Guido van Rossuma849be92014-01-30 16:05:28 -080023
Victor Stinner8dffc452014-01-25 15:32:06 +010024class IncompleteReadError(EOFError):
25 """
26 Incomplete read error. Attributes:
27
28 - partial: read bytes string before the end of stream was reached
29 - expected: total number of expected bytes
30 """
31 def __init__(self, partial, expected):
32 EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
33 % (len(partial), expected))
34 self.partial = partial
35 self.expected = expected
36
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
Victor Stinnerf951d282014-06-29 00:46:45 +020038@coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039def open_connection(host=None, port=None, *,
40 loop=None, limit=_DEFAULT_LIMIT, **kwds):
41 """A wrapper for create_connection() returning a (reader, writer) pair.
42
43 The reader returned is a StreamReader instance; the writer is a
Victor Stinner183e3472014-01-23 17:40:03 +010044 StreamWriter instance.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045
46 The arguments are all the usual arguments to create_connection()
47 except protocol_factory; most common are positional host and port,
48 with various optional keyword arguments following.
49
50 Additional optional keyword arguments are loop (to set the event loop
51 instance to use) and limit (to set the buffer limit passed to the
52 StreamReader).
53
54 (If you want to customize the StreamReader and/or
55 StreamReaderProtocol classes, just copy the code -- there's
56 really nothing special here except some convenience.)
57 """
58 if loop is None:
59 loop = events.get_event_loop()
60 reader = StreamReader(limit=limit, loop=loop)
Guido van Rossumefef9d32014-01-10 13:26:38 -080061 protocol = StreamReaderProtocol(reader, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062 transport, _ = yield from loop.create_connection(
63 lambda: protocol, host, port, **kwds)
Guido van Rossum355491d2013-10-18 15:17:11 -070064 writer = StreamWriter(transport, protocol, reader, loop)
65 return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066
67
Victor Stinnerf951d282014-06-29 00:46:45 +020068@coroutine
Guido van Rossum1540b162013-11-19 11:43:38 -080069def start_server(client_connected_cb, host=None, port=None, *,
70 loop=None, limit=_DEFAULT_LIMIT, **kwds):
71 """Start a socket server, call back for each client connected.
72
73 The first parameter, `client_connected_cb`, takes two parameters:
74 client_reader, client_writer. client_reader is a StreamReader
75 object, while client_writer is a StreamWriter object. This
76 parameter can either be a plain callback function or a coroutine;
77 if it is a coroutine, it will be automatically converted into a
78 Task.
79
80 The rest of the arguments are all the usual arguments to
81 loop.create_server() except protocol_factory; most common are
82 positional host and port, with various optional keyword arguments
83 following. The return value is the same as loop.create_server().
84
85 Additional optional keyword arguments are loop (to set the event loop
86 instance to use) and limit (to set the buffer limit passed to the
87 StreamReader).
88
89 The return value is the same as loop.create_server(), i.e. a
90 Server object which can be used to stop the service.
91 """
92 if loop is None:
93 loop = events.get_event_loop()
94
95 def factory():
96 reader = StreamReader(limit=limit, loop=loop)
97 protocol = StreamReaderProtocol(reader, client_connected_cb,
98 loop=loop)
99 return protocol
100
101 return (yield from loop.create_server(factory, host, port, **kwds))
102
103
Yury Selivanovb057c522014-02-18 12:15:06 -0500104if hasattr(socket, 'AF_UNIX'):
105 # UNIX Domain Sockets are supported on this platform
106
Victor Stinnerf951d282014-06-29 00:46:45 +0200107 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500108 def open_unix_connection(path=None, *,
109 loop=None, limit=_DEFAULT_LIMIT, **kwds):
110 """Similar to `open_connection` but works with UNIX Domain Sockets."""
111 if loop is None:
112 loop = events.get_event_loop()
113 reader = StreamReader(limit=limit, loop=loop)
114 protocol = StreamReaderProtocol(reader, loop=loop)
115 transport, _ = yield from loop.create_unix_connection(
116 lambda: protocol, path, **kwds)
117 writer = StreamWriter(transport, protocol, reader, loop)
118 return reader, writer
119
120
Victor Stinnerf951d282014-06-29 00:46:45 +0200121 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500122 def start_unix_server(client_connected_cb, path=None, *,
123 loop=None, limit=_DEFAULT_LIMIT, **kwds):
124 """Similar to `start_server` but works with UNIX Domain Sockets."""
125 if loop is None:
126 loop = events.get_event_loop()
127
128 def factory():
129 reader = StreamReader(limit=limit, loop=loop)
130 protocol = StreamReaderProtocol(reader, client_connected_cb,
131 loop=loop)
132 return protocol
133
134 return (yield from loop.create_unix_server(factory, path, **kwds))
135
136
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800137class FlowControlMixin(protocols.Protocol):
138 """Reusable flow control logic for StreamWriter.drain().
139
140 This implements the protocol methods pause_writing(),
141 resume_reading() and connection_lost(). If the subclass overrides
142 these it must call the super methods.
143
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200144 StreamWriter.drain() must wait for _drain_helper() coroutine.
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800145 """
146
147 def __init__(self, loop=None):
148 self._loop = loop # May be None; we may never need it.
149 self._paused = False
150 self._drain_waiter = None
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200151 self._connection_lost = False
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800152
153 def pause_writing(self):
154 assert not self._paused
155 self._paused = True
Victor Stinneracdb7822014-07-14 18:33:40 +0200156 if self._loop.get_debug():
157 logger.debug("%r pauses writing", self)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800158
159 def resume_writing(self):
160 assert self._paused
161 self._paused = False
Victor Stinneracdb7822014-07-14 18:33:40 +0200162 if self._loop.get_debug():
163 logger.debug("%r resumes writing", self)
164
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800165 waiter = self._drain_waiter
166 if waiter is not None:
167 self._drain_waiter = None
168 if not waiter.done():
169 waiter.set_result(None)
170
171 def connection_lost(self, exc):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200172 self._connection_lost = True
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800173 # Wake up the writer if currently paused.
174 if not self._paused:
175 return
176 waiter = self._drain_waiter
177 if waiter is None:
178 return
179 self._drain_waiter = None
180 if waiter.done():
181 return
182 if exc is None:
183 waiter.set_result(None)
184 else:
185 waiter.set_exception(exc)
186
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200187 @coroutine
188 def _drain_helper(self):
189 if self._connection_lost:
190 raise ConnectionResetError('Connection lost')
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800191 if not self._paused:
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200192 return
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800193 waiter = self._drain_waiter
194 assert waiter is None or waiter.cancelled()
195 waiter = futures.Future(loop=self._loop)
196 self._drain_waiter = waiter
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200197 yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800198
199
200class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
201 """Helper class to adapt between Protocol and StreamReader.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202
203 (This is a helper class instead of making StreamReader itself a
204 Protocol subclass, because the StreamReader has other potential
205 uses, and to prevent the user of the StreamReader to accidentally
206 call inappropriate methods of the protocol.)
207 """
208
Guido van Rossum1540b162013-11-19 11:43:38 -0800209 def __init__(self, stream_reader, client_connected_cb=None, loop=None):
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800210 super().__init__(loop=loop)
Guido van Rossum355491d2013-10-18 15:17:11 -0700211 self._stream_reader = stream_reader
Guido van Rossum1540b162013-11-19 11:43:38 -0800212 self._stream_writer = None
Guido van Rossum1540b162013-11-19 11:43:38 -0800213 self._client_connected_cb = client_connected_cb
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214
215 def connection_made(self, transport):
Guido van Rossum355491d2013-10-18 15:17:11 -0700216 self._stream_reader.set_transport(transport)
Guido van Rossum1540b162013-11-19 11:43:38 -0800217 if self._client_connected_cb is not None:
218 self._stream_writer = StreamWriter(transport, self,
219 self._stream_reader,
220 self._loop)
221 res = self._client_connected_cb(self._stream_reader,
222 self._stream_writer)
Victor Stinnerf951d282014-06-29 00:46:45 +0200223 if coroutines.iscoroutine(res):
Victor Stinner896a25a2014-07-08 11:29:25 +0200224 self._loop.create_task(res)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225
226 def connection_lost(self, exc):
227 if exc is None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700228 self._stream_reader.feed_eof()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 else:
Guido van Rossum355491d2013-10-18 15:17:11 -0700230 self._stream_reader.set_exception(exc)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800231 super().connection_lost(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232
233 def data_received(self, data):
Guido van Rossum355491d2013-10-18 15:17:11 -0700234 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235
236 def eof_received(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700237 self._stream_reader.feed_eof()
238
Guido van Rossum355491d2013-10-18 15:17:11 -0700239
240class StreamWriter:
241 """Wraps a Transport.
242
243 This exposes write(), writelines(), [can_]write_eof(),
244 get_extra_info() and close(). It adds drain() which returns an
245 optional Future on which you can wait for flow control. It also
Guido van Rossumefef9d32014-01-10 13:26:38 -0800246 adds a transport property which references the Transport
Guido van Rossum355491d2013-10-18 15:17:11 -0700247 directly.
248 """
249
250 def __init__(self, transport, protocol, reader, loop):
251 self._transport = transport
252 self._protocol = protocol
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200253 # drain() expects that the reader has a exception() method
254 assert reader is None or isinstance(reader, StreamReader)
Guido van Rossum355491d2013-10-18 15:17:11 -0700255 self._reader = reader
256 self._loop = loop
257
Victor Stinneracdb7822014-07-14 18:33:40 +0200258 def __repr__(self):
259 info = [self.__class__.__name__, 'transport=%r' % self._transport]
260 if self._reader is not None:
261 info.append('reader=%r' % self._reader)
262 return '<%s>' % ' '.join(info)
263
Guido van Rossum355491d2013-10-18 15:17:11 -0700264 @property
265 def transport(self):
266 return self._transport
267
268 def write(self, data):
269 self._transport.write(data)
270
271 def writelines(self, data):
272 self._transport.writelines(data)
273
274 def write_eof(self):
275 return self._transport.write_eof()
276
277 def can_write_eof(self):
278 return self._transport.can_write_eof()
279
280 def close(self):
281 return self._transport.close()
282
283 def get_extra_info(self, name, default=None):
284 return self._transport.get_extra_info(name, default)
285
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200286 @coroutine
Guido van Rossum355491d2013-10-18 15:17:11 -0700287 def drain(self):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200288 """Flush the write buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700289
290 The intended use is to write
291
292 w.write(data)
293 yield from w.drain()
Guido van Rossum355491d2013-10-18 15:17:11 -0700294 """
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200295 if self._reader is not None:
296 exc = self._reader.exception()
297 if exc is not None:
298 raise exc
299 yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300
301
302class StreamReader:
303
304 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
305 # The line length limit is a security feature;
306 # it also doubles as half the buffer limit.
Guido van Rossum355491d2013-10-18 15:17:11 -0700307 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 if loop is None:
309 loop = events.get_event_loop()
Guido van Rossum355491d2013-10-18 15:17:11 -0700310 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500311 self._buffer = bytearray()
Guido van Rossum355491d2013-10-18 15:17:11 -0700312 self._eof = False # Whether we're done.
313 self._waiter = None # A future.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 self._exception = None
315 self._transport = None
316 self._paused = False
317
318 def exception(self):
319 return self._exception
320
321 def set_exception(self, exc):
322 self._exception = exc
323
Guido van Rossum355491d2013-10-18 15:17:11 -0700324 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700326 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 if not waiter.cancelled():
328 waiter.set_exception(exc)
329
330 def set_transport(self, transport):
331 assert self._transport is None, 'Transport already set'
332 self._transport = transport
333
334 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500335 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700337 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
339 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700340 self._eof = True
341 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700343 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 if not waiter.cancelled():
345 waiter.set_result(True)
346
Yury Selivanovf0020f52014-02-06 00:14:30 -0500347 def at_eof(self):
348 """Return True if the buffer is empty and 'feed_eof' was called."""
349 return self._eof and not self._buffer
350
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500352 assert not self._eof, 'feed_data after feed_eof'
353
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 if not data:
355 return
356
Yury Selivanove694c972014-02-05 18:11:13 -0500357 self._buffer.extend(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358
Guido van Rossum355491d2013-10-18 15:17:11 -0700359 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700361 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 if not waiter.cancelled():
363 waiter.set_result(False)
364
365 if (self._transport is not None and
366 not self._paused and
Yury Selivanove694c972014-02-05 18:11:13 -0500367 len(self._buffer) > 2*self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700369 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 except NotImplementedError:
371 # The transport can't be paused.
372 # We'll just have to buffer all data.
373 # Forget the transport so we don't keep trying.
374 self._transport = None
375 else:
376 self._paused = True
377
Victor Stinner183e3472014-01-23 17:40:03 +0100378 def _create_waiter(self, func_name):
379 # StreamReader uses a future to link the protocol feed_data() method
380 # to a read coroutine. Running two read coroutines at the same time
381 # would have an unexpected behaviour. It would not possible to know
382 # which coroutine would get the next data.
383 if self._waiter is not None:
384 raise RuntimeError('%s() called while another coroutine is '
385 'already waiting for incoming data' % func_name)
386 return futures.Future(loop=self._loop)
387
Victor Stinnerf951d282014-06-29 00:46:45 +0200388 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389 def readline(self):
390 if self._exception is not None:
391 raise self._exception
392
Yury Selivanove694c972014-02-05 18:11:13 -0500393 line = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 not_enough = True
395
396 while not_enough:
Guido van Rossum355491d2013-10-18 15:17:11 -0700397 while self._buffer and not_enough:
Yury Selivanove694c972014-02-05 18:11:13 -0500398 ichar = self._buffer.find(b'\n')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 if ichar < 0:
Yury Selivanove694c972014-02-05 18:11:13 -0500400 line.extend(self._buffer)
401 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 else:
403 ichar += 1
Yury Selivanove694c972014-02-05 18:11:13 -0500404 line.extend(self._buffer[:ichar])
405 del self._buffer[:ichar]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700406 not_enough = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407
Yury Selivanove694c972014-02-05 18:11:13 -0500408 if len(line) > self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 self._maybe_resume_transport()
410 raise ValueError('Line is too long')
411
Guido van Rossum355491d2013-10-18 15:17:11 -0700412 if self._eof:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 break
414
415 if not_enough:
Victor Stinner183e3472014-01-23 17:40:03 +0100416 self._waiter = self._create_waiter('readline')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700418 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700420 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422 self._maybe_resume_transport()
Yury Selivanove694c972014-02-05 18:11:13 -0500423 return bytes(line)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424
Victor Stinnerf951d282014-06-29 00:46:45 +0200425 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 def read(self, n=-1):
427 if self._exception is not None:
428 raise self._exception
429
430 if not n:
431 return b''
432
433 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -0700434 # This used to just loop creating a new waiter hoping to
435 # collect everything in self._buffer, but that would
436 # deadlock if the subprocess sends more than self.limit
437 # bytes. So just call self.read(self._limit) until EOF.
438 blocks = []
439 while True:
440 block = yield from self.read(self._limit)
441 if not block:
442 break
443 blocks.append(block)
444 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 else:
Yury Selivanove694c972014-02-05 18:11:13 -0500446 if not self._buffer and not self._eof:
Victor Stinner183e3472014-01-23 17:40:03 +0100447 self._waiter = self._create_waiter('read')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700449 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700451 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700452
Yury Selivanove694c972014-02-05 18:11:13 -0500453 if n < 0 or len(self._buffer) <= n:
454 data = bytes(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700455 self._buffer.clear()
Yury Selivanove694c972014-02-05 18:11:13 -0500456 else:
457 # n > 0 and len(self._buffer) > n
458 data = bytes(self._buffer[:n])
459 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460
Yury Selivanove694c972014-02-05 18:11:13 -0500461 self._maybe_resume_transport()
462 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463
Victor Stinnerf951d282014-06-29 00:46:45 +0200464 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465 def readexactly(self, n):
466 if self._exception is not None:
467 raise self._exception
468
Guido van Rossum38455212014-01-06 16:09:18 -0800469 # There used to be "optimized" code here. It created its own
470 # Future and waited until self._buffer had at least the n
471 # bytes, then called read(n). Unfortunately, this could pause
472 # the transport if the argument was larger than the pause
473 # limit (which is twice self._limit). So now we just read()
474 # into a local buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475
Guido van Rossum38455212014-01-06 16:09:18 -0800476 blocks = []
477 while n > 0:
478 block = yield from self.read(n)
479 if not block:
Victor Stinner8dffc452014-01-25 15:32:06 +0100480 partial = b''.join(blocks)
481 raise IncompleteReadError(partial, len(partial) + n)
Guido van Rossum38455212014-01-06 16:09:18 -0800482 blocks.append(block)
483 n -= len(block)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700484
Guido van Rossum38455212014-01-06 16:09:18 -0800485 return b''.join(blocks)