blob: 176c65e3969cb4bfc11c5e90e8aabadc41d58cf9 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08006 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
Yury Selivanovd08c3632015-05-13 15:15:56 -04009import sys
Yury Selivanovb057c522014-02-18 12:15:06 -050010
Guido van Rossume3e786c2014-02-18 10:24:30 -080011if hasattr(socket, 'AF_UNIX'):
12 __all__.extend(['open_unix_connection', 'start_unix_server'])
13
Victor Stinnerf951d282014-06-29 00:46:45 +020014from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
16from . import futures
17from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
22_DEFAULT_LIMIT = 2**16
Yury Selivanovd08c3632015-05-13 15:15:56 -040023_PY35 = sys.version_info >= (3, 5)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
Guido van Rossuma849be92014-01-30 16:05:28 -080025
class IncompleteReadError(EOFError):
    """Raised when the stream ends before the requested bytes arrive.

    Attributes:

    - partial: bytes that were read before the end of stream was reached
    - expected: total number of expected bytes
    """

    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default BaseException.__reduce__ rebuilds the instance from
        # self.args, which only holds the formatted message.  That does not
        # match the two-argument constructor, so copying or unpickling the
        # exception would fail with a TypeError.  Rebuild it from the
        # original constructor arguments instead.
        return type(self), (self.partial, self.expected)
38
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a (reader, writer) pair.

    This is a thin convenience wrapper around loop.create_connection().
    The returned reader is a StreamReader and the returned writer is a
    StreamWriter, both bound to the new transport.

    host, port and any extra keyword arguments are forwarded unchanged
    to create_connection(); the protocol factory is supplied here and
    cannot be overridden.

    Keyword-only arguments handled by this function:

    - loop: event loop to use (defaults to events.get_event_loop())
    - limit: buffer limit handed to the StreamReader

    To use customized StreamReader or StreamReaderProtocol classes, copy
    this function; it contains nothing beyond the wiring shown below.
    """
    event_loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return stream_protocol

    transport, _ = yield from event_loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068
69
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server that invokes a callback per client.

    client_connected_cb is called with two arguments for every accepted
    connection: a StreamReader and a StreamWriter for that client.  It
    may be a plain function or a coroutine function; when calling it
    yields a coroutine object, that coroutine is scheduled as a Task.

    host, port and any extra keyword arguments are forwarded unchanged
    to loop.create_server(); the protocol factory is supplied here and
    cannot be overridden.

    Keyword-only arguments handled by this function:

    - loop: event loop to use (defaults to events.get_event_loop())
    - limit: buffer limit handed to each StreamReader

    The return value is whatever loop.create_server() returns, i.e. a
    Server object which can be used to stop the service.
    """
    event_loop = loop if loop is not None else events.get_event_loop()

    def make_protocol():
        # Every connection gets its own reader/protocol pair.
        client_reader = StreamReader(limit=limit, loop=event_loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=event_loop)

    server = yield from event_loop.create_server(
        make_protocol, host, port, **kwds)
    return server
104
105
if hasattr(socket, 'AF_UNIX'):
    # These helpers only exist where UNIX domain sockets are available.

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection().  Returns a (reader, writer) pair.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            # Always hand back the single protocol created above.
            return stream_protocol

        transport, _ = yield from event_loop.create_unix_connection(
            protocol_factory, path, **kwds)

        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, event_loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        event_loop = loop if loop is not None else events.get_event_loop()

        def make_protocol():
            # Every connection gets its own reader/protocol pair.
            client_reader = StreamReader(limit=limit, loop=event_loop)
            return StreamReaderProtocol(client_reader, client_connected_cb,
                                        loop=event_loop)

        server = yield from event_loop.create_unix_server(
            make_protocol, path, **kwds)
        return server
137
138
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.

    Several coroutines may wait in _drain_helper() at the same time;
    each one gets its own future and all of them are woken up together.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # True between pause_writing() and resume_writing().
        self._paused = False
        # One pending future per coroutine blocked in _drain_helper().
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        """Record that the transport asked the writer to stop."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing may continue and wake every drain waiter."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Iterate over a snapshot: each waiter removes itself from the
        # list when its _drain_helper() call finishes.
        for waiter in tuple(self._drain_waiters):
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Mark the connection as lost and release blocked drain waiters.

        Waiters receive exc as their exception when it is not None,
        otherwise they complete normally.
        """
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return
        for waiter in tuple(self._drain_waiters):
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Wait until writing is resumed or the connection is lost.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError if the connection is already lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = futures.Future(loop=self._loop)
        self._drain_waiters.append(waiter)
        try:
            yield from waiter
        finally:
            # Also runs on cancellation, so a cancelled drain() does not
            # leave a stale future behind.
            self._drain_waiters.remove(waiter)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800203
