blob: bb9fb313af7b36544167420aec13c47d417f1bf8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08006 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
9
Guido van Rossume3e786c2014-02-18 10:24:30 -080010if hasattr(socket, 'AF_UNIX'):
11 __all__.extend(['open_unix_connection', 'start_unix_server'])
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Victor Stinner71080fc2015-07-25 02:23:21 +020014from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
16from . import futures
17from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
22_DEFAULT_LIMIT = 2**16
23
Guido van Rossuma849be92014-01-30 16:05:28 -080024
Victor Stinner8dffc452014-01-25 15:32:06 +010025class IncompleteReadError(EOFError):
26 """
27 Incomplete read error. Attributes:
28
29 - partial: read bytes string before the end of stream was reached
30 - expected: total number of expected bytes
31 """
32 def __init__(self, partial, expected):
33 EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
34 % (len(partial), expected))
35 self.partial = partial
36 self.expected = expected
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
Victor Stinnerf951d282014-06-29 00:46:45 +020039@coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040def open_connection(host=None, port=None, *,
41 loop=None, limit=_DEFAULT_LIMIT, **kwds):
42 """A wrapper for create_connection() returning a (reader, writer) pair.
43
44 The reader returned is a StreamReader instance; the writer is a
Victor Stinner183e3472014-01-23 17:40:03 +010045 StreamWriter instance.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046
47 The arguments are all the usual arguments to create_connection()
48 except protocol_factory; most common are positional host and port,
49 with various optional keyword arguments following.
50
51 Additional optional keyword arguments are loop (to set the event loop
52 instance to use) and limit (to set the buffer limit passed to the
53 StreamReader).
54
55 (If you want to customize the StreamReader and/or
56 StreamReaderProtocol classes, just copy the code -- there's
57 really nothing special here except some convenience.)
58 """
59 if loop is None:
60 loop = events.get_event_loop()
61 reader = StreamReader(limit=limit, loop=loop)
Guido van Rossumefef9d32014-01-10 13:26:38 -080062 protocol = StreamReaderProtocol(reader, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070063 transport, _ = yield from loop.create_connection(
64 lambda: protocol, host, port, **kwds)
Guido van Rossum355491d2013-10-18 15:17:11 -070065 writer = StreamWriter(transport, protocol, reader, loop)
66 return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067
68
Victor Stinnerf951d282014-06-29 00:46:45 +020069@coroutine
Guido van Rossum1540b162013-11-19 11:43:38 -080070def start_server(client_connected_cb, host=None, port=None, *,
71 loop=None, limit=_DEFAULT_LIMIT, **kwds):
72 """Start a socket server, call back for each client connected.
73
74 The first parameter, `client_connected_cb`, takes two parameters:
75 client_reader, client_writer. client_reader is a StreamReader
76 object, while client_writer is a StreamWriter object. This
77 parameter can either be a plain callback function or a coroutine;
78 if it is a coroutine, it will be automatically converted into a
79 Task.
80
81 The rest of the arguments are all the usual arguments to
82 loop.create_server() except protocol_factory; most common are
83 positional host and port, with various optional keyword arguments
84 following. The return value is the same as loop.create_server().
85
86 Additional optional keyword arguments are loop (to set the event loop
87 instance to use) and limit (to set the buffer limit passed to the
88 StreamReader).
89
90 The return value is the same as loop.create_server(), i.e. a
91 Server object which can be used to stop the service.
92 """
93 if loop is None:
94 loop = events.get_event_loop()
95
96 def factory():
97 reader = StreamReader(limit=limit, loop=loop)
98 protocol = StreamReaderProtocol(reader, client_connected_cb,
99 loop=loop)
100 return protocol
101
102 return (yield from loop.create_server(factory, host, port, **kwds))
103
104
Yury Selivanovb057c522014-02-18 12:15:06 -0500105if hasattr(socket, 'AF_UNIX'):
106 # UNIX Domain Sockets are supported on this platform
107
Victor Stinnerf951d282014-06-29 00:46:45 +0200108 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500109 def open_unix_connection(path=None, *,
110 loop=None, limit=_DEFAULT_LIMIT, **kwds):
111 """Similar to `open_connection` but works with UNIX Domain Sockets."""
112 if loop is None:
113 loop = events.get_event_loop()
114 reader = StreamReader(limit=limit, loop=loop)
115 protocol = StreamReaderProtocol(reader, loop=loop)
116 transport, _ = yield from loop.create_unix_connection(
117 lambda: protocol, path, **kwds)
118 writer = StreamWriter(transport, protocol, reader, loop)
119 return reader, writer
120
121
Victor Stinnerf951d282014-06-29 00:46:45 +0200122 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500123 def start_unix_server(client_connected_cb, path=None, *,
124 loop=None, limit=_DEFAULT_LIMIT, **kwds):
125 """Similar to `start_server` but works with UNIX Domain Sockets."""
126 if loop is None:
127 loop = events.get_event_loop()
128
129 def factory():
130 reader = StreamReader(limit=limit, loop=loop)
131 protocol = StreamReaderProtocol(reader, client_connected_cb,
132 loop=loop)
133 return protocol
134
135 return (yield from loop.create_unix_server(factory, path, **kwds))
136
137
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800138class FlowControlMixin(protocols.Protocol):
139 """Reusable flow control logic for StreamWriter.drain().
140
141 This implements the protocol methods pause_writing(),
142 resume_reading() and connection_lost(). If the subclass overrides
143 these it must call the super methods.
144
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200145 StreamWriter.drain() must wait for _drain_helper() coroutine.
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800146 """
147
148 def __init__(self, loop=None):
Victor Stinner70db9e42015-01-09 21:32:05 +0100149 if loop is None:
150 self._loop = events.get_event_loop()
151 else:
152 self._loop = loop
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800153 self._paused = False
154 self._drain_waiter = None
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200155 self._connection_lost = False
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800156
157 def pause_writing(self):
158 assert not self._paused
159 self._paused = True
Victor Stinneracdb7822014-07-14 18:33:40 +0200160 if self._loop.get_debug():
161 logger.debug("%r pauses writing", self)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800162
163 def resume_writing(self):
164 assert self._paused
165 self._paused = False
Victor Stinneracdb7822014-07-14 18:33:40 +0200166 if self._loop.get_debug():
167 logger.debug("%r resumes writing", self)
168
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800169 waiter = self._drain_waiter
170 if waiter is not None:
171 self._drain_waiter = None
172 if not waiter.done():
173 waiter.set_result(None)
174
175 def connection_lost(self, exc):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200176 self._connection_lost = True
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800177 # Wake up the writer if currently paused.
178 if not self._paused:
179 return
180 waiter = self._drain_waiter
181 if waiter is None:
182 return
183 self._drain_waiter = None
184 if waiter.done():
185 return
186 if exc is None:
187 waiter.set_result(None)
188 else:
189 waiter.set_exception(exc)
190
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200191 @coroutine
192 def _drain_helper(self):
193 if self._connection_lost:
194 raise ConnectionResetError('Connection lost')
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800195 if not self._paused:
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200196 return
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800197 waiter = self._drain_waiter
198 assert waiter is None or waiter.cancelled()
199 waiter = futures.Future(loop=self._loop)
200 self._drain_waiter = waiter
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200201 yield from waiter
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800202
203
204class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
205 """Helper class to adapt between Protocol and StreamReader.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206
207 (This is a helper class instead of making StreamReader itself a
208 Protocol subclass, because the StreamReader has other potential
209 uses, and to prevent the user of the StreamReader to accidentally
210 call inappropriate methods of the protocol.)
211 """
212
Guido van Rossum1540b162013-11-19 11:43:38 -0800213 def __init__(self, stream_reader, client_connected_cb=None, loop=None):
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800214 super().__init__(loop=loop)
Guido van Rossum355491d2013-10-18 15:17:11 -0700215 self._stream_reader = stream_reader
Guido van Rossum1540b162013-11-19 11:43:38 -0800216 self._stream_writer = None
Guido van Rossum1540b162013-11-19 11:43:38 -0800217 self._client_connected_cb = client_connected_cb
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218
219 def connection_made(self, transport):
Guido van Rossum355491d2013-10-18 15:17:11 -0700220 self._stream_reader.set_transport(transport)
Guido van Rossum1540b162013-11-19 11:43:38 -0800221 if self._client_connected_cb is not None:
222 self._stream_writer = StreamWriter(transport, self,
223 self._stream_reader,
224 self._loop)
225 res = self._client_connected_cb(self._stream_reader,
226 self._stream_writer)
Victor Stinnerf951d282014-06-29 00:46:45 +0200227 if coroutines.iscoroutine(res):
Victor Stinner896a25a2014-07-08 11:29:25 +0200228 self._loop.create_task(res)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229
230 def connection_lost(self, exc):
231 if exc is None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700232 self._stream_reader.feed_eof()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 else:
Guido van Rossum355491d2013-10-18 15:17:11 -0700234 self._stream_reader.set_exception(exc)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800235 super().connection_lost(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236
237 def data_received(self, data):
Guido van Rossum355491d2013-10-18 15:17:11 -0700238 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700239
240 def eof_received(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700241 self._stream_reader.feed_eof()
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200242 return True
Guido van Rossum355491d2013-10-18 15:17:11 -0700243
Guido van Rossum355491d2013-10-18 15:17:11 -0700244
245class StreamWriter:
246 """Wraps a Transport.
247
248 This exposes write(), writelines(), [can_]write_eof(),
249 get_extra_info() and close(). It adds drain() which returns an
250 optional Future on which you can wait for flow control. It also
Guido van Rossumefef9d32014-01-10 13:26:38 -0800251 adds a transport property which references the Transport
Guido van Rossum355491d2013-10-18 15:17:11 -0700252 directly.
253 """
254
255 def __init__(self, transport, protocol, reader, loop):
256 self._transport = transport
257 self._protocol = protocol
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200258 # drain() expects that the reader has a exception() method
259 assert reader is None or isinstance(reader, StreamReader)
Guido van Rossum355491d2013-10-18 15:17:11 -0700260 self._reader = reader
261 self._loop = loop
262
Victor Stinneracdb7822014-07-14 18:33:40 +0200263 def __repr__(self):
Victor Stinner406204c2015-01-15 21:50:19 +0100264 info = [self.__class__.__name__, 'transport=%r' % self._transport]
Victor Stinneracdb7822014-07-14 18:33:40 +0200265 if self._reader is not None:
266 info.append('reader=%r' % self._reader)
267 return '<%s>' % ' '.join(info)
268
Guido van Rossum355491d2013-10-18 15:17:11 -0700269 @property
270 def transport(self):
271 return self._transport
272
273 def write(self, data):
274 self._transport.write(data)
275
276 def writelines(self, data):
277 self._transport.writelines(data)
278
279 def write_eof(self):
280 return self._transport.write_eof()
281
282 def can_write_eof(self):
283 return self._transport.can_write_eof()
284
Victor Stinner406204c2015-01-15 21:50:19 +0100285 def close(self):
286 return self._transport.close()
287
Guido van Rossum355491d2013-10-18 15:17:11 -0700288 def get_extra_info(self, name, default=None):
289 return self._transport.get_extra_info(name, default)
290
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200291 @coroutine
Guido van Rossum355491d2013-10-18 15:17:11 -0700292 def drain(self):
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200293 """Flush the write buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700294
295 The intended use is to write
296
297 w.write(data)
298 yield from w.drain()
Guido van Rossum355491d2013-10-18 15:17:11 -0700299 """
Victor Stinner31e7bfa2014-07-22 12:03:40 +0200300 if self._reader is not None:
301 exc = self._reader.exception()
302 if exc is not None:
303 raise exc
304 yield from self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305
306
307class StreamReader:
308
309 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
310 # The line length limit is a security feature;
311 # it also doubles as half the buffer limit.
Guido van Rossum355491d2013-10-18 15:17:11 -0700312 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 if loop is None:
Victor Stinner70db9e42015-01-09 21:32:05 +0100314 self._loop = events.get_event_loop()
315 else:
316 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500317 self._buffer = bytearray()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100318 self._eof = False # Whether we're done.
319 self._waiter = None # A future used by _wait_for_data()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 self._exception = None
321 self._transport = None
322 self._paused = False
323
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200324 def __repr__(self):
325 info = ['StreamReader']
326 if self._buffer:
Andrew Svetlovd94c1b92015-09-29 18:36:00 +0300327 info.append('%d bytes' % len(self._buffer))
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200328 if self._eof:
329 info.append('eof')
330 if self._limit != _DEFAULT_LIMIT:
331 info.append('l=%d' % self._limit)
332 if self._waiter:
333 info.append('w=%r' % self._waiter)
334 if self._exception:
335 info.append('e=%r' % self._exception)
336 if self._transport:
337 info.append('t=%r' % self._transport)
338 if self._paused:
339 info.append('paused')
340 return '<%s>' % ' '.join(info)
341
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 def exception(self):
343 return self._exception
344
345 def set_exception(self, exc):
346 self._exception = exc
347
Guido van Rossum355491d2013-10-18 15:17:11 -0700348 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700350 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 if not waiter.cancelled():
352 waiter.set_exception(exc)
353
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100354 def _wakeup_waiter(self):
355 """Wakeup read() or readline() function waiting for data or EOF."""
356 waiter = self._waiter
357 if waiter is not None:
358 self._waiter = None
359 if not waiter.cancelled():
360 waiter.set_result(None)
361
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 def set_transport(self, transport):
363 assert self._transport is None, 'Transport already set'
364 self._transport = transport
365
366 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500367 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700369 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
371 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700372 self._eof = True
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100373 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374
Yury Selivanovf0020f52014-02-06 00:14:30 -0500375 def at_eof(self):
376 """Return True if the buffer is empty and 'feed_eof' was called."""
377 return self._eof and not self._buffer
378
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500380 assert not self._eof, 'feed_data after feed_eof'
381
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382 if not data:
383 return
384
Yury Selivanove694c972014-02-05 18:11:13 -0500385 self._buffer.extend(data)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100386 self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387
388 if (self._transport is not None and
389 not self._paused and
Yury Selivanove694c972014-02-05 18:11:13 -0500390 len(self._buffer) > 2*self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700392 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393 except NotImplementedError:
394 # The transport can't be paused.
395 # We'll just have to buffer all data.
396 # Forget the transport so we don't keep trying.
397 self._transport = None
398 else:
399 self._paused = True
400
Victor Stinnerd6dc7bd2015-03-18 11:37:42 +0100401 @coroutine
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100402 def _wait_for_data(self, func_name):
403 """Wait until feed_data() or feed_eof() is called."""
Victor Stinner183e3472014-01-23 17:40:03 +0100404 # StreamReader uses a future to link the protocol feed_data() method
405 # to a read coroutine. Running two read coroutines at the same time
406 # would have an unexpected behaviour. It would not possible to know
407 # which coroutine would get the next data.
408 if self._waiter is not None:
409 raise RuntimeError('%s() called while another coroutine is '
410 'already waiting for incoming data' % func_name)
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100411
412 self._waiter = futures.Future(loop=self._loop)
413 try:
414 yield from self._waiter
415 finally:
416 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100417
Victor Stinnerf951d282014-06-29 00:46:45 +0200418 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 def readline(self):
420 if self._exception is not None:
421 raise self._exception
422
Yury Selivanove694c972014-02-05 18:11:13 -0500423 line = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424 not_enough = True
425
426 while not_enough:
Guido van Rossum355491d2013-10-18 15:17:11 -0700427 while self._buffer and not_enough:
Yury Selivanove694c972014-02-05 18:11:13 -0500428 ichar = self._buffer.find(b'\n')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 if ichar < 0:
Yury Selivanove694c972014-02-05 18:11:13 -0500430 line.extend(self._buffer)
431 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432 else:
433 ichar += 1
Yury Selivanove694c972014-02-05 18:11:13 -0500434 line.extend(self._buffer[:ichar])
435 del self._buffer[:ichar]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 not_enough = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437
Yury Selivanove694c972014-02-05 18:11:13 -0500438 if len(line) > self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439 self._maybe_resume_transport()
440 raise ValueError('Line is too long')
441
Guido van Rossum355491d2013-10-18 15:17:11 -0700442 if self._eof:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 break
444
445 if not_enough:
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100446 yield from self._wait_for_data('readline')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 self._maybe_resume_transport()
Yury Selivanove694c972014-02-05 18:11:13 -0500449 return bytes(line)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450
Victor Stinnerf951d282014-06-29 00:46:45 +0200451 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700452 def read(self, n=-1):
453 if self._exception is not None:
454 raise self._exception
455
456 if not n:
457 return b''
458
459 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -0700460 # This used to just loop creating a new waiter hoping to
461 # collect everything in self._buffer, but that would
462 # deadlock if the subprocess sends more than self.limit
463 # bytes. So just call self.read(self._limit) until EOF.
464 blocks = []
465 while True:
466 block = yield from self.read(self._limit)
467 if not block:
468 break
469 blocks.append(block)
470 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471 else:
Yury Selivanove694c972014-02-05 18:11:13 -0500472 if not self._buffer and not self._eof:
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100473 yield from self._wait_for_data('read')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474
Yury Selivanove694c972014-02-05 18:11:13 -0500475 if n < 0 or len(self._buffer) <= n:
476 data = bytes(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700477 self._buffer.clear()
Yury Selivanove694c972014-02-05 18:11:13 -0500478 else:
479 # n > 0 and len(self._buffer) > n
480 data = bytes(self._buffer[:n])
481 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482
Yury Selivanove694c972014-02-05 18:11:13 -0500483 self._maybe_resume_transport()
484 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485
Victor Stinnerf951d282014-06-29 00:46:45 +0200486 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487 def readexactly(self, n):
488 if self._exception is not None:
489 raise self._exception
490
Guido van Rossum38455212014-01-06 16:09:18 -0800491 # There used to be "optimized" code here. It created its own
492 # Future and waited until self._buffer had at least the n
493 # bytes, then called read(n). Unfortunately, this could pause
494 # the transport if the argument was larger than the pause
495 # limit (which is twice self._limit). So now we just read()
496 # into a local buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700497
Guido van Rossum38455212014-01-06 16:09:18 -0800498 blocks = []
499 while n > 0:
500 block = yield from self.read(n)
501 if not block:
Victor Stinner8dffc452014-01-25 15:32:06 +0100502 partial = b''.join(blocks)
503 raise IncompleteReadError(partial, len(partial) + n)
Guido van Rossum38455212014-01-06 16:09:18 -0800504 blocks.append(block)
505 n -= len(block)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506
Guido van Rossum38455212014-01-06 16:09:18 -0800507 return b''.join(blocks)
Yury Selivanovd08c3632015-05-13 15:15:56 -0400508
Victor Stinner71080fc2015-07-25 02:23:21 +0200509 if compat.PY35:
Yury Selivanovd08c3632015-05-13 15:15:56 -0400510 @coroutine
511 def __aiter__(self):
512 return self
513
514 @coroutine
515 def __anext__(self):
516 val = yield from self.readline()
517 if val == b'':
518 raise StopAsyncIteration
519 return val