blob: 7ff16a488f9ef3f39e85a56ae24fcdf16cc73eea [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08006 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
9
Guido van Rossume3e786c2014-02-18 10:24:30 -080010if hasattr(socket, 'AF_UNIX'):
11 __all__.extend(['open_unix_connection', 'start_unix_server'])
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
15from . import futures
16from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020017from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
21_DEFAULT_LIMIT = 2**16
22
Guido van Rossuma849be92014-01-30 16:05:28 -080023
class IncompleteReadError(EOFError):
    """Raised when the stream ends before the requested bytes were read.

    Attributes:

    - partial: bytes string read before the end of stream was reached
    - expected: total number of expected bytes
    """

    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default exception reduction re-creates the instance from
        # self.args, which only holds the formatted message.  That does not
        # match the two-argument constructor, so copying or unpickling the
        # exception would fail with TypeError.  Rebuild it from the original
        # constructor arguments instead.
        return type(self), (self.partial, self.expected)
36
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect with loop.create_connection() and return (reader, writer).

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    host, port and any extra keyword arguments are forwarded to
    create_connection(); the protocol factory is supplied here and cannot
    be overridden.

    Two keyword-only arguments are consumed by this function itself:
    loop (the event loop to use; the current event loop when omitted) and
    limit (the buffer limit handed to the StreamReader).

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    copy this code -- it is only a convenience wrapper.)
    """
    loop = events.get_event_loop() if loop is None else loop

    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)

    def protocol_factory():
        # Always hand back the protocol that is already bound to `reader`.
        return protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)
    return reader, StreamWriter(transport, protocol, reader, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066
67
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback for each client.

    `client_connected_cb` is called with two arguments, client_reader (a
    StreamReader) and client_writer (a StreamWriter).  It may be a plain
    callback function or a coroutine; a coroutine is automatically
    converted into a Task.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied here and cannot
    be overridden.

    Two keyword-only arguments are consumed by this function itself:
    loop (the event loop to use; the current event loop when omitted) and
    limit (the buffer limit handed to each StreamReader).

    The return value is the same as loop.create_server(), i.e. a Server
    object which can be used to stop the service.
    """
    loop = events.get_event_loop() if loop is None else loop

    def protocol_factory():
        # Each connection gets its own reader and protocol.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(
        protocol_factory, host, port, **kwds)
    return server
102
103
if hasattr(socket, 'AF_UNIX'):
    # The helpers below are only defined where UNIX Domain Sockets exist.

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        loop = events.get_event_loop() if loop is None else loop

        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)

        def protocol_factory():
            # Always hand back the protocol already bound to `reader`.
            return protocol

        transport, _ = yield from loop.create_unix_connection(
            protocol_factory, path, **kwds)
        return reader, StreamWriter(transport, protocol, reader, loop)

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        loop = events.get_event_loop() if loop is None else loop

        def protocol_factory():
            # Each connection gets its own reader and protocol.
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb,
                                        loop=loop)

        server = yield from loop.create_unix_server(
            protocol_factory, path, **kwds)
        return server
135
136
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  A subclass that overrides
    any of them must call the super method.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        # Future a drain() caller is blocked on while writing is paused.
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        """Record that writing is paused."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing may resume and release a blocked drain()."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending, self._drain_waiter = self._drain_waiter, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Mark the connection as lost and wake up a paused writer.

        The pending drain waiter, if any, completes normally when exc is
        None and receives exc as its exception otherwise.
        """
        self._connection_lost = True
        pending = self._drain_waiter
        # A waiter is only released here while writing is paused.
        if not self._paused or pending is None:
            return
        self._drain_waiter = None
        if pending.done():
            return
        if exc is not None:
            pending.set_exception(exc)
        else:
            pending.set_result(None)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is resumed or the connection is lost.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError if the connection is already lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        previous = self._drain_waiter
        # At most one live waiter may exist at a time.
        assert previous is None or previous.cancelled()
        waiter = self._drain_waiter = futures.Future(loop=self._loop)
        yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800201
202
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a separate helper rather than making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to keep users of the StreamReader from accidentally
    calling inappropriate protocol methods.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        # Only created in connection_made() when a callback was supplied.
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb

    def connection_made(self, transport):
        """Attach the transport to the reader and notify the callback.

        When a client_connected_cb was given, it is called with the reader
        and a new StreamWriter; a coroutine result is scheduled as a task
        on the loop.
        """
        self._stream_reader.set_transport(transport)
        callback = self._client_connected_cb
        if callback is None:
            return
        self._stream_writer = StreamWriter(transport, self,
                                           self._stream_reader,
                                           self._loop)
        result = callback(self._stream_reader, self._stream_writer)
        if coroutines.iscoroutine(result):
            self._loop.create_task(result)

    def connection_lost(self, exc):
        """Propagate end of stream or the error to the reader."""
        reader = self._stream_reader
        if exc is not None:
            reader.set_exception(exc)
        else:
            reader.feed_eof()
        super().connection_lost(exc)

    def data_received(self, data):
        """Forward received data to the reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Signal end of stream to the reader."""
        self._stream_reader.feed_eof()
