blob: c9b1f32813d4376c9214e0399f5226a1bf51000a [file] [log] [blame]
# Public API of this module.  The UNIX domain socket helpers are added to
# this tuple separately, and only on platforms that support them.
__all__ = (
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
Yury Selivanovb057c522014-02-18 12:15:06 -05005import socket
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07006import sys
Andrew Svetlovad4ed872019-05-06 22:52:11 -04007import warnings
Andrew Svetlova5d1eb82018-09-12 11:43:04 -07008import weakref
Yury Selivanovb057c522014-02-18 12:15:06 -05009
# Export the UNIX domain socket helpers only when the socket module
# exposes AF_UNIX on this platform.
if hasattr(socket, 'AF_UNIX'):
    __all__ = __all__ + ('open_unix_connection', 'start_unix_server')
Guido van Rossume3e786c2014-02-18 10:24:30 -080012
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070016from . import format_helpers
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import protocols
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Andrew Svetlov5f841b52017-12-09 00:23:48 +020019from .tasks import sleep
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Default StreamReader buffer limit: 64 KiB.
_DEFAULT_LIMIT = 1 << 16
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
Guido van Rossuma849be92014-01-30 16:05:28 -080024
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a ``(reader, writer)`` pair.

    This is a thin convenience layer over ``loop.create_connection()``.
    The returned reader is a StreamReader and the returned writer is a
    StreamWriter.

    *host*, *port* and every extra keyword argument are forwarded to
    ``create_connection()`` unchanged; the protocol factory is supplied
    by this function.

    Two keyword-only arguments are consumed here:

    * loop -- the event loop to use (the current one when omitted);
    * limit -- the buffer limit handed to the StreamReader.

    (To customise the StreamReader and/or StreamReaderProtocol classes,
    copy this code -- it contains nothing but convenience.)
    """
    event_loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(
        limit=limit, loop=event_loop, _asyncio_internal=True)
    reader_protocol = StreamReaderProtocol(
        stream_reader, loop=event_loop, _asyncio_internal=True)

    def protocol_factory():
        # The same, already-built protocol is used for the connection.
        return reader_protocol

    transport, _ = await event_loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(
        transport, reader_protocol, stream_reader, event_loop,
        _asyncio_internal=True)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070055
56
async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server and invoke a callback per client connection.

    `client_connected_cb` is called with two arguments, client_reader (a
    StreamReader) and client_writer (a StreamWriter).  It may be a plain
    callable or a coroutine function; a coroutine result is wrapped in a
    Task automatically.

    *host*, *port* and every extra keyword argument are forwarded to
    ``loop.create_server()`` unchanged; the protocol factory is supplied
    by this function.

    Two keyword-only arguments are consumed here:

    * loop -- the event loop to use (the current one when omitted);
    * limit -- the buffer limit handed to each StreamReader.

    The return value is whatever ``loop.create_server()`` returns, i.e. a
    Server object which can be used to stop the service.
    """
    event_loop = loop if loop is not None else events.get_event_loop()

    def make_protocol():
        # A fresh reader/protocol pair is created for every connection.
        client_reader = StreamReader(
            limit=limit, loop=event_loop, _asyncio_internal=True)
        return StreamReaderProtocol(
            client_reader, client_connected_cb,
            loop=event_loop, _asyncio_internal=True)

    server = await event_loop.create_server(
        make_protocol, host, port, **kwds)
    return server
Guido van Rossum1540b162013-11-19 11:43:38 -080092
93
if hasattr(socket, 'AF_UNIX'):
    # This platform supports UNIX domain sockets, so provide the
    # path-based counterparts of open_connection() and start_server().

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but over a UNIX domain socket.

        *path* and the extra keyword arguments are forwarded to
        ``loop.create_unix_connection()``.  Returns ``(reader, writer)``.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(
            limit=limit, loop=event_loop, _asyncio_internal=True)
        reader_protocol = StreamReaderProtocol(
            stream_reader, loop=event_loop, _asyncio_internal=True)

        def protocol_factory():
            return reader_protocol

        transport, _ = await event_loop.create_unix_connection(
            protocol_factory, path, **kwds)

        stream_writer = StreamWriter(
            transport, reader_protocol, stream_reader, event_loop,
            _asyncio_internal=True)
        return stream_reader, stream_writer

    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listening on a UNIX domain socket.

        *path* and the extra keyword arguments are forwarded to
        ``loop.create_unix_server()``, whose result is returned.
        """
        event_loop = loop if loop is not None else events.get_event_loop()

        def make_protocol():
            # A fresh reader/protocol pair is created for every connection.
            client_reader = StreamReader(
                limit=limit, loop=event_loop, _asyncio_internal=True)
            return StreamReaderProtocol(
                client_reader, client_connected_cb,
                loop=event_loop, _asyncio_internal=True)

        server = await event_loop.create_unix_server(
            make_protocol, path, **kwds)
        return server
Yury Selivanovb057c522014-02-18 12:15:06 -0500127
128
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    Any number of coroutines may be waiting in _drain_helper() at the
    same time; all of them are released when writing resumes or when
    the connection is lost.
    """

    def __init__(self, loop=None, *, _asyncio_internal=False):
        """Initialise the flow-control state.

        loop -- event loop to use; the current event loop when None.
        _asyncio_internal -- private flag; when false a
            DeprecationWarning is emitted because the class is meant to
            be created by asyncio itself.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        if not _asyncio_internal:
            # NOTE:
            # Avoid inheritance from FlowControlMixin
            # Copy-paste the code to your project
            # if you need flow control helpers
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning)
        self._paused = False
        # Futures of the coroutines currently blocked in _drain_helper().
        self._drain_waiters = []
        self._connection_lost = False

    def pause_writing(self):
        """Mark writing as paused; later _drain_helper() calls will block."""
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        """Mark writing as resumed and release every blocked drainer."""
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Each released coroutine removes its own future from the list
        # when it resumes, so the list is not mutated while we iterate.
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Record the lost connection and wake up blocked drainers.

        Waiters receive *exc* as their exception when it is not None,
        otherwise they simply complete.
        """
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return
        for waiter in self._drain_waiters:
            if waiter.done():
                continue
            if exc is None:
                waiter.set_result(None)
            else:
                waiter.set_exception(exc)

    async def _drain_helper(self):
        """Wait until writing is resumed.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError when the connection has already been lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            # Also runs on cancellation, so no stale future is left behind.
            self._drain_waiters.remove(waiter)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800201
202
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    # Copied from the stream reader in __init__ when one is given.
    _source_traceback = None

    def __init__(self, stream_reader, client_connected_cb=None, loop=None,
                 *, _asyncio_internal=False):
        """Initialise the protocol.

        stream_reader -- the StreamReader to feed, or None; it is held
            through a weak reference.
        client_connected_cb -- optional callable invoked from
            connection_made() with (reader, writer).
        loop -- event loop to use; forwarded to FlowControlMixin.
        _asyncio_internal -- private flag forwarded to FlowControlMixin.
        """
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        if stream_reader is not None:
            self._stream_reader_wr = weakref.ref(stream_reader,
                                                 self._on_reader_gc)
            self._source_traceback = stream_reader._source_traceback
        else:
            self._stream_reader_wr = None
        if client_connected_cb is not None:
            # This is a stream created by the `create_server()` function.
            # Keep a strong reference to the reader until a connection
            # is established.
            self._strong_reader = stream_reader
        self._reject_connection = False
        self._stream_writer = None
        self._transport = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def _on_reader_gc(self, wr):
        """Weak-reference callback: the stream reader has been collected."""
        transport = self._transport
        if transport is not None:
            # connection_made was called
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        else:
            # Not connected yet: abort as soon as connection_made() runs.
            self._reject_connection = True
        self._stream_reader_wr = None

    @property
    def _stream_reader(self):
        """The live StreamReader, or None when unset or already collected."""
        if self._stream_reader_wr is None:
            return None
        return self._stream_reader_wr()

    def connection_made(self, transport):
        """Bind *transport* to the reader and run the client callback."""
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               reader,
                                               self._loop,
                                               _asyncio_internal=True)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)
            # The callback has received the reader; drop our strong
            # reference to it.
            self._strong_reader = None

    def connection_lost(self, exc):
        """Propagate EOF or *exc* to the reader and resolve ``_closed``."""
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader_wr = None
        self._stream_writer = None
        self._transport = None

    def data_received(self, data):
        """Forward received *data* to the reader, if it is still alive."""
        reader = self._stream_reader
        if reader is not None:
            reader.feed_data(data)

    def eof_received(self):
        """Signal EOF to the reader.

        Returns False over SSL and True otherwise.
        """
        reader = self._stream_reader
        if reader is not None:
            reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__ raised before _closed was created (for example
            # weakref.ref() rejected the reader); nothing to silence.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()
324
Guido van Rossum355491d2013-10-18 15:17:11 -0700325
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info(), close() and is_closing().  It adds the drain()
    coroutine, which can be awaited for flow control, wait_closed(),
    and the aclose()/awrite() shortcuts.  It also adds a transport
    property which references the Transport directly.
    """

    def __init__(self, transport, protocol, reader, loop,
                 *, _asyncio_internal=False):
        """Initialise the writer.

        transport -- the transport every write is delegated to.
        protocol -- the protocol providing _drain_helper() and _closed.
        reader -- the paired StreamReader, or None.
        loop -- the event loop used by drain().
        _asyncio_internal -- private flag; when false a
            DeprecationWarning is emitted because the class is meant to
            be created by asyncio itself.
        """
        if not _asyncio_internal:
            # stacklevel=2 attributes the warning to the code that
            # instantiated the writer rather than to this module.
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning,
                          stacklevel=2)
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        """The underlying transport."""
        return self._transport

    def write(self, data):
        """Delegate to transport.write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Delegate to transport.writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Delegate to transport.write_eof() and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Delegate to transport.can_write_eof() and return its result."""
        return self._transport.can_write_eof()

    def close(self):
        """Delegate to transport.close()."""
        self._transport.close()

    def is_closing(self):
        """Delegate to transport.is_closing() and return its result."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's ``_closed`` future."""
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        """Delegate to transport.get_extra_info() and return its result."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        An exception stored on the paired reader is re-raised here.
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await sleep(0, loop=self._loop)
        await self._protocol._drain_helper()

    async def aclose(self):
        """Call close() and then wait_closed()."""
        self.close()
        await self.wait_closed()

    async def awrite(self, data):
        """Call write(data) and then drain()."""
        self.write(data)
        await self.drain()
413
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414
415class StreamReader:
416
Andrew Svetlova5d1eb82018-09-12 11:43:04 -0700417 _source_traceback = None
418
def __init__(self, limit=_DEFAULT_LIMIT, loop=None,
             *, _asyncio_internal=False):
    """Initialise an empty reader.

    limit -- positive buffer limit; ValueError is raised when it is
        zero or negative.
    loop -- event loop to use; the current event loop when None.
    _asyncio_internal -- private flag; when false a DeprecationWarning
        is emitted because the class is meant to be created by asyncio
        itself.
    """
    if not _asyncio_internal:
        # stacklevel=2 attributes the warning to the code that
        # instantiated the reader rather than to this module.
        warnings.warn(f"{self.__class__} should be instantiated "
                      "by asyncio internals only, "
                      "please avoid its creation from user code",
                      DeprecationWarning,
                      stacklevel=2)

    # The line length limit is a security feature;
    # it also doubles as half the buffer limit.

    if limit <= 0:
        raise ValueError('Limit cannot be <= 0')

    self._limit = limit
    if loop is None:
        self._loop = events.get_event_loop()
    else:
        self._loop = loop
    self._buffer = bytearray()
    self._eof = False    # Whether we're done.
    self._waiter = None  # A future used by _wait_for_data()
    self._exception = None
    self._transport = None
    self._paused = False
    if self._loop.get_debug():
        # Remember where the reader was created, for diagnostics.
        self._source_traceback = format_helpers.extract_stack(
            sys._getframe(1))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700447
def __repr__(self):
    """Return a debug representation listing only the non-default state."""
    parts = ['StreamReader']
    if self._buffer:
        parts.append(f'{len(self._buffer)} bytes')
    if self._eof:
        parts.append('eof')
    if self._limit != _DEFAULT_LIMIT:
        parts.append(f'limit={self._limit}')
    if self._waiter:
        parts.append(f'waiter={self._waiter!r}')
    if self._exception:
        parts.append(f'exception={self._exception!r}')
    if self._transport:
        parts.append(f'transport={self._transport!r}')
    if self._paused:
        parts.append('paused')
    body = ' '.join(parts)
    return '<{}>'.format(body)
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200465
def exception(self):
    """Return the exception recorded on this reader (None when unset)."""
    stored = self._exception
    return stored
468
def set_exception(self, exc):
    """Record *exc* and hand it to the pending waiter, if there is one."""
    self._exception = exc

    pending = self._waiter
    if pending is None:
        return
    # Detach the waiter first so it is never woken twice.
    self._waiter = None
    if pending.cancelled():
        return
    pending.set_exception(exc)
477
def _wakeup_waiter(self):
    """Wake up a read*() function that is waiting for data or EOF."""
    pending = self._waiter
    if pending is None:
        return
    # Detach the waiter first so it is never woken twice.
    self._waiter = None
    if pending.cancelled():
        return
    pending.set_result(None)
485
def set_transport(self, transport):
    """Attach *transport* to the reader; a transport may be set only once."""
    current = self._transport
    assert current is None, 'Transport already set'
    self._transport = transport
489
def _maybe_resume_transport(self):
    """Resume a paused transport once the buffer is within the limit."""
    if not self._paused:
        return
    if len(self._buffer) > self._limit:
        # Still too much buffered data; stay paused.
        return
    self._paused = False
    self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494
def feed_eof(self):
    """Flag end-of-stream, then wake up any waiting reader."""
    self._eof = True
    # A waiter, if any, is woken so it can observe the EOF flag.
    self._wakeup_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498
def at_eof(self):
    """Return True if 'feed_eof' was called and the buffer is empty."""
    eof_seen = self._eof
    return eof_seen and not self._buffer
502
def feed_data(self, data):
    """Append *data* to the buffer and wake up a waiting reader.

    Empty *data* is ignored.  When a transport is attached, reading is
    not already paused and the buffer has grown beyond twice the limit,
    the transport is asked to pause reading.
    """
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
        return

    self._buffer.extend(data)
    self._wakeup_waiter()

    transport = self._transport
    if transport is None or self._paused:
        return
    if len(self._buffer) <= 2 * self._limit:
        return

    try:
        transport.pause_reading()
    except NotImplementedError:
        # The transport can't be paused.
        # We'll just have to buffer all data.
        # Forget the transport so we don't keep trying.
        self._transport = None
    else:
        self._paused = True
524
async def _wait_for_data(self, func_name):
    """Block until feed_data() or feed_eof() is called.

    A paused stream is resumed automatically before waiting.
    *func_name* is only used to build the error message raised when
    another coroutine is already waiting on this reader.
    """
    # A single future links the protocol's feed_data() to one reading
    # coroutine.  Two concurrent readers would make it impossible to
    # tell which of them should receive the next data, so refuse.
    if self._waiter is not None:
        raise RuntimeError(
            f'{func_name}() called while another coroutine is '
            f'already waiting for incoming data')

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting while paused would deadlock, so resume first.  This
    # matters for readexactly(n) when n > self._limit.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    data_ready = self._loop.create_future()
    self._waiter = data_ready
    try:
        await data_ready
    finally:
        self._waiter = None
Victor Stinner183e3472014-01-23 17:40:03 +0100552
async def readline(self):
    """Read one line, i.e. data up to and including a newline (b'\\n').

    On success the returned chunk ends with the newline.  If EOF cuts
    the line short, the partial line is returned without a trailing
    newline; if EOF is hit before any byte was read, an empty bytes
    object is returned.

    If the limit is exceeded, ValueError is raised.  In that case the
    offending line, newline included, is dropped from the internal
    buffer when the newline was found; otherwise the whole internal
    buffer is cleared.  The limit is compared against the line without
    its newline.

    A paused stream is resumed automatically when needed.
    """
    newline = b'\n'
    try:
        return await self.readuntil(newline)
    except exceptions.IncompleteReadError as incomplete:
        return incomplete.partial
    except exceptions.LimitOverrunError as overrun:
        consumed = overrun.consumed
        if self._buffer.startswith(newline, consumed):
            # Drop the over-long line together with its newline.
            del self._buffer[:consumed + len(newline)]
        else:
            self._buffer.clear()
        self._maybe_resume_transport()
        raise ValueError(overrun.args[0])
583
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200584 async def readuntil(self, separator=b'\n'):
Yury Selivanovb4617912016-05-16 16:32:38 -0400585 """Read data from the stream until ``separator`` is found.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500586
Yury Selivanovb4617912016-05-16 16:32:38 -0400587 On success, the data and separator will be removed from the
588 internal buffer (consumed). Returned data will include the
589 separator at the end.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500590
Yury Selivanovb4617912016-05-16 16:32:38 -0400591 Configured stream limit is used to check result. Limit sets the
592 maximal length of data that can be returned, not counting the
593 separator.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500594
Yury Selivanovb4617912016-05-16 16:32:38 -0400595 If an EOF occurs and the complete separator is still not found,
596 an IncompleteReadError exception will be raised, and the internal
597 buffer will be reset. The IncompleteReadError.partial attribute
598 may contain the separator partially.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500599
Yury Selivanovb4617912016-05-16 16:32:38 -0400600 If the data cannot be read because of over limit, a
601 LimitOverrunError exception will be raised, and the data
602 will be left in the internal buffer, so it can be read again.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500603 """
604 seplen = len(separator)
605 if seplen == 0:
606 raise ValueError('Separator should be at least one-byte string')
607
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608 if self._exception is not None:
609 raise self._exception
610
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500611 # Consume whole buffer except last bytes, which length is
612 # one less than seplen. Let's check corner cases with
613 # separator='SEPARATOR':
614 # * we have received almost complete separator (without last
615 # byte). i.e buffer='some textSEPARATO'. In this case we
616 # can safely consume len(separator) - 1 bytes.
617 # * last byte of buffer is first byte of separator, i.e.
618 # buffer='abcdefghijklmnopqrS'. We may safely consume
619 # everything except that last byte, but this require to
620 # analyze bytes of buffer that match partial separator.
621 # This is slow and/or require FSM. For this case our
622 # implementation is not optimal, since require rescanning
623 # of data that is known to not belong to separator. In
624 # real world, separator will not be so long to notice
625 # performance problems. Even when reading MIME-encoded
626 # messages :)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627
Yury Selivanovb4617912016-05-16 16:32:38 -0400628 # `offset` is the number of bytes from the beginning of the buffer
629 # where there is no occurrence of `separator`.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500630 offset = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500632 # Loop until we find `separator` in the buffer, exceed the buffer size,
633 # or an EOF has happened.
634 while True:
635 buflen = len(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700636
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500637 # Check if we now have enough data in the buffer for `separator` to
638 # fit.
639 if buflen - offset >= seplen:
640 isep = self._buffer.find(separator, offset)
641
642 if isep != -1:
Yury Selivanovb4617912016-05-16 16:32:38 -0400643 # `separator` is in the buffer. `isep` will be used later
644 # to retrieve the data.
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500645 break
646
647 # see upper comment for explanation.
648 offset = buflen + 1 - seplen
649 if offset > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700650 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400651 'Separator is not found, and chunk exceed the limit',
652 offset)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500653
654 # Complete message (with full separator) may be present in buffer
655 # even when EOF flag is set. This may happen when the last chunk
656 # adds data which makes separator be found. That's why we check for
657 # EOF *after* inspecting the buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700658 if self._eof:
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500659 chunk = bytes(self._buffer)
660 self._buffer.clear()
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700661 raise exceptions.IncompleteReadError(chunk, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500663 # _wait_for_data() will resume reading if stream was paused.
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200664 await self._wait_for_data('readuntil')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500666 if isep > self._limit:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700667 raise exceptions.LimitOverrunError(
Yury Selivanovb4617912016-05-16 16:32:38 -0400668 'Separator is found, but chunk is longer than limit', isep)
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500669
670 chunk = self._buffer[:isep + seplen]
671 del self._buffer[:isep + seplen]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700672 self._maybe_resume_transport()
Yury Selivanovd9d0e862016-01-11 12:28:19 -0500673 return bytes(chunk)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674
async def read(self, n=-1):
    """Read at most `n` bytes from the stream.

    With `n` omitted or negative, keep reading until EOF and return
    everything that was received.  If EOF was already received and the
    internal buffer is empty, an empty bytes object is returned.

    With `n` equal to zero, return an empty bytes object immediately.

    With a positive `n`, return between one and `n` bytes as soon as any
    data is buffered; an empty bytes object is returned only when EOF was
    received before any byte could be read.

    The size of the returned value is not capped by the limit configured
    at stream creation.

    If the stream was paused, this function will automatically resume it
    if needed.
    """
    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    if n < 0:
        # Waiting for the whole payload to pile up in self._buffer would
        # deadlock once the peer sends more than the limit, so drain the
        # stream in limit-sized reads until an empty read signals EOF.
        pieces = []
        piece = await self.read(self._limit)
        while piece:
            pieces.append(piece)
            piece = await self.read(self._limit)
        return b''.join(pieces)

    # Nothing buffered and no EOF yet: block until either happens.
    if not (self._buffer or self._eof):
        await self._wait_for_data('read')

    # Slicing copes with a buffer that holds fewer than n bytes.
    head = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return head
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700724
async def readexactly(self, n):
    """Read exactly `n` bytes.

    Raise ValueError if `n` is negative.

    Raise an IncompleteReadError if EOF is reached before `n` bytes can
    be read; the bytes received so far are passed to the exception
    together with the requested size `n`.

    If `n` is zero, return an empty bytes object.

    The size of the returned value is not capped by the limit configured
    at stream creation.

    If the stream was paused, this function will automatically resume it
    if needed.
    """
    if n < 0:
        raise ValueError('readexactly size can not be less than zero')

    if self._exception is not None:
        raise self._exception

    if n == 0:
        return b''

    # Keep waiting until the buffer holds at least n bytes; EOF before
    # that point means the request can never be satisfied.
    while True:
        if len(self._buffer) >= n:
            break
        if self._eof:
            partial = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(partial, n)
        await self._wait_for_data('readexactly')

    if len(self._buffer) > n:
        # Take only the requested prefix and keep the rest buffered.
        payload = bytes(self._buffer[:n])
        del self._buffer[:n]
    else:
        # The buffer holds exactly n bytes: hand over all of it.
        payload = bytes(self._buffer)
        self._buffer.clear()
    self._maybe_resume_transport()
    return payload
Yury Selivanovd08c3632015-05-13 15:15:56 -0400765
def __aiter__(self):
    """Return the reader itself as its own asynchronous iterator."""
    return self
Yury Selivanovd08c3632015-05-13 15:15:56 -0400768
async def __anext__(self):
    """Return the next line, ending the iteration on an empty read."""
    line = await self.readline()
    if line != b'':
        return line
    # readline() produced an empty bytes object: nothing more to yield.
    raise StopAsyncIteration