blob: 0afc66a473d4185247cacc06b4bbfa298403da7f [file] [log] [blame]
# Names exported by a star-import of this module.
__all__ = (
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server')

import socket
import sys
import weakref

if hasattr(socket, 'AF_UNIX'):
    # The UNIX-domain helpers are only defined (further down) when the
    # socket module exposes AF_UNIX, so they are exported conditionally.
    __all__ += ('open_unix_connection', 'start_unix_server')

from . import coroutines
from . import events
from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
from .tasks import sleep


# Default value of the ``limit`` argument accepted by the helpers and by
# StreamReader below.
_DEFAULT_LIMIT = 2 ** 16  # 64 KiB
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
Guido van Rossuma849be92014-01-30 16:05:28 -080023
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect via loop.create_connection() and return (reader, writer).

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance, both bound to the same transport.

    host, port and any extra keyword arguments are forwarded unchanged
    to create_connection(); the protocol factory is supplied here and
    must not be passed by the caller.

    Two keyword-only arguments are consumed by this helper itself:
    loop (the event loop to use; the current one when omitted) and
    limit (the buffer limit handed to the StreamReader).

    (To use customized StreamReader and/or StreamReaderProtocol
    classes, copy this function -- it is only a convenience.)
    """
    event_loop = events.get_event_loop() if loop is None else loop
    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand back the protocol that is wired to our reader.
        return stream_protocol

    transport, _ = await event_loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(
        transport, stream_protocol, stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
52
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server that calls back for every client connection.

    `client_connected_cb` is invoked with two arguments,
    (client_reader, client_writer): a StreamReader and a StreamWriter
    for the new connection.  It may be a plain callable or a coroutine
    function; when the call produces a coroutine, that coroutine is
    scheduled as a Task.

    host, port and any extra keyword arguments are forwarded unchanged
    to loop.create_server(); the protocol factory is supplied here and
    must not be passed by the caller.

    Two keyword-only arguments are consumed by this helper itself:
    loop (the event loop to use; the current one when omitted) and
    limit (the buffer limit handed to each StreamReader).

    The return value is whatever loop.create_server() returns, i.e. a
    Server object which can be used to stop the service.
    """
    event_loop = events.get_event_loop() if loop is None else loop

    def factory():
        # One fresh reader/protocol pair per incoming connection.
        client_reader = StreamReader(limit=limit, loop=event_loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=event_loop)

    server = await event_loop.create_server(factory, host, port, **kwds)
    return server
Guido van Rossum1540b162013-11-19 11:43:38 -080086
87
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but connects to a UNIX Domain Socket.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection(); a (reader, writer) pair is
        returned.
        """
        event_loop = events.get_event_loop() if loop is None else loop
        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            # Always hand back the protocol that is wired to our reader.
            return stream_protocol

        transport, _ = await event_loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(
            transport, stream_protocol, stream_reader, event_loop)
        return stream_reader, stream_writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listens on a UNIX Domain Socket.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        def factory():
            # One fresh reader/protocol pair per incoming connection.
            client_reader = StreamReader(limit=limit, loop=event_loop)
            return StreamReaderProtocol(client_reader, client_connected_cb,
                                        loop=event_loop)

        server = await event_loop.create_unix_server(factory, path, **kwds)
        return server
Yury Selivanovb057c522014-02-18 12:15:06 -0500116
117
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    The protocol callbacks pause_writing(), resume_writing() and
    connection_lost() are implemented here.  A subclass that overrides
    any of them must call the super method.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        # Fall back to the current event loop when none is given.
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        # Future awaited by _drain_helper() while writing is paused.
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        """Record that writing is paused."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Record that writing resumed and release a pending drain."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        pending, self._drain_waiter = self._drain_waiter, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Mark the connection as lost and wake up a paused writer.

        A pending drain waiter is completed normally when exc is None
        and with exc otherwise.
        """
        self._connection_lost = True
        pending = self._drain_waiter
        # Only a paused writer can have somebody blocked in drain.
        if self._paused and pending is not None:
            self._drain_waiter = None
            if not pending.done():
                if exc is None:
                    pending.set_result(None)
                else:
                    pending.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing is resumed; return at once if not paused.

        Raises ConnectionResetError when the connection is already lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            assert previous is None or previous.cancelled()
            blocker = self._loop.create_future()
            self._drain_waiter = blocker
            await blocker
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800181
182
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)

    The reader is tracked through a weak reference.  If it is garbage
    collected while the connection is still open, the problem is
    reported to the loop's exception handler and the transport is
    aborted.
    """

    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        """Bind the protocol to stream_reader (which may be None).

        client_connected_cb, when given, is called from
        connection_made() with (reader, writer).
        """
        super().__init__(loop=loop)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader,
                                                 self._on_reader_gc)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Completed by connection_lost(); awaited by
        # StreamWriter.wait_closed().
        self._closed = self._loop.create_future()

    def _on_reader_gc(self, wr):
        """Weakref callback: the reader was garbage collected."""
        transport = self._transport
        if transport is not None:
            # connection_made was called
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        else:
            # No connection yet: remember to refuse it when it arrives.
            self._reject_connection = True
        self._stream_reader_wr = None

    def _untrack_reader(self):
        """Stop tracking the reader so its collection is not reported."""
        self._stream_reader_wr = None

    @property
    def _stream_reader(self):
        """The tracked StreamReader, or None if untracked or collected."""
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def connection_made(self, transport):
        """Attach transport to the reader and run the client callback."""
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               reader,
                                               self._loop)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)
            # The callback has received the reader; drop our strong
            # reference to it.
            self._strong_reader = None

    def connection_lost(self, exc):
        """Propagate EOF or exc to the reader and complete `_closed`."""
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._stream_writer = None
        self._transport = None

    def data_received(self, data):
        """Feed received data to the reader, if it is still tracked."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        """Feed EOF to the reader; return False over SSL, True otherwise."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ failed before `_closed` was assigned; there is
            # nothing to retrieve, and raising here would only produce
            # noise during finalization.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
