blob: baa9ec94439a1b21e84a5bf0351f4efbd041628f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
# Public API of this module.  This is a list (not a tuple) because the
# UNIX-socket helpers are appended to it on platforms that support AF_UNIX.
__all__ = [
    # Stream classes.
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    # Convenience helpers for TCP clients and servers.
    'open_connection', 'start_server',
    # Exceptions raised by StreamReader's read methods.
    'IncompleteReadError',
    'LimitOverrunError',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
Yury Selivanovb057c522014-02-18 12:15:06 -05009import socket
10
if hasattr(socket, 'AF_UNIX'):
    # UNIX domain socket helpers are only defined where AF_UNIX exists,
    # so they are only exported there as well.
    __all__ += ['open_unix_connection', 'start_unix_server']
13
Victor Stinnerf951d282014-06-29 00:46:45 +020014from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020017from .log import logger
Andrew Svetlov5f841b52017-12-09 00:23:48 +020018from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
# Default StreamReader limit in bytes (64 KiB).
_DEFAULT_LIMIT = 64 * 1024
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossuma849be92014-01-30 16:05:28 -080023
class IncompleteReadError(EOFError):
    """
    Raised when the stream ends before a read could be satisfied.

    Attributes:

    - partial: the bytes that were read before the end of the stream
    - expected: total number of bytes that were requested
      (None when that number is not known)
    """

    def __init__(self, partial, expected):
        template = "%d bytes read on a total of %r expected bytes"
        super().__init__(template % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # self.args only holds the formatted message, so rebuild the
        # exception from its real constructor arguments when pickling.
        constructor_args = (self.partial, self.expected)
        return type(self), constructor_args
39
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
class LimitOverrunError(Exception):
    """The buffer limit was reached while searching for a separator.

    Attributes:
    - consumed: total number of bytes that would have to be consumed.
    """

    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # Only the message lives in self.args; `consumed` has to be passed
        # back explicitly so that unpickling reconstructs both fields.
        message = self.args[0]
        return type(self), (message, self.consumed)
53
Yury Selivanovd9d0e862016-01-11 12:28:19 -050054
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a (reader, writer) pair.

    This wraps loop.create_connection().  It accepts every argument that
    create_connection() accepts except protocol_factory.  Usually host and
    port are passed positionally, followed by optional keyword arguments.

    The reader is a StreamReader instance; the writer is a StreamWriter
    instance.

    Two additional keyword arguments are understood:
    - loop: the event loop to use (defaults to events.get_event_loop());
    - limit: the buffer limit passed to the StreamReader.

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    just copy this code -- it is nothing more than a convenience wrapper.)
    """
    loop = events.get_event_loop() if loop is None else loop
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)

    def protocol_factory():
        # create_connection() wants a factory; hand back the single,
        # pre-built protocol that is already wired to `reader`.
        return protocol

    transport, _ = await loop.create_connection(
        protocol_factory, host, port, **kwds)
    return reader, StreamWriter(transport, protocol, reader, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082
83
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and call back for every connecting client.

    `client_connected_cb` is called with two arguments, client_reader and
    client_writer: a StreamReader and a StreamWriter for that client.  It
    may be a plain callback function or a coroutine function; when it
    returns a coroutine, that coroutine is scheduled as a Task.

    The remaining arguments are those of loop.create_server() except
    protocol_factory.  Usually host and port are passed positionally,
    followed by optional keyword arguments.  Two additional keyword
    arguments are understood:
    - loop: the event loop to use (defaults to events.get_event_loop());
    - limit: the buffer limit passed to each StreamReader.

    Returns whatever loop.create_server() returns, i.e. a Server object
    that can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Every accepted connection gets its own reader/protocol pair.
        return StreamReaderProtocol(
            StreamReader(limit=limit, loop=loop),
            client_connected_cb,
            loop=loop)

    return await loop.create_server(make_protocol, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -0800117
118
if hasattr(socket, 'AF_UNIX'):
    # The helpers below exist only on platforms with UNIX domain sockets.

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but connect to the UNIX socket at `path`."""
        loop = events.get_event_loop() if loop is None else loop
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)

        def protocol_factory():
            # Hand the pre-built protocol to create_unix_connection().
            return protocol

        transport, _ = await loop.create_unix_connection(
            protocol_factory, path, **kwds)
        return reader, StreamWriter(transport, protocol, reader, loop)

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listen on the UNIX socket at `path`."""
        if loop is None:
            loop = events.get_event_loop()

        def make_protocol():
            # Every accepted connection gets its own reader/protocol pair.
            return StreamReaderProtocol(
                StreamReader(limit=limit, loop=loop),
                client_connected_cb,
                loop=loop)

        return await loop.create_unix_server(make_protocol, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500147
148
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    Several coroutines may wait in _drain_helper() at the same time:
    each one gets its own future, and all of them are woken together
    when writing resumes or the connection is lost.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # Futures of the coroutines currently blocked in _drain_helper().
        # A single slot is not enough: concurrent drain() calls used to
        # hit an AssertionError here (or, under -O, silently lose the
        # first waiter so it never woke up).  Each waiter removes its own
        # future when it finishes.
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Wake every coroutine blocked in drain().  set_result() does not
        # run callbacks synchronously, so the list is not mutated while
        # we iterate it; each waiter removes itself later.
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writers if currently paused.  When not paused, any
        # waiters have already been resolved by resume_writing().
        if not self._paused:
            return
        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing may resume.

        Raises ConnectionResetError if the connection was already lost;
        returns immediately if writing is not paused.  If the connection
        is lost while waiting, the exception passed to connection_lost()
        (if any) is raised here.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Also runs on cancellation, so abandoned futures never linger.
            self._drain_waiters.remove(waiter)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800212
213
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Adapter that forwards Protocol callbacks into a StreamReader.

    (This is a separate helper instead of making StreamReader itself a
    Protocol subclass: StreamReader has other potential uses, and keeping
    the protocol callbacks off it stops StreamReader users from
    accidentally calling methods that are meant for the transport.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        # Updated in connection_made(): True when the transport has an
        # SSL context.
        self._over_ssl = False

    def connection_made(self, transport):
        reader = self._stream_reader
        reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

        callback = self._client_connected_cb
        if callback is None:
            return
        # Server side: build the writer and hand both ends to the callback.
        self._stream_writer = StreamWriter(transport, self, reader,
                                           self._loop)
        result = callback(reader, self._stream_writer)
        # A coroutine returned by the callback is run as a task.
        if coroutines.iscoroutine(result):
            self._loop.create_task(result)

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is not None:
                reader.set_exception(exc)
            else:
                reader.feed_eof()
        super().connection_lost(exc)
        # The connection is gone; drop references to reader and writer.
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        # Over SSL, return False to avoid SSLProtocol.eof_received's
        # warning "returning true from eof_received() has no effect when
        # using ssl"; otherwise return True.
        return not self._over_ssl
Guido van Rossum355491d2013-10-18 15:17:11 -0700263
Guido van Rossum355491d2013-10-18 15:17:11 -0700264
class StreamWriter:
    """Thin wrapper around a Transport.

    write(), writelines(), write_eof(), can_write_eof(), get_extra_info()
    and close() are delegated straight to the transport, and the
    `transport` property exposes it directly.  In addition, the drain()
    coroutine waits until flow control allows more data to be written.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() calls reader.exception(), so the reader must be a
        # StreamReader (or absent).
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        fields = [self.__class__.__name__, 'transport=%r' % self._transport]
        reader = self._reader
        if reader is not None:
            fields.append('reader=%r' % reader)
        return '<' + ' '.join(fields) + '>'

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Wait until it is appropriate to write more data.

        Intended usage:

            w.write(data)
            await w.drain()

        An exception already recorded on the reader is raised first.
        """
        reader = self._reader
        exc = None if reader is None else reader.exception()
        if exc is not None:
            raise exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Let the event loop run once so connection_lost() gets a
            # chance to be called.  Otherwise _drain_helper() would return
            # right away, and a `write(...); await drain()` loop would
            # never observe an error once the socket is closed.
            await sleep(0, loop=self._loop)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333
334
class StreamReader:
    """Byte buffer filled by a protocol and drained by read coroutines.

    The protocol side calls feed_data(), feed_eof() and set_exception();
    coroutines consume data with read(), readline(), readuntil() and
    readexactly().  Only one coroutine may be waiting for data at a time.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The limit caps line length (a security feature) and is also half
        # of the buffer size at which feed_data() pauses the transport.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False       # Set once feed_eof() has been called.
        self._waiter = None     # Future awaited inside _wait_for_data().
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        for template, value in (('w=%r', self._waiter),
                                ('e=%r', self._exception),
                                ('t=%r', self._transport)):
            if value:
                info.append(template % value)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        # Detach the pending waiter (if any) and fail it with `exc`.
        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.cancelled():
            waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up a read*() coroutine that is waiting for data or EOF."""
        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.cancelled():
            waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume reading only once the buffer has shrunk back to the limit.
        if not self._paused or len(self._buffer) > self._limit:
            return
        self._paused = False
        self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        # Apply back-pressure once more than twice the limit is buffered.
        should_pause = (self._transport is not None
                        and not self._paused
                        and len(self._buffer) > 2 * self._limit)
        if not should_pause:
            return
        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # This transport cannot be paused, so everything has to be
            # buffered.  Drop the transport so we stop trying.
            self._transport = None
        else:
            self._paused = True

    async def _wait_for_data(self, func_name):
        """Block until feed_data() or feed_eof() is called.

        A paused transport is resumed before waiting.
        """
        # feed_data() hands data to a read coroutine through a single
        # future.  With two readers waiting at once it would be impossible
        # to tell which one gets the next chunk, so that is an error.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting while the transport is paused would deadlock.  This is
        # what lets readexactly(n) work when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read one line: data up to and including a newline byte.

        Returns the line including its trailing newline.  If EOF arrives
        before a newline, the incomplete line is returned without one; if
        EOF arrives with nothing buffered, an empty bytes object is
        returned.

        If the limit is exceeded, ValueError is raised.  If the newline
        had already been received, the whole line including the newline is
        removed from the internal buffer; otherwise the internal buffer is
        cleared.  The limit is compared against the line without its
        newline.

        A paused stream is resumed automatically when needed.
        """
        separator = b'\n'
        try:
            return await self.readuntil(separator)
        except IncompleteReadError as err:
            return err.partial
        except LimitOverrunError as err:
            if self._buffer.startswith(separator, err.consumed):
                del self._buffer[:err.consumed + len(separator)]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(err.args[0])

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success the data and the separator are removed (consumed) from
        the internal buffer and returned together, separator last.

        The configured stream limit caps the length of the returned data,
        not counting the separator.

        If EOF arrives before the complete separator is found, an
        IncompleteReadError is raised and the internal buffer is emptied.
        Its ``partial`` attribute may end with part of the separator.

        If the limit is exceeded, a LimitOverrunError is raised and the
        data is left in the internal buffer so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # `search_from` is how far into the buffer the separator is known
        # not to start.  After a failed search everything except the last
        # (seplen - 1) bytes is ruled out: those trailing bytes may be the
        # start of a separator that is still arriving (e.g. the buffer
        # ends in 'SEPARATO' while looking for 'SEPARATOR').  A few bytes
        # may get rescanned, which is harmless for realistic separators.
        search_from = 0

        # Loop until the separator is found, the limit is exceeded, or EOF.
        while True:
            available = len(self._buffer)

            # Only search once the unscanned tail can hold a separator.
            if available - search_from >= seplen:
                found_at = self._buffer.find(separator, search_from)
                if found_at != -1:
                    break

                search_from = available + 1 - seplen
                if search_from > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        search_from)

            # The data that completes the separator may arrive together
            # with EOF, so EOF is only checked *after* searching the buffer.
            if self._eof:
                leftover = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(leftover, None)

            # Resumes a paused transport before blocking.
            await self._wait_for_data('readuntil')

        if found_at > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', found_at)

        end = found_at + seplen
        result = bytes(self._buffer[:end])
        del self._buffer[:end]
        self._maybe_resume_transport()
        return result

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If `n` is omitted or negative, read until EOF and return every byte
        read.  If EOF was already received and the internal buffer is
        empty, an empty bytes object is returned.

        If `n` is zero, an empty bytes object is returned immediately.

        If `n` is positive, at most `n` bytes are returned, and at least
        one unless EOF was received before any byte was read, in which case
        an empty bytes object is returned.

        The result is not bounded by the stream's configured limit.

        A paused stream is resumed automatically when needed.
        """
        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # Waiting for the whole stream in one go could deadlock once
            # more than the limit is buffered (the transport would stay
            # paused), so read limit-sized blocks until EOF instead.
            blocks = []
            block = await self.read(self._limit)
            while block:
                blocks.append(block)
                block = await self.read(self._limit)
            return b''.join(blocks)

        if not (self._buffer or self._eof):
            await self._wait_for_data('read')

        # Slicing copes with a buffer shorter than n.
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raises IncompleteReadError if EOF arrives before `n` bytes could be
        read; its ``partial`` attribute holds the bytes that were read.

        If `n` is zero, an empty bytes object is returned.

        The result is not bounded by the stream's configured limit.

        A paused stream is resumed automatically when needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                received = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(received, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) != n:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        else:
            # Fast path: the buffer holds exactly what was asked for.
            data = bytes(self._buffer)
            self._buffer.clear()
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Iterate line by line; an empty line only happens at EOF.
        line = await self.readline()
        if line == b'':
            raise StopAsyncIteration
        return line