blob: 50badd6b24799aa86bd25329d6d6068facd6d689 [file] [log] [blame]
# Public API of this module; the UNIX-socket helpers are appended below
# only on platforms whose socket module provides AF_UNIX.
__all__ = ('StreamReader', 'StreamWriter', 'StreamReaderProtocol',
           'open_connection', 'start_server',
           'IncompleteReadError', 'LimitOverrunError')

import socket

if hasattr(socket, 'AF_UNIX'):
    __all__ = __all__ + ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080011
Victor Stinnerf951d282014-06-29 00:46:45 +020012from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020015from .log import logger
Andrew Svetlov5f841b52017-12-09 00:23:48 +020016from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
18
Miss Islington (bot)441afbd2018-05-29 07:21:46 -070019_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
Guido van Rossuma849be92014-01-30 16:05:28 -080021
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        # readuntil() raises this with expected=None; render that as
        # "undefined" instead of the misleading "None expected bytes".
        r_expected = 'undefined' if expected is None else repr(expected)
        super().__init__(f'{len(partial)} bytes read on a total of '
                         f'{r_expected} expected bytes')
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # Rebuild through __init__ so that ``partial`` and ``expected``
        # survive pickling (BaseException would only keep ``args``).
        return type(self), (self.partial, self.expected)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """

    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # Recreate via the constructor so ``consumed`` is kept on unpickling.
        ctor_args = (self.args[0], self.consumed)
        return type(self), ctor_args
51
Yury Selivanovd9d0e862016-01-11 12:28:19 -050052
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    All arguments other than ``loop`` and ``limit`` are forwarded to
    loop.create_connection() (which receives its own protocol_factory);
    typically these are the positional host and port followed by
    optional keyword arguments.

    ``loop`` selects the event loop (events.get_event_loop() when
    omitted) and ``limit`` is the buffer limit given to the StreamReader.

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    just copy this code -- it is only a convenience wrapper.)
    """
    event_loop = events.get_event_loop() if loop is None else loop
    stream_reader = StreamReader(limit=limit, loop=event_loop)
    proto = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        return proto

    transport, _unused = await event_loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, proto, stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080