305
Guido van Rossum355491d2013-10-18 15:17:11 -0700306
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info(), close() and is_closing().  It adds the drain()
    coroutine, which can be awaited for flow control, wait_closed(),
    and the aclose()/awrite() convenience coroutines.  It also adds a
    transport property which references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        """The wrapped transport."""
        return self._transport

    def write(self, data):
        """Forward data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Forward to the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Forward to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Untrack the reader and close the transport."""
        # a reader can be garbage collected
        # after connection closing
        self._protocol._untrack_reader()
        self._transport.close()

    def is_closing(self):
        """Forward to the transport's is_closing()."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's `_closed` future."""
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        """Forward to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        Re-raises the exception stored on the reader, if any.
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            #
            # No `loop` argument is passed: that keyword of sleep() is
            # deprecated and is not needed for a zero-delay yield.
            await sleep(0)
        await self._protocol._drain_helper()

    async def aclose(self):
        """Close the stream and wait until it is closed."""
        self.close()
        await self.wait_closed()

    async def awrite(self, data):
        """Write data and then drain."""
        self.write(data)
        await self.drain()
391
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392
393class StreamReader:
394
Andrew Svetlova5d1eb82018-09-12 11:43:04 -0700395 _source_traceback = None
396
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
    """Create an empty reader.

    limit must be positive, otherwise ValueError is raised.  loop
    defaults to the current event loop.
    """
    # The line length limit is a security feature;
    # it also doubles as half the buffer limit.
    if limit <= 0:
        raise ValueError('Limit cannot be <= 0')
    self._limit = limit

    self._loop = events.get_event_loop() if loop is None else loop

    self._buffer = bytearray()
    self._eof = False    # Whether we're done.
    self._waiter = None  # A future used by _wait_for_data()
    self._exception = None
    self._transport = None
    self._paused = False

    if self._loop.get_debug():
        # Record the stack of whoever instantiated the reader; the
        # frame must be taken directly here so the depth stays 1.
        caller_frame = sys._getframe(1)
        self._source_traceback = format_helpers.extract_stack(caller_frame)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418
def __repr__(self):
    """Return '<StreamReader ...>' listing only the non-default state."""
    parts = ['StreamReader']
    add = parts.append

    if self._buffer:
        add(f'{len(self._buffer)} bytes')
    if self._eof:
        add('eof')
    if self._limit != _DEFAULT_LIMIT:
        add(f'limit={self._limit}')
    if self._waiter:
        add(f'waiter={self._waiter!r}')
    if self._exception:
        add(f'exception={self._exception!r}')
    if self._transport:
        add(f'transport={self._transport!r}')
    if self._paused:
        add('paused')

    body = ' '.join(parts)
    return '<{}>'.format(body)
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200436
def exception(self):
    """Return the exception stored by set_exception(), or None."""
    stored = self._exception
    return stored
439
def set_exception(self, exc):
    """Store exc and pass it to a pending, non-cancelled waiter."""
    self._exception = exc

    pending, self._waiter = self._waiter, None
    if pending is not None and not pending.cancelled():
        pending.set_exception(exc)
448
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100449 def _wakeup_waiter(self):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500450 """Wakeup read*() functions waiting for data or EOF."""
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100451 waiter = self._waiter
452 if waiter is not None:
453 self._waiter = None
454 if not waiter.cancelled():
455 waiter.set_result(None)
456
def set_transport(self, transport):
    """Attach transport to this reader; it may only be set once."""
    current = self._transport
    assert current is None, 'Transport already set'
    self._transport = transport
460
461 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500462 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700464 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465
def feed_eof(self):
    """Mark the stream as finished and wake up a pending reader."""
    self._eof = True
    # A coroutine blocked in _wait_for_data() must notice the EOF.
    self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
def at_eof(self):
    """Return True if the buffer is empty and 'feed_eof' was called."""
    eof_flag = self._eof
    if not eof_flag:
        return eof_flag
    return not self._buffer
473
def feed_data(self, data):
    """Append data to the buffer and wake up a pending reader.

    Empty data is ignored.  Once the buffer grows past twice the
    limit, the transport (if any) is asked to pause reading.
    """
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
        return

    self._buffer.extend(data)
    self._wakeup_waiter()

    transport = self._transport
    if transport is None or self._paused:
        return
    if len(self._buffer) <= 2 * self._limit:
        return

    try:
        transport.pause_reading()
    except NotImplementedError:
        # The transport can't be paused.
        # We'll just have to buffer all data.
        # Forget the transport so we don't keep trying.
        self._transport = None
    else:
        self._paused = True
495
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200496 async def _wait_for_data(self, func_name):
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500497 """Wait until feed_data() or feed_eof() is called.
498
499 If stream was paused, automatically resume it.
500 """
Victor Stinner183e3472014-01-23 17:40:03 +0100501 # StreamReader uses a future to link the protocol feed_data() method
502 # to a read coroutine. Running two read coroutines at the same time
503 # would have an unexpected behaviour. It would not possible to know
504 # which coroutine would get the next data.
505 if self._waiter is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500506 raise RuntimeError(
507 f'{func_name}() called while another coroutine is '
508 f'already waiting for incoming data')
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100509
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500510 assert not self._eof, '_wait_for_data after EOF'
511
512 # Waiting for data while paused will make deadlock, so prevent it.
Yury Selivanov3e56ff02016-10-05 18:01:12 -0400513 # This is essential for readexactly(n) for case when n > self._limit.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500514 if self._paused:
515 self._paused = False
516 self._transport.resume_reading()
517
Yury Selivanov7661db62016-05-16 15:38:39 -0400518 self._waiter = self._loop.create_future()
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100519 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200520 await self._waiter
Victor Stinnerc2c12e42015-01-14 00:53:37 +0100521 finally:
522 self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100523
async def readline(self):
    """Read one line, where "line" is data ending with newline (b'\\n').

    On success the returned chunk ends with the newline.  If EOF
    arrives first, the incomplete line is returned without a newline;
    if nothing was read before EOF, the result is an empty bytes
    object.

    If the limit is reached, ValueError is raised.  In that case, when
    a newline directly follows the over-long data, the line including
    that newline is removed from the internal buffer; otherwise the
    internal buffer is cleared.  The limit is compared against the
    line without its newline.

    If stream was paused, this function will automatically resume it
    if needed.
    """
    newline = b'\n'
    try:
        return await self.readuntil(newline)
    except exceptions.IncompleteReadError as incomplete:
        # EOF before a newline: hand back whatever was buffered.
        return incomplete.partial
    except exceptions.LimitOverrunError as overrun:
        consumed = overrun.consumed
        if self._buffer.startswith(newline, consumed):
            del self._buffer[:consumed + len(newline)]
        else:
            self._buffer.clear()
        self._maybe_resume_transport()
        raise ValueError(overrun.args[0])
554
async def readuntil(self, separator=b'\n'):
    """Read data from the stream until ``separator`` is found.

    On success, the data and separator are removed from the internal
    buffer (consumed), and the returned data ends with the separator.

    The configured stream limit bounds the length of the data that can
    be returned, not counting the separator.

    If EOF occurs before a complete separator is found,
    IncompleteReadError is raised and the internal buffer is reset.
    The IncompleteReadError.partial attribute may contain part of the
    separator.

    If the data cannot be read because it is over the limit,
    LimitOverrunError is raised and the data is left in the internal
    buffer, so it can be read again.

    An empty separator raises ValueError.
    """
    sep_size = len(separator)
    if sep_size == 0:
        raise ValueError('Separator should be at least one-byte string')

    if self._exception is not None:
        raise self._exception

    # After a failed search everything in the buffer can be skipped on
    # the next search except the last `sep_size - 1` bytes, which may
    # be the start of a separator whose remainder has not arrived yet.
    # With separator='SEPARATOR':
    # * buffer='some textSEPARATO' -- an almost complete separator; only
    #   the bytes before it are known to be separator-free.
    # * buffer='abcdefghijklmnopqrS' -- only the last byte can belong to
    #   a separator, but finding that out would mean matching partial
    #   separators (slow and/or needing an FSM).  So the tail is simply
    #   rescanned; separators are short enough in practice that this
    #   does not matter.

    # `scan_from` is the number of bytes from the beginning of the
    # buffer where there is no occurrence of `separator`.
    scan_from = 0

    # Loop until we find `separator` in the buffer, exceed the buffer
    # size, or an EOF has happened.
    while True:
        available = len(self._buffer)

        # Only search once enough data is buffered for `separator` to
        # fit.
        if available - scan_from >= sep_size:
            sep_index = self._buffer.find(separator, scan_from)
            if sep_index >= 0:
                # `separator` is in the buffer. `sep_index` is used
                # below to retrieve the data.
                break

            # See the comment above for an explanation.
            scan_from = available + 1 - sep_size
            if scan_from > self._limit:
                raise exceptions.LimitOverrunError(
                    'Separator is not found, and chunk exceed the limit',
                    scan_from)

        # Complete message (with full separator) may be present in buffer
        # even when EOF flag is set. This may happen when the last chunk
        # adds data which makes separator be found. That's why we check
        # for EOF *after* inspecting the buffer.
        if self._eof:
            leftover = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(leftover, None)

        # _wait_for_data() will resume reading if stream was paused.
        await self._wait_for_data('readuntil')

    if sep_index > self._limit:
        raise exceptions.LimitOverrunError(
            'Separator is found, but chunk is longer than limit', sep_index)

    end = sep_index + sep_size
    found = bytes(self._buffer[:end])
    del self._buffer[:end]
    self._maybe_resume_transport()
    return found
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645
async def read(self, n=-1):
    """Read up to `n` bytes from the stream.

    If `n` is omitted or negative, keep reading until EOF and return
    everything that was read as a single bytes object.  The data is
    gathered in pieces of at most `self._limit` bytes, so the size of the
    result is not bounded by the limit configured at stream creation.

    If `n` is zero, return an empty bytes object immediately.

    If `n` is positive, return at most `n` bytes.  The call waits for
    data only when the buffer is empty and EOF has not been received; an
    empty bytes object is returned when EOF was received and nothing is
    buffered.

    An exception stored on the stream is raised before anything is read.

    After data is consumed, `_maybe_resume_transport()` is called so that
    a paused stream can be resumed if needed.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # Waiting for the whole payload to pile up in self._buffer would
        # deadlock once the peer sends more than the limit, so drain the
        # stream in limit-sized reads until an empty read signals EOF.
        pieces = []
        piece = await self.read(self._limit)
        while piece:
            pieces.append(piece)
            piece = await self.read(self._limit)
        return b''.join(pieces)

    # Nothing buffered and no EOF yet: block until data (or EOF) arrives.
    if not (self._buffer or self._eof):
        await self._wait_for_data('read')

    # Slicing copes with a buffer that holds fewer than n bytes.
    head = self._buffer[:n]
    del self._buffer[:n]
    data = bytes(head)

    self._maybe_resume_transport()
    return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695
