blob: 9b654cdbb451b05d0df60d7ab1d8a47b85d544ff [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
# Public names exported by a star-import of this module.
__all__ = [
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
    'IncompleteReadError',
]

import socket

# The UNIX domain socket helpers are only defined (further down in this
# module) when the socket module provides AF_UNIX, so they are only
# exported under the same condition.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']
12
Victor Stinnerf951d282014-06-29 00:46:45 +020013from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import events
15from . import futures
16from . import protocols
Victor Stinnerf951d282014-06-29 00:46:45 +020017from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018
19
# Default value of the ``limit`` argument used throughout this module
# (65536 bytes).
_DEFAULT_LIMIT = 1 << 16
21
Guido van Rossuma849be92014-01-30 16:05:28 -080022
class IncompleteReadError(EOFError):
    """Raised when the stream ends before the requested bytes were read.

    Attributes:

    - partial: bytes that were read before the end of stream was reached
    - expected: total number of expected bytes
    """

    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default exception reduction rebuilds the object from
        # ``self.args``, which here holds only the formatted message.
        # That would call __init__ with a single argument and fail, so
        # rebuild from the real constructor arguments instead.  This makes
        # copy.copy() and pickling work.
        return type(self), (self.partial, self.expected)
35
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a connection and return a (reader, writer) pair.

    This is a convenience wrapper around loop.create_connection().
    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    host, port and any extra keyword arguments are forwarded to
    create_connection(); the protocol factory is supplied by this
    function and cannot be passed in.

    Two keyword-only arguments are handled here: loop selects the
    event loop to use (events.get_event_loop() is used when it is
    None) and limit is the buffer limit given to the StreamReader.

    (To customize the StreamReader and/or StreamReaderProtocol
    classes, copy this code -- it is only a convenience.)
    """
    event_loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=event_loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=event_loop)

    def protocol_factory():
        # Always hand out the single protocol instance built above.
        return stream_protocol

    transport, _unused_protocol = yield from event_loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, event_loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070065
66
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server that calls back for each connected client.

    client_connected_cb is called with two arguments, client_reader
    (a StreamReader) and client_writer (a StreamWriter).  It may be a
    plain callback function or a coroutine; when the call produces a
    coroutine object it is scheduled as a task on the event loop.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied by this
    function and cannot be passed in.

    Two keyword-only arguments are handled here: loop selects the
    event loop to use (events.get_event_loop() is used when it is
    None) and limit is the buffer limit given to each StreamReader.

    The return value is whatever loop.create_server() returns, i.e.
    a Server object which can be used to stop the service.
    """
    event_loop = loop if loop is not None else events.get_event_loop()

    def make_protocol():
        # Every incoming connection gets its own reader and protocol.
        client_reader = StreamReader(limit=limit, loop=event_loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=event_loop)

    server = yield from event_loop.create_server(
        make_protocol, host, port, **kwds)
    return server
101
102
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `open_connection`, but connects to a UNIX Domain Socket.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection().  Returns a (reader, writer) pair.
        """
        event_loop = events.get_event_loop() if loop is None else loop

        stream_reader = StreamReader(limit=limit, loop=event_loop)
        stream_protocol = StreamReaderProtocol(stream_reader,
                                               loop=event_loop)

        def protocol_factory():
            # Always hand out the single protocol instance built above.
            return stream_protocol

        transport, _unused_protocol = (
            yield from event_loop.create_unix_connection(
                protocol_factory, path, **kwds))

        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, event_loop)
        return stream_reader, stream_writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Like `start_server`, but listens on a UNIX Domain Socket.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        event_loop = loop if loop is not None else events.get_event_loop()

        def make_protocol():
            # Every incoming connection gets its own reader and protocol.
            client_reader = StreamReader(limit=limit, loop=event_loop)
            return StreamReaderProtocol(client_reader, client_connected_cb,
                                        loop=event_loop)

        server = yield from event_loop.create_unix_server(
            make_protocol, path, **kwds)
        return server
134
135
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  A subclass overriding
    any of these must call the super method.

    StreamWriter.drain() must check for error conditions and then call
    _make_drain_waiter(), which returns () when writing is not paused
    and a new Future when it is.
    """

    def __init__(self, loop=None):
        self._loop = loop  # May be None; we may never need it.
        self._paused = False
        self._drain_waiter = None

    def pause_writing(self):
        """Record that writing is paused."""
        assert not self._paused
        self._paused = True

    def resume_writing(self):
        """Record that writing resumed and release a pending drain waiter."""
        assert self._paused
        self._paused = False

        pending, self._drain_waiter = self._drain_waiter, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Wake up a writer that is waiting in drain() while paused.

        The waiter gets exc as its exception, or a None result when exc
        is None.  Nothing happens if writing is not paused or nobody is
        waiting.
        """
        pending = self._drain_waiter
        if self._paused and pending is not None:
            self._drain_waiter = None
            if not pending.done():
                if exc is None:
                    pending.set_result(None)
                else:
                    pending.set_exception(exc)

    def _make_drain_waiter(self):
        """Return () if not paused, otherwise a fresh Future to wait on."""
        if not self._paused:
            return ()

        previous = self._drain_waiter
        # Any earlier waiter must already be gone or cancelled.
        assert previous is None or previous.cancelled()

        self._drain_waiter = futures.Future(loop=self._loop)
        return self._drain_waiter
189
190
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)

    stream_reader receives all incoming data, EOF and errors.  If
    client_connected_cb is given, it is called with (reader, writer)
    once the connection is made.  loop may be None.
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb

    def connection_made(self, transport):
        """Attach the transport to the reader and run the client callback."""
        self._stream_reader.set_transport(transport)
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                # The constructor accepts loop=None, so self._loop cannot
                # be used blindly here; fall back to the current event
                # loop instead of failing with AttributeError.
                loop = self._loop
                if loop is None:
                    loop = events.get_event_loop()
                loop.create_task(res)

    def connection_lost(self, exc):
        """Propagate EOF (exc is None) or the error to the reader."""
        if exc is None:
            self._stream_reader.feed_eof()
        else:
            self._stream_reader.set_exception(exc)
        # Let the flow control mixin wake up a writer blocked in drain().
        super().connection_lost(exc)

    def data_received(self, data):
        """Hand incoming data to the reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Tell the reader that the peer signalled end-of-file."""
        self._stream_reader.feed_eof()
229
Guido van Rossum355491d2013-10-18 15:17:11 -0700230
class StreamWriter:
    """Thin wrapper around a Transport.

    write(), writelines(), write_eof(), can_write_eof(),
    get_extra_info() and close() are forwarded to the transport.
    On top of that, drain() returns an optional Future that can be
    waited on for flow control, and the transport property gives
    direct access to the wrapped Transport.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._loop = loop
        self._reader = reader
        self._protocol = protocol
        self._transport = transport

    @property
    def transport(self):
        """The wrapped transport object."""
        return self._transport

    def write(self, data):
        """Forward data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Return the result of the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the result of the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Return the result of the transport's close()."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Return the transport's extra info for name, or default."""
        return self._transport.get_extra_info(name, default)

    def drain(self):
        """Return something to wait on until writing may continue.

        This method has an unusual return value.  It is meant to be
        used as

          w.write(data)
          yield from w.drain()

        If there is nothing to wait for, drain() returns () and the
        yield-from continues immediately.  If the transport buffer is
        full (the protocol is paused), drain() creates and returns a
        Future; the yield-from then blocks until that Future is
        completed, which happens when the buffer is (partially)
        drained and the protocol is resumed.

        Raises the reader's stored exception if there is one, or
        ConnectionResetError if the transport reports a lost
        connection.
        """
        reader = self._reader
        if reader is not None:
            pending_error = reader._exception
            if pending_error is not None:
                raise pending_error

        if self._transport._conn_lost:  # Uses private variable.
            raise ConnectionResetError('Connection lost')

        return self._protocol._make_drain_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289
290
class StreamReader:
    """Byte buffer fed by a protocol and consumed by read coroutines.

    feed_data(), feed_eof() and set_exception() are the producer side;
    readline(), read() and readexactly() are coroutines forming the
    consumer side.  Only one read coroutine may wait for data at a time.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False     # Whether we're done.
        self._waiter = None   # A future.
        self._exception = None
        self._transport = None
        self._paused = False

    def _detach_waiter(self):
        """Forget the current waiter; return it unless absent or cancelled."""
        pending, self._waiter = self._waiter, None
        if pending is None or pending.cancelled():
            return None
        return pending

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and pass it to a coroutine waiting for data."""
        self._exception = exc

        pending = self._detach_waiter()
        if pending is not None:
            pending.set_exception(exc)

    def set_transport(self, transport):
        """Remember the transport; it may only be set once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake up a waiting reader."""
        self._eof = True

        pending = self._detach_waiter()
        if pending is not None:
            pending.set_result(True)

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake up a waiting reader.

        Reading from the transport is paused when the buffer grows
        beyond twice the limit.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)

        pending = self._detach_waiter()
        if pending is not None:
            pending.set_result(False)

        if self._transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return

        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

    def _create_waiter(self, func_name):
        """Return a new Future for func_name to wait on.

        Raises RuntimeError if another coroutine is already waiting.
        """
        # A single future links the protocol's feed_data() to one read
        # coroutine.  With two read coroutines waiting at once it would
        # not be possible to know which one gets the next data.
        if self._waiter is None:
            return futures.Future(loop=self._loop)
        raise RuntimeError('%s() called while another coroutine is '
                           'already waiting for incoming data' % func_name)

    @coroutine
    def readline(self):
        """Read up to and including the next newline byte.

        At EOF, whatever was collected so far is returned (possibly
        empty).  Raises ValueError if the collected line exceeds the
        limit.
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        found_newline = False

        while True:
            if self._buffer:
                newline_at = self._buffer.find(b'\n')
                if newline_at < 0:
                    # No newline yet: take everything and keep going.
                    line.extend(self._buffer)
                    self._buffer.clear()
                else:
                    end = newline_at + 1
                    line.extend(self._buffer[:end])
                    del self._buffer[:end]
                    found_newline = True

                if len(line) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if found_newline or self._eof:
                break

            self._waiter = self._create_waiter('readline')
            try:
                yield from self._waiter
            finally:
                self._waiter = None

        self._maybe_resume_transport()
        return bytes(line)

    @coroutine
    def read(self, n=-1):
        """Read up to n bytes; with negative n, read until EOF.

        Returns b'' when n is zero, or when the stream is at EOF with
        an empty buffer.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            chunks = []
            while True:
                chunk = yield from self.read(self._limit)
                if not chunk:
                    break
                chunks.append(chunk)
            return b''.join(chunks)

        # From here on n is positive.
        if not self._buffer and not self._eof:
            self._waiter = self._create_waiter('read')
            try:
                yield from self._waiter
            finally:
                self._waiter = None

        if len(self._buffer) <= n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError, carrying the bytes read so far, if
        EOF is reached first.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        chunks = []
        remaining = n
        while remaining > 0:
            chunk = yield from self.read(remaining)
            if not chunk:
                partial = b''.join(chunks)
                raise IncompleteReadError(partial, len(partial) + remaining)
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)
Guido van Rossum38455212014-01-06 16:09:18 -0800474 return b''.join(blocks)