241
Guido van Rossum355491d2013-10-18 15:17:11 -0700242
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close(), all delegating to the transport.  It
    adds drain(), a coroutine to wait on for flow control, and a
    transport property which references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        # drain() calls reader.exception(), so a reader, when given, has to
        # be a StreamReader.
        assert reader is None or isinstance(reader, StreamReader)
        self._transport = transport
        self._protocol = protocol
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        pieces = [self.__class__.__name__]
        pieces.append('transport=%r' % self._transport)
        if self._reader is not None:
            pieces.append('reader=%r' % self._reader)
        return '<' + ' '.join(pieces) + '>'

    @property
    def transport(self):
        """The underlying transport object."""
        return self._transport

    def write(self, data):
        """Pass data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Return the result of the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the result of the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Return the result of the transport's close()."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Return the transport's extra info for name, or default."""
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()

        An exception stored on the associated reader is raised here
        before waiting on the protocol.
        """
        reader = self._reader
        if reader is not None:
            error = reader.exception()
            if error is not None:
                raise error
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303
304
class StreamReader:
    """Buffer fed bytes and hand them out through read coroutines.

    Data comes in through feed_data() and feed_eof(); it is consumed with
    the read(), readline() and readexactly() coroutines.  Only one
    coroutine at a time may be waiting for data.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature.  The transport is
        # paused once the buffer grows past twice this limit and resumed
        # when it is back at or below the limit.
        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False  # Set once feed_eof() has been called.
        self._waiter = None  # Future created by _wait_for_data().
        self._exception = None
        self._transport = None
        self._paused = False

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and deliver it to the coroutine waiting for data."""
        self._exception = exc

        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up the read coroutine waiting for data or EOF, if any."""
        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_result(None)

    def set_transport(self, transport):
        """Attach the transport; it may only be set once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake up the waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake up the waiting reader.

        Empty data is ignored.  The transport is paused when the buffer
        exceeds twice the limit.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if self._transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return

        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused, so all data has to be
            # buffered.  Forget the transport so pausing is not retried.
            self._transport = None
        else:
            self._paused = True

    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called."""
        # A single future links the feeding side to one read coroutine.
        # With two read coroutines waiting at once it would not be possible
        # to know which of them gets the next data, so that is refused.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        self._waiter = futures.Future(loop=self._loop)
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read up to and including the next newline, or until EOF.

        Raises ValueError when the accumulated line exceeds the limit, and
        re-raises an exception stored on the reader.
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        complete = False

        while not complete:
            while self._buffer and not complete:
                # find() yields -1 when no newline is buffered, so `end`
                # is 0 (false) in that case and one past the newline
                # otherwise.
                end = self._buffer.find(b'\n') + 1
                if end:
                    line.extend(self._buffer[:end])
                    del self._buffer[:end]
                    complete = True
                else:
                    line.extend(self._buffer)
                    self._buffer.clear()

                if len(line) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if self._eof:
                break

            if not complete:
                yield from self._wait_for_data('readline')

        self._maybe_resume_transport()
        return bytes(line)

    @coroutine
    def read(self, n=-1):
        """Read up to n bytes; read until EOF when n is negative.

        Returns b'' immediately when n is zero.  Re-raises an exception
        stored on the reader.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # Collecting everything in self._buffer with a single waiter
            # would deadlock if the peer sends more than the limit, so
            # read blocks of self._limit bytes until EOF instead.
            chunks = []
            while True:
                chunk = yield from self.read(self._limit)
                if not chunk:
                    return b''.join(chunks)
                chunks.append(chunk)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        if len(self._buffer) > n:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        else:
            data = bytes(self._buffer)
            self._buffer.clear()

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError, carrying the bytes read so far, when
        the stream ends first.  Re-raises an exception stored on the
        reader.
        """
        if self._exception is not None:
            raise self._exception

        # Waiting for n bytes to pile up in self._buffer could pause the
        # transport when n is larger than the pause limit (twice
        # self._limit), so read() into a local list of chunks instead.
        chunks = []
        remaining = n
        while remaining > 0:
            chunk = yield from self.read(remaining)
            if not chunk:
                partial = b''.join(chunks)
                raise IncompleteReadError(partial, len(partial) + remaining)
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)