blob: 9a53ee482478135f0431f9dfbdbe8953b51d2887 [file] [log] [blame]
Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = (
2 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
3 'open_connection', 'start_server',
4 'IncompleteReadError', 'LimitOverrunError',
5)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006
Yury Selivanovb057c522014-02-18 12:15:06 -05007import socket
8
Guido van Rossume3e786c2014-02-18 10:24:30 -08009if hasattr(socket, 'AF_UNIX'):
Yury Selivanov6370f342017-12-10 18:36:12 -050010 __all__ += ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080011
Victor Stinnerf951d282014-06-29 00:46:45 +020012from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020015from .log import logger
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
Yury Selivanovb4617912016-05-16 16:32:38 -040019_DEFAULT_LIMIT = 2 ** 16
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
Guido van Rossuma849be92014-01-30 16:05:28 -080021
class IncompleteReadError(EOFError):
    """Raised when the stream ends before the requested data was read.

    Attributes:

    - partial: bytes read before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """

    def __init__(self, partial, expected):
        # An unknown total is reported as "undefined" so the message does
        # not read "a total of None expected bytes".
        r_expected = 'undefined' if expected is None else repr(expected)
        super().__init__(f'{len(partial)} bytes read on a total of '
                         f'{r_expected} expected bytes')
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # Rebuild from the two constructor arguments rather than from the
        # single formatted message stored in self.args.
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class LimitOverrunError(Exception):
    """Signal that the buffer limit was hit while searching for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """

    def __init__(self, message, consumed):
        # Only the message goes into args; consumed is kept as an attribute.
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # Pickle through the constructor so `consumed` is restored too.
        message = self.args[0]
        ctor_args = (message, self.consumed)
        return type(self), ctor_args
