blob: 9bde218bfa4983c73f5ec1f25bc00cb4fca637ff [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Yury Selivanovb057c522014-02-18 12:15:06 -05004 'open_connection', 'start_server',
Yury Selivanovb057c522014-02-18 12:15:06 -05005 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08006 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Yury Selivanovb057c522014-02-18 12:15:06 -05008import socket
9
Guido van Rossume3e786c2014-02-18 10:24:30 -080010if hasattr(socket, 'AF_UNIX'):
11 __all__.extend(['open_unix_connection', 'start_unix_server'])
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
15from . import futures
16from . import protocols
17from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
21_DEFAULT_LIMIT = 2**16
22
Guido van Rossuma849be92014-01-30 16:05:28 -080023
Victor Stinner8dffc452014-01-25 15:32:06 +010024class IncompleteReadError(EOFError):
25 """
26 Incomplete read error. Attributes:
27
28 - partial: read bytes string before the end of stream was reached
29 - expected: total number of expected bytes
30 """
31 def __init__(self, partial, expected):
32 EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
33 % (len(partial), expected))
34 self.partial = partial
35 self.expected = expected
36
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
Victor Stinnerf951d282014-06-29 00:46:45 +020038@coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039def open_connection(host=None, port=None, *,
40 loop=None, limit=_DEFAULT_LIMIT, **kwds):
41 """A wrapper for create_connection() returning a (reader, writer) pair.
42
43 The reader returned is a StreamReader instance; the writer is a
Victor Stinner183e3472014-01-23 17:40:03 +010044 StreamWriter instance.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045
46 The arguments are all the usual arguments to create_connection()
47 except protocol_factory; most common are positional host and port,
48 with various optional keyword arguments following.
49
50 Additional optional keyword arguments are loop (to set the event loop
51 instance to use) and limit (to set the buffer limit passed to the
52 StreamReader).
53
54 (If you want to customize the StreamReader and/or
55 StreamReaderProtocol classes, just copy the code -- there's
56 really nothing special here except some convenience.)
57 """
58 if loop is None:
59 loop = events.get_event_loop()
60 reader = StreamReader(limit=limit, loop=loop)
Guido van Rossumefef9d32014-01-10 13:26:38 -080061 protocol = StreamReaderProtocol(reader, loop=loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062 transport, _ = yield from loop.create_connection(
63 lambda: protocol, host, port, **kwds)
Guido van Rossum355491d2013-10-18 15:17:11 -070064 writer = StreamWriter(transport, protocol, reader, loop)
65 return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066
67
Victor Stinnerf951d282014-06-29 00:46:45 +020068@coroutine
Guido van Rossum1540b162013-11-19 11:43:38 -080069def start_server(client_connected_cb, host=None, port=None, *,
70 loop=None, limit=_DEFAULT_LIMIT, **kwds):
71 """Start a socket server, call back for each client connected.
72
73 The first parameter, `client_connected_cb`, takes two parameters:
74 client_reader, client_writer. client_reader is a StreamReader
75 object, while client_writer is a StreamWriter object. This
76 parameter can either be a plain callback function or a coroutine;
77 if it is a coroutine, it will be automatically converted into a
78 Task.
79
80 The rest of the arguments are all the usual arguments to
81 loop.create_server() except protocol_factory; most common are
82 positional host and port, with various optional keyword arguments
83 following. The return value is the same as loop.create_server().
84
85 Additional optional keyword arguments are loop (to set the event loop
86 instance to use) and limit (to set the buffer limit passed to the
87 StreamReader).
88
89 The return value is the same as loop.create_server(), i.e. a
90 Server object which can be used to stop the service.
91 """
92 if loop is None:
93 loop = events.get_event_loop()
94
95 def factory():
96 reader = StreamReader(limit=limit, loop=loop)
97 protocol = StreamReaderProtocol(reader, client_connected_cb,
98 loop=loop)
99 return protocol
100
101 return (yield from loop.create_server(factory, host, port, **kwds))
102
103
Yury Selivanovb057c522014-02-18 12:15:06 -0500104if hasattr(socket, 'AF_UNIX'):
105 # UNIX Domain Sockets are supported on this platform
106
Victor Stinnerf951d282014-06-29 00:46:45 +0200107 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500108 def open_unix_connection(path=None, *,
109 loop=None, limit=_DEFAULT_LIMIT, **kwds):
110 """Similar to `open_connection` but works with UNIX Domain Sockets."""
111 if loop is None:
112 loop = events.get_event_loop()
113 reader = StreamReader(limit=limit, loop=loop)
114 protocol = StreamReaderProtocol(reader, loop=loop)
115 transport, _ = yield from loop.create_unix_connection(
116 lambda: protocol, path, **kwds)
117 writer = StreamWriter(transport, protocol, reader, loop)
118 return reader, writer
119
120
Victor Stinnerf951d282014-06-29 00:46:45 +0200121 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500122 def start_unix_server(client_connected_cb, path=None, *,
123 loop=None, limit=_DEFAULT_LIMIT, **kwds):
124 """Similar to `start_server` but works with UNIX Domain Sockets."""
125 if loop is None:
126 loop = events.get_event_loop()
127
128 def factory():
129 reader = StreamReader(limit=limit, loop=loop)
130 protocol = StreamReaderProtocol(reader, client_connected_cb,
131 loop=loop)
132 return protocol
133
134 return (yield from loop.create_unix_server(factory, path, **kwds))
135
136
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800137class FlowControlMixin(protocols.Protocol):
138 """Reusable flow control logic for StreamWriter.drain().
139
140 This implements the protocol methods pause_writing(),
141 resume_reading() and connection_lost(). If the subclass overrides
142 these it must call the super methods.
143
144 StreamWriter.drain() must check for error conditions and then call
145 _make_drain_waiter(), which will return either () or a Future
146 depending on the paused state.
147 """
148
149 def __init__(self, loop=None):
150 self._loop = loop # May be None; we may never need it.
151 self._paused = False
152 self._drain_waiter = None
153
154 def pause_writing(self):
155 assert not self._paused
156 self._paused = True
157
158 def resume_writing(self):
159 assert self._paused
160 self._paused = False
161 waiter = self._drain_waiter
162 if waiter is not None:
163 self._drain_waiter = None
164 if not waiter.done():
165 waiter.set_result(None)
166
167 def connection_lost(self, exc):
168 # Wake up the writer if currently paused.
169 if not self._paused:
170 return
171 waiter = self._drain_waiter
172 if waiter is None:
173 return
174 self._drain_waiter = None
175 if waiter.done():
176 return
177 if exc is None:
178 waiter.set_result(None)
179 else:
180 waiter.set_exception(exc)
181
182 def _make_drain_waiter(self):
183 if not self._paused:
184 return ()
185 waiter = self._drain_waiter
186 assert waiter is None or waiter.cancelled()
187 waiter = futures.Future(loop=self._loop)
188 self._drain_waiter = waiter
189 return waiter
190
191
192class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
193 """Helper class to adapt between Protocol and StreamReader.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194
195 (This is a helper class instead of making StreamReader itself a
196 Protocol subclass, because the StreamReader has other potential
197 uses, and to prevent the user of the StreamReader to accidentally
198 call inappropriate methods of the protocol.)
199 """
200
Guido van Rossum1540b162013-11-19 11:43:38 -0800201 def __init__(self, stream_reader, client_connected_cb=None, loop=None):
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800202 super().__init__(loop=loop)
Guido van Rossum355491d2013-10-18 15:17:11 -0700203 self._stream_reader = stream_reader
Guido van Rossum1540b162013-11-19 11:43:38 -0800204 self._stream_writer = None
Guido van Rossum1540b162013-11-19 11:43:38 -0800205 self._client_connected_cb = client_connected_cb
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206
207 def connection_made(self, transport):
Guido van Rossum355491d2013-10-18 15:17:11 -0700208 self._stream_reader.set_transport(transport)
Guido van Rossum1540b162013-11-19 11:43:38 -0800209 if self._client_connected_cb is not None:
210 self._stream_writer = StreamWriter(transport, self,
211 self._stream_reader,
212 self._loop)
213 res = self._client_connected_cb(self._stream_reader,
214 self._stream_writer)
Victor Stinnerf951d282014-06-29 00:46:45 +0200215 if coroutines.iscoroutine(res):
Victor Stinner896a25a2014-07-08 11:29:25 +0200216 self._loop.create_task(res)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217
218 def connection_lost(self, exc):
219 if exc is None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700220 self._stream_reader.feed_eof()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221 else:
Guido van Rossum355491d2013-10-18 15:17:11 -0700222 self._stream_reader.set_exception(exc)
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800223 super().connection_lost(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224
225 def data_received(self, data):
Guido van Rossum355491d2013-10-18 15:17:11 -0700226 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227
228 def eof_received(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700229 self._stream_reader.feed_eof()
230
Guido van Rossum355491d2013-10-18 15:17:11 -0700231
232class StreamWriter:
233 """Wraps a Transport.
234
235 This exposes write(), writelines(), [can_]write_eof(),
236 get_extra_info() and close(). It adds drain() which returns an
237 optional Future on which you can wait for flow control. It also
Guido van Rossumefef9d32014-01-10 13:26:38 -0800238 adds a transport property which references the Transport
Guido van Rossum355491d2013-10-18 15:17:11 -0700239 directly.
240 """
241
242 def __init__(self, transport, protocol, reader, loop):
243 self._transport = transport
244 self._protocol = protocol
245 self._reader = reader
246 self._loop = loop
247
248 @property
249 def transport(self):
250 return self._transport
251
252 def write(self, data):
253 self._transport.write(data)
254
255 def writelines(self, data):
256 self._transport.writelines(data)
257
258 def write_eof(self):
259 return self._transport.write_eof()
260
261 def can_write_eof(self):
262 return self._transport.can_write_eof()
263
264 def close(self):
265 return self._transport.close()
266
267 def get_extra_info(self, name, default=None):
268 return self._transport.get_extra_info(name, default)
269
270 def drain(self):
271 """This method has an unusual return value.
272
273 The intended use is to write
274
275 w.write(data)
276 yield from w.drain()
277
278 When there's nothing to wait for, drain() returns (), and the
279 yield-from continues immediately. When the transport buffer
280 is full (the protocol is paused), drain() creates and returns
281 a Future and the yield-from will block until that Future is
282 completed, which will happen when the buffer is (partially)
283 drained and the protocol is resumed.
284 """
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800285 if self._reader is not None and self._reader._exception is not None:
Guido van Rossum6188bd42014-01-07 17:03:26 -0800286 raise self._reader._exception
Guido van Rossum355491d2013-10-18 15:17:11 -0700287 if self._transport._conn_lost: # Uses private variable.
288 raise ConnectionResetError('Connection lost')
Guido van Rossum4d62d0b2014-01-29 14:24:45 -0800289 return self._protocol._make_drain_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290
291
292class StreamReader:
293
294 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
295 # The line length limit is a security feature;
296 # it also doubles as half the buffer limit.
Guido van Rossum355491d2013-10-18 15:17:11 -0700297 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 if loop is None:
299 loop = events.get_event_loop()
Guido van Rossum355491d2013-10-18 15:17:11 -0700300 self._loop = loop
Yury Selivanove694c972014-02-05 18:11:13 -0500301 self._buffer = bytearray()
Guido van Rossum355491d2013-10-18 15:17:11 -0700302 self._eof = False # Whether we're done.
303 self._waiter = None # A future.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 self._exception = None
305 self._transport = None
306 self._paused = False
307
308 def exception(self):
309 return self._exception
310
311 def set_exception(self, exc):
312 self._exception = exc
313
Guido van Rossum355491d2013-10-18 15:17:11 -0700314 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700316 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 if not waiter.cancelled():
318 waiter.set_exception(exc)
319
320 def set_transport(self, transport):
321 assert self._transport is None, 'Transport already set'
322 self._transport = transport
323
324 def _maybe_resume_transport(self):
Yury Selivanove694c972014-02-05 18:11:13 -0500325 if self._paused and len(self._buffer) <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700326 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700327 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328
329 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700330 self._eof = True
331 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700333 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334 if not waiter.cancelled():
335 waiter.set_result(True)
336
Yury Selivanovf0020f52014-02-06 00:14:30 -0500337 def at_eof(self):
338 """Return True if the buffer is empty and 'feed_eof' was called."""
339 return self._eof and not self._buffer
340
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 def feed_data(self, data):
Yury Selivanove694c972014-02-05 18:11:13 -0500342 assert not self._eof, 'feed_data after feed_eof'
343
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 if not data:
345 return
346
Yury Selivanove694c972014-02-05 18:11:13 -0500347 self._buffer.extend(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
Guido van Rossum355491d2013-10-18 15:17:11 -0700349 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700351 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 if not waiter.cancelled():
353 waiter.set_result(False)
354
355 if (self._transport is not None and
356 not self._paused and
Yury Selivanove694c972014-02-05 18:11:13 -0500357 len(self._buffer) > 2*self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700359 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 except NotImplementedError:
361 # The transport can't be paused.
362 # We'll just have to buffer all data.
363 # Forget the transport so we don't keep trying.
364 self._transport = None
365 else:
366 self._paused = True
367
Victor Stinner183e3472014-01-23 17:40:03 +0100368 def _create_waiter(self, func_name):
369 # StreamReader uses a future to link the protocol feed_data() method
370 # to a read coroutine. Running two read coroutines at the same time
371 # would have an unexpected behaviour. It would not possible to know
372 # which coroutine would get the next data.
373 if self._waiter is not None:
374 raise RuntimeError('%s() called while another coroutine is '
375 'already waiting for incoming data' % func_name)
376 return futures.Future(loop=self._loop)
377
Victor Stinnerf951d282014-06-29 00:46:45 +0200378 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 def readline(self):
380 if self._exception is not None:
381 raise self._exception
382
Yury Selivanove694c972014-02-05 18:11:13 -0500383 line = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 not_enough = True
385
386 while not_enough:
Guido van Rossum355491d2013-10-18 15:17:11 -0700387 while self._buffer and not_enough:
Yury Selivanove694c972014-02-05 18:11:13 -0500388 ichar = self._buffer.find(b'\n')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389 if ichar < 0:
Yury Selivanove694c972014-02-05 18:11:13 -0500390 line.extend(self._buffer)
391 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 else:
393 ichar += 1
Yury Selivanove694c972014-02-05 18:11:13 -0500394 line.extend(self._buffer[:ichar])
395 del self._buffer[:ichar]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396 not_enough = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397
Yury Selivanove694c972014-02-05 18:11:13 -0500398 if len(line) > self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 self._maybe_resume_transport()
400 raise ValueError('Line is too long')
401
Guido van Rossum355491d2013-10-18 15:17:11 -0700402 if self._eof:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 break
404
405 if not_enough:
Victor Stinner183e3472014-01-23 17:40:03 +0100406 self._waiter = self._create_waiter('readline')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700408 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700410 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 self._maybe_resume_transport()
Yury Selivanove694c972014-02-05 18:11:13 -0500413 return bytes(line)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414
Victor Stinnerf951d282014-06-29 00:46:45 +0200415 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416 def read(self, n=-1):
417 if self._exception is not None:
418 raise self._exception
419
420 if not n:
421 return b''
422
423 if n < 0:
Guido van Rossumbf88ffb2014-05-12 10:04:37 -0700424 # This used to just loop creating a new waiter hoping to
425 # collect everything in self._buffer, but that would
426 # deadlock if the subprocess sends more than self.limit
427 # bytes. So just call self.read(self._limit) until EOF.
428 blocks = []
429 while True:
430 block = yield from self.read(self._limit)
431 if not block:
432 break
433 blocks.append(block)
434 return b''.join(blocks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 else:
Yury Selivanove694c972014-02-05 18:11:13 -0500436 if not self._buffer and not self._eof:
Victor Stinner183e3472014-01-23 17:40:03 +0100437 self._waiter = self._create_waiter('read')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700439 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700441 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700442
Yury Selivanove694c972014-02-05 18:11:13 -0500443 if n < 0 or len(self._buffer) <= n:
444 data = bytes(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700445 self._buffer.clear()
Yury Selivanove694c972014-02-05 18:11:13 -0500446 else:
447 # n > 0 and len(self._buffer) > n
448 data = bytes(self._buffer[:n])
449 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450
Yury Selivanove694c972014-02-05 18:11:13 -0500451 self._maybe_resume_transport()
452 return data
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453
Victor Stinnerf951d282014-06-29 00:46:45 +0200454 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455 def readexactly(self, n):
456 if self._exception is not None:
457 raise self._exception
458
Guido van Rossum38455212014-01-06 16:09:18 -0800459 # There used to be "optimized" code here. It created its own
460 # Future and waited until self._buffer had at least the n
461 # bytes, then called read(n). Unfortunately, this could pause
462 # the transport if the argument was larger than the pause
463 # limit (which is twice self._limit). So now we just read()
464 # into a local buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465
Guido van Rossum38455212014-01-06 16:09:18 -0800466 blocks = []
467 while n > 0:
468 block = yield from self.read(n)
469 if not block:
Victor Stinner8dffc452014-01-25 15:32:06 +0100470 partial = b''.join(blocks)
471 raise IncompleteReadError(partial, len(partial) + n)
Guido van Rossum38455212014-01-06 16:09:18 -0800472 blocks.append(block)
473 n -= len(block)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474
Guido van Rossum38455212014-01-06 16:09:18 -0800475 return b''.join(blocks)