204
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        """Bind the protocol to stream_reader.

        client_connected_cb, when given, is called from connection_made()
        with the reader and a newly created StreamWriter.
        """
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        # Set in connection_made(); decides what eof_received() returns.
        self._over_ssl = False

    def connection_made(self, transport):
        """Attach the transport and notify the client callback, if any."""
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            # A coroutine callback must be scheduled to actually run.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        """Propagate the end of the connection to the reader and waiters."""
        if exc is None:
            self._stream_reader.feed_eof()
        else:
            self._stream_reader.set_exception(exc)
        super().connection_lost(exc)

    def data_received(self, data):
        """Forward incoming bytes to the reader's buffer."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader and keep the transport open.

        Returning a true value tells the transport not to close itself,
        so the application can still write (e.g. send a response) after
        the peer has finished sending.  Closing the connection is then
        the responsibility of whoever owns the StreamWriter.
        """
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # SSL transports do not support half-closed connections;
            # returning a true value there has no effect and only
            # triggers a warning, so let the transport close itself.
            return False
        return True
243
Guido van Rossum355491d2013-10-18 15:17:11 -0700244
class StreamWriter:
    """Thin write-side wrapper around a Transport.

    write(), writelines(), write_eof(), can_write_eof(),
    get_extra_info() and close() are forwarded to the transport.
    drain() is a coroutine to wait on for flow control, and the
    transport property gives direct access to the wrapped Transport.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() calls reader.exception(), so only a StreamReader (or no
        # reader at all) is acceptable here.
        if reader is not None:
            assert isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__]
        parts.append('transport=%r' % self._transport)
        reader = self._reader
        if reader is not None:
            parts.append('reader=%r' % reader)
        description = ' '.join(parts)
        return '<%s>' % description

    @property
    def transport(self):
        """The wrapped Transport."""
        return self._transport

    def write(self, data):
        """Forward data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Forward to the transport's write_eof() and return its result."""
        transport = self._transport
        return transport.write_eof()

    def can_write_eof(self):
        """Return the transport's can_write_eof() result."""
        transport = self._transport
        return transport.can_write_eof()

    def close(self):
        """Forward to the transport's close() and return its result."""
        transport = self._transport
        return transport.close()

    def get_extra_info(self, name, default=None):
        """Return the transport's get_extra_info(name, default) result."""
        transport = self._transport
        return transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Wait until it is appropriate to resume writing.

        Typical usage:

          w.write(data)
          yield from w.drain()

        If the associated reader has a stored exception, it is raised
        here before waiting on the protocol's flow control.
        """
        reader = self._reader
        pending_exc = None if reader is None else reader.exception()
        if pending_exc is not None:
            raise pending_exc
        yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305
306
class StreamReader:
    """Buffered byte reader fed through feed_data() and feed_eof().

    Data is appended to an internal buffer by feed_data(); the read
    coroutines (read(), readline(), readexactly()) consume it, waiting
    for more data when the buffer is empty.  Only one coroutine may wait
    for data at a time.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False  # Whether we paused the transport's reading.

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and deliver it to the coroutine waiting for data."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read() or readline() function waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Remember the transport used to pause and resume reading."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake up the waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake up the waiting reader.

        Once the buffer grows beyond twice the limit, the transport (if
        any) is asked to pause reading.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        func_name is only used in the error message raised when another
        coroutine is already waiting.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        self._waiter = futures.Future(loop=self._loop)
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read one line, including the trailing newline if present.

        Returns the bytes up to and including the first b'\\n'.  If EOF
        is reached first, whatever was read so far is returned (possibly
        b'').  Raises ValueError when the accumulated line exceeds the
        limit, and re-raises any exception stored on the reader.
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                ichar = self._buffer.find(b'\n')
                if ichar < 0:
                    # No newline yet: take everything and keep looking.
                    line.extend(self._buffer)
                    self._buffer.clear()
                else:
                    ichar += 1
                    line.extend(self._buffer[:ichar])
                    del self._buffer[:ichar]
                    not_enough = False

                if len(line) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if self._eof:
                break

            if not_enough:
                yield from self._wait_for_data('readline')

        self._maybe_resume_transport()
        return bytes(line)

    @coroutine
    def read(self, n=-1):
        """Read up to n bytes; read until EOF when n is negative.

        Returns b'' immediately when n is 0.  For positive n, waits for
        data only if the buffer is empty and EOF was not received, then
        returns at most n bytes (b'' at EOF).  Re-raises any exception
        stored on the reader.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        # From here on n is positive.
        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        if len(self._buffer) <= n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            # len(self._buffer) > n
            data = bytes(self._buffer[:n])
            del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError if EOF is reached before n bytes were
        read; its partial attribute holds the bytes read so far and its
        expected attribute the total number requested.  Re-raises any
        exception stored on the reader.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        blocks = []
        while n > 0:
            block = yield from self.read(n)
            if not block:
                partial = b''.join(blocks)
                # n is the number of bytes still missing at this point.
                raise IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b''.join(blocks)

    if _PY35:
        if sys.version_info >= (3, 5, 2):
            # Since Python 3.5.2, __aiter__ must return the asynchronous
            # iterator directly instead of an awaitable.
            def __aiter__(self):
                return self
        else:
            # Python 3.5.0 and 3.5.1 expect __aiter__ to be awaitable.
            @coroutine
            def __aiter__(self):
                return self

        @coroutine
        def __anext__(self):
            """Return the next line; stop iterating on an empty read."""
            val = yield from self.readline()
            if val == b'':
                raise StopAsyncIteration
            return val
501 return val