81
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    ``client_connected_cb`` is called with two arguments,
    (client_reader, client_writer): a StreamReader and a StreamWriter.
    It may be a plain function or a coroutine function; if calling it
    returns a coroutine, that coroutine is scheduled as a Task.

    All other arguments except ``loop`` and ``limit`` are forwarded to
    loop.create_server(); typically these are the positional host and
    port followed by optional keyword arguments.

    ``loop`` selects the event loop (events.get_event_loop() when
    omitted) and ``limit`` is the buffer limit given to each client's
    StreamReader.

    Returns whatever loop.create_server() returns, i.e. a Server object
    which can be used to stop the service.
    """
    event_loop = events.get_event_loop() if loop is None else loop

    def factory():
        # A fresh reader/protocol pair for every accepted connection.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=event_loop),
                                    client_connected_cb,
                                    loop=event_loop)

    return await event_loop.create_server(factory, host, port, **kwds)
Guido van Rossum1540b162013-11-19 11:43:38 -0800115
116
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform, so provide the
    # path-based counterparts of open_connection() and start_server().

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        event_loop = events.get_event_loop() if loop is None else loop
        stream_reader = StreamReader(limit=limit, loop=event_loop)
        proto = StreamReaderProtocol(stream_reader, loop=event_loop)

        def protocol_factory():
            return proto

        transport, _unused = await event_loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(transport, proto, stream_reader,
                                     event_loop)
        return stream_reader, stream_writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        event_loop = events.get_event_loop() if loop is None else loop

        def factory():
            # A fresh reader/protocol pair for every accepted connection.
            return StreamReaderProtocol(
                StreamReader(limit=limit, loop=event_loop),
                client_connected_cb,
                loop=event_loop)

        return await event_loop.create_unix_server(factory, path, **kwds)
Yury Selivanovb057c522014-02-18 12:15:06 -0500145
146
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.

    Several coroutines may wait in _drain_helper() at the same time:
    each one gets its own future, and resume_writing() or
    connection_lost() wakes all of them.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # One future per coroutine currently blocked in _drain_helper().
        # A single slot (the previous design) made a second concurrent
        # drain() trip an assertion while writing was paused.
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        # From now on _drain_helper() blocks until resume_writing() or
        # connection_lost() is called.
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        # Unblock every coroutine waiting in _drain_helper().
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Waiters remove themselves from the list in _drain_helper()'s
        # ``finally``, which runs later, so iterating here is safe.
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return
        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Block while writing is paused.

        Raises ConnectionResetError if the connection was already lost,
        or the exception passed to connection_lost() if it is lost while
        waiting.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Drop our own future whether we were woken, failed or cancelled.
            self._drain_waiters.remove(waiter)

    def _get_close_waiter(self, stream):
        # Subclasses return a future that completes when the connection closes.
        raise NotImplementedError
212 raise NotImplementedError
213
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800214
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Completed in connection_lost(); returned by _get_close_waiter().
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            # Server side: build the writer and hand both ends to the
            # user callback; schedule it as a task if it is a coroutine.
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        # Propagate EOF (clean close) or the error to the reader.
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        # Break references to the stream objects once the connection is gone.
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ failed (e.g. the base-class constructor raised)
            # before _closed was assigned: nothing to silence.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
279 closed.exception()
280
Guido van Rossum355491d2013-10-18 15:17:11 -0700281
class StreamWriter:
    """Wraps a Transport for writing.

    Offers write(), writelines(), write_eof(), can_write_eof(),
    get_extra_info(), close(), is_closing() and wait_closed(), which
    delegate to the transport or protocol, plus the drain() coroutine
    that waits for flow control.  The wrapped Transport itself is
    available through the ``transport`` property.
    """

    def __init__(self, transport, protocol, reader, loop):
        # drain() consults reader.exception(), so the reader must be a
        # StreamReader (or absent).
        assert reader is None or isinstance(reader, StreamReader)
        self._transport = transport
        self._protocol = protocol
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        fields = [type(self).__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            fields.append(f'reader={self._reader!r}')
        return f"<{' '.join(fields)}>"

    @property
    def transport(self):
        """The underlying Transport object."""
        return self._transport

    def write(self, data):
        """Pass *data* to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass the iterable *data* to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Delegate to the transport's write_eof() and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Delegate to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Delegate to the transport's close() and return its result."""
        return self._transport.close()

    def is_closing(self):
        """Delegate to the transport's is_closing()."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's close waiter future."""
        close_waiter = self._protocol._get_close_waiter(self)
        await close_waiter

    def get_extra_info(self, name, default=None):
        """Delegate to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        Raises the reader's stored exception if there is one.  If the
        transport is already closing, waits on the protocol's close
        waiter and then raises ConnectionResetError (unless the waiter
        itself raised).
        """
        if self._reader is not None:
            pending = self._reader.exception()
            if pending is not None:
                raise pending
        if not self._transport.is_closing():
            await self._protocol._drain_helper()
            return
        # Transport is closing: wait for protocol.connection_lost().  The
        # close waiter carries the connection error, if any.
        await self._protocol._get_close_waiter(self)
        raise ConnectionResetError('Connection lost')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
355
class StreamReader:
    """Buffered byte reader fed by a protocol and consumed by coroutines.

    A protocol pushes data in with feed_data(), feed_eof() and
    set_exception(); read(), readline(), readuntil() and readexactly()
    take bytes out of the internal buffer, waiting for more data when
    needed.  When a transport is attached, reading from it is paused
    once more than 2 * limit bytes are buffered and resumed once the
    buffer drops back to limit bytes or fewer.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False  # Whether we asked the transport to pause reading.

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store *exc* and fail the pending read*() call, if any, with it.

        Subsequent read*() calls raise *exc* as well.
        """
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used for pause/resume flow control (once)."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume reading once the buffer has drained to at most `limit` bytes.
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake up a waiting reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append *data* to the buffer and wake up a waiting reader.

        Pauses the transport when more than 2 * limit bytes are buffered.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data(), feed_eof() or set_exception() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            # Drop the over-long line (including its newline if present)
            # so the next read starts on fresh data.
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset. The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        An empty ``separator`` raises ValueError; an exception stored by
        set_exception() is re-raised.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume the whole buffer except the last bytes, whose length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this requires
        #   analyzing the bytes of the buffer that match a partial
        #   separator. This is slow and/or requires an FSM. For this
        #   case our implementation is not optimal, since it requires
        #   rescanning data that is known not to belong to the separator.
        #   In the real world, the separator will not be long enough for
        #   this to cause noticeable performance problems. Even when
        #   reading MIME-encoded messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer bytes than requested, but at least one byte. If EOF
        was received before any byte is read, this function returns an
        empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        If n is zero, return empty bytes object.

        Raise ValueError if n is negative.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        # Fast path: hand over the whole buffer when it is exactly n bytes.
        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Iterate line by line; readline() returns b'' only at EOF.
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val