51
Yury Selivanovd9d0e862016-01-11 12:28:19 -050052
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect via loop.create_connection() and return (reader, writer).

    The returned reader is a StreamReader instance and the returned
    writer is a StreamWriter instance.

    host, port and any extra keyword arguments are forwarded to
    loop.create_connection(); the protocol factory is supplied here and
    must not be passed by the caller.

    Two keyword-only arguments are consumed by this function:

    - loop: the event loop to use (defaults to events.get_event_loop())
    - limit: the buffer limit handed to the StreamReader

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    event_loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return stream_protocol

    transport, _ = await event_loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(
        transport, stream_protocol, stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080
81
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    `client_connected_cb` is called with two arguments, client_reader
    (a StreamReader) and client_writer (a StreamWriter).  It may be a
    plain callback function or a coroutine function; when the call
    returns a coroutine, it is scheduled as a Task on the loop.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied here and
    must not be passed by the caller.

    Two keyword-only arguments are consumed by this function:

    - loop: the event loop to use (defaults to events.get_event_loop())
    - limit: the buffer limit handed to each StreamReader

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    event_loop = loop if loop is not None else events.get_event_loop()

    def make_protocol():
        # One fresh reader/protocol pair per accepted connection.
        return StreamReaderProtocol(
            StreamReader(limit=limit, loop=event_loop),
            client_connected_cb,
            loop=event_loop)

    server = await event_loop.create_server(
        make_protocol, host, port, **kwds)
    return server
Guido van Rossum1540b162013-11-19 11:43:38 -0800115
116
if hasattr(socket, 'AF_UNIX'):
    # These helpers are only defined where UNIX Domain Sockets exist.

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but over a UNIX Domain Socket.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection().  Returns a (reader, writer) pair.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            # Always hand back the single protocol created above.
            return stream_protocol

        transport, _ = await event_loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(
            transport, stream_protocol, stream_reader, event_loop)
        return stream_reader, stream_writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listening on a UNIX Domain Socket.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        event_loop = loop if loop is not None else events.get_event_loop()

        def make_protocol():
            # One fresh reader/protocol pair per accepted connection.
            return StreamReaderProtocol(
                StreamReader(limit=limit, loop=event_loop),
                client_connected_cb,
                loop=event_loop)

        server = await event_loop.create_unix_server(
            make_protocol, path, **kwds)
        return server
Yury Selivanovb057c522014-02-18 12:15:06 -0500145
146
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    Provides the protocol methods pause_writing(), resume_writing()
    and connection_lost().  A subclass that overrides any of them must
    call the super method.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False            # True between pause/resume_writing
        self._drain_waiter = None       # future awaited by _drain_helper()
        self._connection_lost = False

    def pause_writing(self):
        """Mark the writer as paused; later drains will block."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Clear the paused flag and release a pending drain, if any."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending = self._drain_waiter
        if pending is None:
            return
        self._drain_waiter = None
        if not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Record the loss and wake a drain blocked on a paused writer.

        The pending drain finishes normally when exc is None and raises
        exc otherwise.
        """
        self._connection_lost = True
        # A drain waiter is only woken here while writing is paused.
        if self._paused:
            pending = self._drain_waiter
            if pending is not None:
                self._drain_waiter = None
                if not pending.done():
                    if exc is None:
                        pending.set_result(None)
                    else:
                        pending.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing is resumed.

        Raises ConnectionResetError if the connection is already lost;
        returns immediately when writing is not paused.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            # At most one live drain waiter may exist at a time.
            assert previous is None or previous.cancelled()
            pending = self._loop.create_future()
            self._drain_waiter = pending
            await pending
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800210
211
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Completed by connection_lost(); awaited by
        # StreamWriter.wait_closed().
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        """Attach the transport and invoke the client callback, if any."""
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            # A callback that returned a coroutine is run as a task.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        """Propagate EOF or exc to the reader and resolve the close future."""
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        # Drop the references once the connection is gone.
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        """Forward incoming bytes to the stream reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Forward EOF to the reader; return False over SSL, else True."""
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ failed before _closed was created; nothing to do.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
274
Guido van Rossum355491d2013-10-18 15:17:11 -0700275
class StreamWriter:
    """Wraps a Transport.

    Forwards write(), writelines(), [can_]write_eof(), get_extra_info(),
    close() and is_closing() to the transport.  On top of that it
    provides the drain() coroutine for flow control, wait_closed(), and
    a transport property that exposes the wrapped Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._transport = transport
        self._protocol = protocol
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__]
        parts.append(f'transport={self._transport!r}')
        if self._reader is not None:
            parts.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(parts))

    @property
    def transport(self):
        """The wrapped transport object."""
        return self._transport

    def write(self, data):
        """Pass data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Return the result of the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the result of the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Return the result of the transport's close()."""
        return self._transport.close()

    def is_closing(self):
        """Return the result of the transport's is_closing()."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's close future."""
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        """Return the transport's extra info for name, or default."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        An exception already recorded on the reader is re-raised here.
        """
        reader = self._reader
        if reader is not None:
            error = reader.exception()
            if error is not None:
                raise error
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await sleep(0, loop=self._loop)
        await self._protocol._drain_helper()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349
350
class StreamReader:
    """Buffer of incoming bytes with coroutine-based read methods.

    Data enters through feed_data() and feed_eof(); consumers pull it
    with read(), readline(), readuntil() and readexactly(), or iterate
    over lines with `async for`.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False       # Whether we're done.
        self._waiter = None     # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        parts = ['StreamReader']
        if self._buffer:
            parts.append(f'{len(self._buffer)} bytes')
        if self._eof:
            parts.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            parts.append(f'limit={self._limit}')
        if self._waiter:
            parts.append(f'waiter={self._waiter!r}')
        if self._exception:
            parts.append(f'exception={self._exception!r}')
        if self._transport:
            parts.append(f'transport={self._transport!r}')
        if self._paused:
            parts.append('paused')
        return '<{}>'.format(' '.join(parts))

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and deliver it to a pending reader, if any."""
        self._exception = exc

        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        if not pending.cancelled():
            pending.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        pending = self._waiter
        if pending is None:
            return
        self._waiter = None
        if not pending.cancelled():
            pending.set_result(None)

    def set_transport(self, transport):
        """Attach the transport; it may be set only once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake a pending reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake a pending reader.

        The transport is asked to pause reading once the buffer grows
        beyond twice the limit.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if self._transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return
        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        pending = self._loop.create_future()
        self._waiter = pending
        try:
            await pending
        finally:
            self._waiter = None

    async def readline(self):
        """Read chunk of data from the stream until a newline byte is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        newline = b'\n'
        newline_size = len(newline)
        try:
            line = await self.readuntil(newline)
        except IncompleteReadError as exc:
            return exc.partial
        except LimitOverrunError as exc:
            # Discard the oversized line (or the whole buffer when the
            # newline was not found) before reporting the error.
            if self._buffer.startswith(newline, exc.consumed):
                del self._buffer[:exc.consumed + newline_size]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(exc.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        sep_size = len(separator)
        if sep_size == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than the separator length. Let's check corner cases
        # with separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `scan_from` is the number of bytes from the beginning of the
        # buffer where there is no occurrence of `separator`.
        scan_from = 0

        # Loop until we find `separator` in the buffer, exceed the buffer
        # size, or an EOF has happened.
        while True:
            buffered = len(self._buffer)

            # Check if we now have enough data in the buffer for
            # `separator` to fit.
            if buffered - scan_from >= sep_size:
                sep_index = self._buffer.find(separator, scan_from)

                if sep_index != -1:
                    # `separator` is in the buffer. `sep_index` will be
                    # used later to retrieve the data.
                    break

                # see upper comment for explanation.
                scan_from = buffered + 1 - sep_size
                if scan_from > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        scan_from)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check
            # for EOF *after* inspecting the buffer.
            if self._eof:
                leftover = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(leftover, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if sep_index > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit',
                sep_index)

        end = sep_index + sep_size
        chunk = self._buffer[:end]
        del self._buffer[:end]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            block = await self.read(self._limit)
            while block:
                blocks.append(block)
                block = await self.read(self._limit)
            return b''.join(blocks)

        if not (self._buffer or self._eof):
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                partial = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(partial, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            # The whole buffer is the answer; take it without slicing.
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        # An empty line from readline() ends the iteration.
        line = await self.readline()
        if line == b'':
            raise StopAsyncIteration
        return line