blob: d18db77b4f445206588077c7b3be28d32d9acb90 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08006 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
9
Guido van Rossume3e786c2014-02-18 10:24:30 -080010if hasattr(socket, 'AF_UNIX'):
11 __all__.extend(['open_unix_connection', 'start_unix_server'])
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
15from . import futures
16from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020017from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
21_DEFAULT_LIMIT = 2**16
22
Guido van Rossuma849be92014-01-30 16:05:28 -080023
Victor Stinner8dffc452014-01-25 15:32:06 +010024class IncompleteReadError(EOFError):
25 """
26 Incomplete read error. Attributes:
27
28 - partial: read bytes string before the end of stream was reached
29 - expected: total number of expected bytes
30 """
31 def __init__(self, partial, expected):
32 EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
33 % (len(partial), expected))
34 self.partial = partial
35 self.expected = expected
36
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
Victor Stinnerf951d282014-06-29 00:46:45 +020038@coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039def open_connection(host=None, port=None, *,
40 loop=None, limit=_DEFAULT_LIMIT, **kwds):
41 """A wrapper for create_connection() returning a (reader, writer) pair.
42
43 The reader returned is a StreamReader instance; the writer is a
Victor Stinner183e3472014-01-23 17:40:03 +010044 StreamWriter instance.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045
46 The arguments are all the usual arguments to create_connection()
47 except protocol_factory; most common are positional host and port,
48 with various optional keyword arguments following.
49
50 Additional optional keyword arguments are loop (to set the event loop
51 instance to use) and limit (to set the buffer limit passed to the
52 StreamReader).
53
54 (If you want to customize the StreamReader and/or
55 StreamReaderProtocol classes, just copy the code -- there's
56 really nothing special here except some convenience.)
57 """
58 if loop is None:
59 loop = events.get_event_loop()
60 reader = StreamReader(limit=limit, loop=loop)
Guido van Rossumefef9d32014-01-10 13:26:38 -080061 protocol = StreamReaderProtocol(reader, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062 transport, _ = yield from loop.create_connection(
63 lambda: protocol, host, port, **kwds)
Guido van Rossum355491d2013-10-18 15:17:11 -070064 writer = StreamWriter(transport, protocol, reader, loop)
65 return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066
67
Victor Stinnerf951d282014-06-29 00:46:45 +020068@coroutine
Guido van Rossum1540b162013-11-19 11:43:38 -080069def start_server(client_connected_cb, host=None, port=None, *,
70 loop=None, limit=_DEFAULT_LIMIT, **kwds):
71 """Start a socket server, call back for each client connected.
72
73 The first parameter, `client_connected_cb`, takes two parameters:
74 client_reader, client_writer. client_reader is a StreamReader
75 object, while client_writer is a StreamWriter object. This
76 parameter can either be a plain callback function or a coroutine;
77 if it is a coroutine, it will be automatically converted into a
78 Task.
79
80 The rest of the arguments are all the usual arguments to
81 loop.create_server() except protocol_factory; most common are
82 positional host and port, with various optional keyword arguments
83 following. The return value is the same as loop.create_server().
84
85 Additional optional keyword arguments are loop (to set the event loop
86 instance to use) and limit (to set the buffer limit passed to the
87 StreamReader).
88
89 The return value is the same as loop.create_server(), i.e. a
90 Server object which can be used to stop the service.
91 """
92 if loop is None:
93 loop = events.get_event_loop()
94
95 def factory():
96 reader = StreamReader(limit=limit, loop=loop)
97 protocol = StreamReaderProtocol(reader, client_connected_cb,
98 loop=loop)
99 return protocol
100
101 return (yield from loop.create_server(factory, host, port, **kwds))
102
103
Yury Selivanovb057c522014-02-18 12:15:06 -0500104if hasattr(socket, 'AF_UNIX'):
105 # UNIX Domain Sockets are supported on this platform
106
Victor Stinnerf951d282014-06-29 00:46:45 +0200107 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500108 def open_unix_connection(path=None, *,
109 loop=None, limit=_DEFAULT_LIMIT, **kwds):
110 """Similar to `open_connection` but works with UNIX Domain Sockets."""
111 if loop is None:
112 loop = events.get_event_loop()
113 reader = StreamReader(limit=limit, loop=loop)
114 protocol = StreamReaderProtocol(reader, loop=loop)
115 transport, _ = yield from loop.create_unix_connection(
116 lambda: protocol, path, **kwds)
117 writer = StreamWriter(transport, protocol, reader, loop)
118 return reader, writer
119
120
Victor Stinnerf951d282014-06-29 00:46:45 +0200121 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500122 def start_unix_server(client_connected_cb, path=None, *,
123 loop=None, limit=_DEFAULT_LIMIT, **kwds):
124 """Similar to `start_server` but works with UNIX Domain Sockets."""
125 if loop is None:
126 loop = events.get_event_loop()
127
128 def factory():
129 reader = StreamReader(limit=limit, loop=loop)
130 protocol = StreamReaderProtocol(reader, client_connected_cb,
131 loop=loop)
132 return protocol
133
134 return (yield from loop.create_unix_server(factory, path, **kwds))
135
136
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800137class FlowControlMixin(protocols.Protocol):
138 """Reusable flow control logic for StreamWriter.drain().
139
140 This implements the protocol methods pause_writing(),
141 resume_reading() and connection_lost(). If the subclass overrides
142 these it must call the super methods.
143
144 StreamWriter.drain() must check for error conditions and then call
145 _make_drain_waiter(), which will return either () or a Future
146 depending on the paused state.
147 """
148
149 def __init__(self, loop=None):
150 self._loop = loop # May be None; we may never need it.
151 self._paused = False
152 self._drain_waiter = None
153
154 def pause_writing(self):
155 assert not self._paused
156 self._paused = True
Victor Stinneracdb7822014-07-14 18:33:40 +0200157 if self._loop.get_debug():
158 logger.debug("%r pauses writing", self)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800159
160 def resume_writing(self):
161 assert self._paused
162 self._paused = False
Victor Stinneracdb7822014-07-14 18:33:40 +0200163 if self._loop.get_debug():
164 logger.debug("%r resumes writing", self)
165
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800166 waiter = self._drain_waiter
167 if waiter is not None:
168 self._drain_waiter = None
169 if not waiter.done():
170 waiter.set_result(None)
171
172 def connection_lost(self, exc):
173 # Wake up the writer if currently paused.
174 if not self._paused:
175 return
176 waiter = self._drain_waiter
177 if waiter is None:
178 return
179 self._drain_waiter = None
180 if waiter.done():
181 return
182 if exc is None:
183 waiter.set_result(None)
184 else:
185 waiter.set_exception(exc)
186
187 def _make_drain_waiter(self):
188 if not self._paused:
189 return ()
190 waiter = self._drain_waiter
191 assert waiter is None or waiter.cancelled()
192 waiter = futures.Future(loop=self._loop)
193 self._drain_waiter = waiter
194 return waiter
195
196
197class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
198 """Helper class to adapt between Protocol and StreamReader.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199
200 (This is a helper class instead of making StreamReader itself a
201 Protocol subclass, because the StreamReader has other potential
202 uses, and to prevent the user of the StreamReader to accidentally
203 call inappropriate methods of the protocol.)
204 """
205
Guido van Rossum1540b162013-11-19 11:43:38 -0800206 def __init__(self, stream_reader, client_connected_cb=None, loop=None):
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800207 super().__init__(loop=loop)
Guido van Rossum355491d2013-10-18 15:17:11 -0700208 self._stream_reader = stream_reader
Guido van Rossum1540b162013-11-19 11:43:38 -0800209 self._stream_writer = None
Guido van Rossum1540b162013-11-19 11:43:38 -0800210 self._client_connected_cb = client_connected_cb
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211
212 def connection_made(self, transport):
Guido van Rossum355491d2013-10-18 15:17:11 -0700213 self._stream_reader.set_transport(transport)
Guido van Rossum1540b162013-11-19 11:43:38 -0800214 if self._client_connected_cb is not None:
215 self._stream_writer = StreamWriter(transport, self,
216 self._stream_reader,
217 self._loop)
218 res = self._client_connected_cb(self._stream_reader,
219 self._stream_writer)
Victor Stinnerf951d282014-06-29 00:46:45 +0200220 if coroutines.iscoroutine(res):
Victor Stinner896a25a2014-07-08 11:29:25 +0200221 self._loop.create_task(res)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222
223 def connection_lost(self, exc):
224 if exc is None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700225 self._stream_reader.feed_eof()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 else:
Guido van Rossum355491d2013-10-18 15:17:11 -0700227 self._stream_reader.set_exception(exc)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800228 super().connection_lost(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229
230 def data_received(self, data):
Guido van Rossum355491d2013-10-18 15:17:11 -0700231 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232
233 def eof_received(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700234 self._stream_reader.feed_eof()
235
Guido van Rossum355491d2013-10-18 15:17:11 -0700236
237class StreamWriter:
238 """Wraps a Transport.
239
240 This exposes write(), writelines(), [can_]write_eof(),
241 get_extra_info() and close(). It adds drain() which returns an
242 optional Future on which you can wait for flow control. It also
Guido van Rossumefef9d32014-01-10 13:26:38 -0800243 adds a transport property which references the Transport
Guido van Rossum355491d2013-10-18 15:17:11 -0700244 directly.
245 """
246
247 def __init__(self, transport, protocol, reader, loop):
248 self._transport = transport
249 self._protocol = protocol
250 self._reader = reader
251 self._loop = loop
252
Victor Stinneracdb7822014-07-14 18:33:40 +0200253 def __repr__(self):
254 info = [self.__class__.__name__, 'transport=%r' % self._transport]
255 if self._reader is not None:
256 info.append('reader=%r' % self._reader)
257 return '<%s>' % ' '.join(info)
258
Guido van Rossum355491d2013-10-18 15:17:11 -0700259 @property
260 def transport(self):
261 return self._transport
262
263 def write(self, data):
264 self._transport.write(data)
265
266 def writelines(self, data):
267 self._transport.writelines(data)
268
269 def write_eof(self):
270 return self._transport.write_eof()
271
272 def can_write_eof(self):
273 return self._transport.can_write_eof()
274
275 def close(self):
276 return self._transport.close()
277
278 def get_extra_info(self, name, default=None):
279 return self._transport.get_extra_info(name, default)
280
281 def drain(self):
282 """This method has an unusual return value.
283
284 The intended use is to write
285
286 w.write(data)
287 yield from w.drain()
288
289 When there's nothing to wait for, drain() returns (), and the
290 yield-from continues immediately. When the transport buffer
291 is full (the protocol is paused), drain() creates and returns
292 a Future and the yield-from will block until that Future is
293 completed, which will happen when the buffer is (partially)
294 drained and the protocol is resumed.
295 """
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800296 if self._reader is not None and self._reader._exception is not None:
Guido van Rossum6188bd42014-01-07 17:03:26 -0800297 raise self._reader._exception
Guido van Rossum355491d2013-10-18 15:17:11 -0700298 if self._transport._conn_lost: # Uses private variable.
299 raise ConnectionResetError('Connection lost')
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800300 return self._protocol._make_drain_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302
303class StreamReader:
304
305 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
306 # The line length limit is a security feature;
307 # it also doubles as half the buffer limit.
Guido van Rossum355491d2013-10-18 15:17:11 -0700308 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 if loop is None:
310 loop = events.get_event_loop()
Guido van Rossum355491d2013-10-18 15:17:11 -0700311 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500312 self._buffer = bytearray()
Guido van Rossum355491d2013-10-18 15:17:11 -0700313 self._eof = False # Whether we're done.
314 self._waiter = None # A future.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 self._exception = None
316 self._transport = None
317 self._paused = False
318
319 def exception(self):
320 return self._exception
321
322 def set_exception(self, exc):
323 self._exception = exc
324
Guido van Rossum355491d2013-10-18 15:17:11 -0700325 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700327 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 if not waiter.cancelled():
329 waiter.set_exception(exc)
330
331 def set_transport(self, transport):
332 assert self._transport is None, 'Transport already set'
333 self._transport = transport
334
335 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500336 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700338 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339
340 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700341 self._eof = True
342 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700344 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 if not waiter.cancelled():
346 waiter.set_result(True)
347
Yury Selivanovf0020f52014-02-06 00:14:30 -0500348 def at_eof(self):
349 """Return True if the buffer is empty and 'feed_eof' was called."""
350 return self._eof and not self._buffer
351
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500353 assert not self._eof, 'feed_data after feed_eof'
354
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 if not data:
356 return
357
Yury Selivanove694c972014-02-05 18:11:13 -0500358 self._buffer.extend(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359
Guido van Rossum355491d2013-10-18 15:17:11 -0700360 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700362 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 if not waiter.cancelled():
364 waiter.set_result(False)
365
366 if (self._transport is not None and
367 not self._paused and
Yury Selivanove694c972014-02-05 18:11:13 -0500368 len(self._buffer) > 2*self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700370 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 except NotImplementedError:
372 # The transport can't be paused.
373 # We'll just have to buffer all data.
374 # Forget the transport so we don't keep trying.
375 self._transport = None
376 else:
377 self._paused = True
378
Victor Stinner183e3472014-01-23 17:40:03 +0100379 def _create_waiter(self, func_name):
380 # StreamReader uses a future to link the protocol feed_data() method
381 # to a read coroutine. Running two read coroutines at the same time
382 # would have an unexpected behaviour. It would not possible to know
383 # which coroutine would get the next data.
384 if self._waiter is not None:
385 raise RuntimeError('%s() called while another coroutine is '
386 'already waiting for incoming data' % func_name)
387 return futures.Future(loop=self._loop)
388
Victor Stinnerf951d282014-06-29 00:46:45 +0200389 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390 def readline(self):
391 if self._exception is not None:
392 raise self._exception
393
Yury Selivanove694c972014-02-05 18:11:13 -0500394 line = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 not_enough = True
396
397 while not_enough:
Guido van Rossum355491d2013-10-18 15:17:11 -0700398 while self._buffer and not_enough:
Yury Selivanove694c972014-02-05 18:11:13 -0500399 ichar = self._buffer.find(b'\n')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 if ichar < 0:
Yury Selivanove694c972014-02-05 18:11:13 -0500401 line.extend(self._buffer)
402 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 else:
404 ichar += 1
Yury Selivanove694c972014-02-05 18:11:13 -0500405 line.extend(self._buffer[:ichar])
406 del self._buffer[:ichar]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 not_enough = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700408
Yury Selivanove694c972014-02-05 18:11:13 -0500409 if len(line) > self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 self._maybe_resume_transport()
411 raise ValueError('Line is too long')
412
Guido van Rossum355491d2013-10-18 15:17:11 -0700413 if self._eof:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414 break
415
416 if not_enough:
Victor Stinner183e3472014-01-23 17:40:03 +0100417 self._waiter = self._create_waiter('readline')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700419 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700421 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700423 self._maybe_resume_transport()
Yury Selivanove694c972014-02-05 18:11:13 -0500424 return bytes(line)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425
Victor Stinnerf951d282014-06-29 00:46:45 +0200426 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 def read(self, n=-1):
428 if self._exception is not None:
429 raise self._exception
430
431 if not n:
432 return b''
433
434 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -0700435 # This used to just loop creating a new waiter hoping to
436 # collect everything in self._buffer, but that would
437 # deadlock if the subprocess sends more than self.limit
438 # bytes. So just call self.read(self._limit) until EOF.
439 blocks = []
440 while True:
441 block = yield from self.read(self._limit)
442 if not block:
443 break
444 blocks.append(block)
445 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 else:
Yury Selivanove694c972014-02-05 18:11:13 -0500447 if not self._buffer and not self._eof:
Victor Stinner183e3472014-01-23 17:40:03 +0100448 self._waiter = self._create_waiter('read')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700450 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700452 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
Yury Selivanove694c972014-02-05 18:11:13 -0500454 if n < 0 or len(self._buffer) <= n:
455 data = bytes(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700456 self._buffer.clear()
Yury Selivanove694c972014-02-05 18:11:13 -0500457 else:
458 # n > 0 and len(self._buffer) > n
459 data = bytes(self._buffer[:n])
460 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461
Yury Selivanove694c972014-02-05 18:11:13 -0500462 self._maybe_resume_transport()
463 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700464
Victor Stinnerf951d282014-06-29 00:46:45 +0200465 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 def readexactly(self, n):
467 if self._exception is not None:
468 raise self._exception
469
Guido van Rossum38455212014-01-06 16:09:18 -0800470 # There used to be "optimized" code here. It created its own
471 # Future and waited until self._buffer had at least the n
472 # bytes, then called read(n). Unfortunately, this could pause
473 # the transport if the argument was larger than the pause
474 # limit (which is twice self._limit). So now we just read()
475 # into a local buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476
Guido van Rossum38455212014-01-06 16:09:18 -0800477 blocks = []
478 while n > 0:
479 block = yield from self.read(n)
480 if not block:
Victor Stinner8dffc452014-01-25 15:32:06 +0100481 partial = b''.join(blocks)
482 raise IncompleteReadError(partial, len(partial) + n)
Guido van Rossum38455212014-01-06 16:09:18 -0800483 blocks.append(block)
484 n -= len(block)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485
Guido van Rossum38455212014-01-06 16:09:18 -0800486 return b''.join(blocks)