async def readexactly(self, n):
    """Read exactly `n` bytes.

    Raise ValueError if `n` is negative.  An exception stored on the
    stream is raised before anything is read.

    If `n` is zero, return an empty bytes object.

    If EOF is reached before `n` bytes are buffered, raise an
    IncompleteReadError.  The bytes buffered so far are removed from the
    buffer and handed to the exception, where they are available as its
    `partial` attribute.

    The size of the returned value is not bounded by the limit configured
    at stream creation.

    After data is consumed, `_maybe_resume_transport()` is called so that
    a paused stream can be resumed if needed.
    """
    if n < 0:
        raise ValueError('readexactly size can not be less than zero')

    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    # Keep waiting until at least n bytes are buffered; give up with the
    # partial data as soon as EOF shows the request can never be met.
    while True:
        if len(self._buffer) >= n:
            break
        if self._eof:
            partial = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(partial, n)
        await self._wait_for_data('readexactly')

    pending = self._buffer
    if len(pending) > n:
        data = bytes(pending[:n])
        del pending[:n]
    else:
        # The buffer holds exactly n bytes: take it whole, skip the slice.
        data = bytes(pending)
        pending.clear()
    self._maybe_resume_transport()
    return data
Yury Selivanovd08c3632015-05-13 15:15:56 -0400736
def __aiter__(self):
    """Return this object itself as the asynchronous iterator."""
    return self
Yury Selivanovd08c3632015-05-13 15:15:56 -0400739
async def __anext__(self):
    """Return the next line obtained from `readline()`.

    Raise StopAsyncIteration when `readline()` yields an empty bytes
    object.
    """
    line = await self.readline()
    if line != b'':
        return line
    raise StopAsyncIteration