blob: e239248d11c5560a3174ce1ab1fbcd005eda6557 [file] [log] [blame]
"""Stream-related things."""

__all__ = [
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server',
    'IncompleteReadError',
]

import socket

# The UNIX domain socket helpers are only defined when the socket module
# exposes AF_UNIX, so they are only exported in that case.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ['open_unix_connection', 'start_unix_server']

from . import events
from . import futures
from . import protocols
from . import tasks


# Default value of the `limit` argument accepted by the stream helpers.
_DEFAULT_LIMIT = 1 << 16
20
Guido van Rossuma849be92014-01-30 16:05:28 -080021
class IncompleteReadError(EOFError):
    """Incomplete read error: the stream ended before enough bytes arrived.

    Attributes:

    - partial: bytes read before the end of stream was reached
    - expected: total number of expected bytes
    """

    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The inherited __reduce__ rebuilds the exception from self.args,
        # which holds only the formatted message and therefore does not
        # match the two-argument __init__.  Hand back the real constructor
        # arguments so the exception can be copied and pickled.
        return type(self), (self.partial, self.expected)
34
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
@tasks.coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect with loop.create_connection() and return (reader, writer).

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    Apart from protocol_factory, every argument accepted by
    create_connection() may be given: usually host and port
    positionally, followed by any optional keyword arguments.

    Two extra keyword-only arguments are understood here: loop (the
    event loop instance to use) and limit (the buffer limit handed to
    the StreamReader).

    (To use customized StreamReader and/or StreamReaderProtocol
    classes, simply copy this code -- it is only a convenience.)
    """
    if loop is None:
        loop = events.get_event_loop()

    stream_reader = StreamReader(limit=limit, loop=loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return stream_protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064
65
@tasks.coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    `client_connected_cb` is called with two arguments, client_reader
    (a StreamReader object) and client_writer (a StreamWriter object).
    It may be a plain callback function or a coroutine; a coroutine is
    automatically converted into a Task.

    Apart from protocol_factory, the remaining arguments are the usual
    arguments to loop.create_server(): most commonly host and port
    positionally, followed by optional keyword arguments.

    Two extra keyword-only arguments are understood here: loop (the
    event loop instance to use) and limit (the buffer limit handed to
    each StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Build a new reader/protocol pair on every call.
        client_reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(make_protocol, host, port, **kwds)
    return server
100
101
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @tasks.coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        if loop is None:
            loop = events.get_event_loop()

        stream_reader = StreamReader(limit=limit, loop=loop)
        stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

        def protocol_factory():
            # Always hand back the single protocol created above.
            return stream_protocol

        transport, _ = yield from loop.create_unix_connection(
            protocol_factory, path, **kwds)
        stream_writer = StreamWriter(transport, stream_protocol,
                                     stream_reader, loop)
        return stream_reader, stream_writer

    @tasks.coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        if loop is None:
            loop = events.get_event_loop()

        def make_protocol():
            # Build a new reader/protocol pair on every call.
            client_reader = StreamReader(limit=limit, loop=loop)
            return StreamReaderProtocol(client_reader, client_connected_cb,
                                        loop=loop)

        server = yield from loop.create_unix_server(
            make_protocol, path, **kwds)
        return server
133
134
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must check for error conditions and then call
    _make_drain_waiter(), which will return either () or a Future
    depending on the paused state.
    """

    def __init__(self, loop=None):
        self._loop = loop  # May be None; we may never need it.
        self._paused = False
        self._drain_waiter = None

    def pause_writing(self):
        """Record that writing is paused."""
        assert not self._paused
        self._paused = True

    def resume_writing(self):
        """Clear the paused flag and complete any pending drain waiter."""
        assert self._paused
        self._paused = False
        pending, self._drain_waiter = self._drain_waiter, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Wake up the writer if currently paused.

        The pending drain waiter, if any, completes with None when exc
        is None and with exc as its exception otherwise.
        """
        if not self._paused or self._drain_waiter is None:
            return
        pending, self._drain_waiter = self._drain_waiter, None
        if pending.done():
            return
        if exc is not None:
            pending.set_exception(exc)
        else:
            pending.set_result(None)

    def _make_drain_waiter(self):
        """Return () when not paused, else a new Future stored as the waiter."""
        if not self._paused:
            return ()
        previous = self._drain_waiter
        # Any earlier waiter must already have been cancelled.
        assert previous is None or previous.cancelled()
        fresh = futures.Future(loop=self._loop)
        self._drain_waiter = fresh
        return fresh
188
189
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb

    def connection_made(self, transport):
        """Attach the transport to the reader and notify the callback, if any."""
        reader = self._stream_reader
        reader.set_transport(transport)

        callback = self._client_connected_cb
        if callback is None:
            return

        writer = StreamWriter(transport, self, reader, self._loop)
        self._stream_writer = writer
        outcome = callback(reader, writer)
        # A coroutine result is wrapped in a Task on this protocol's loop.
        if tasks.iscoroutine(outcome):
            tasks.Task(outcome, loop=self._loop)

    def connection_lost(self, exc):
        """Feed EOF or the exception to the reader, then run the mixin logic."""
        reader = self._stream_reader
        if exc is not None:
            reader.set_exception(exc)
        else:
            reader.feed_eof()
        super().connection_lost(exc)

    def data_received(self, data):
        """Forward received data to the reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Forward end-of-file to the reader."""
        self._stream_reader.feed_eof()
228
Guido van Rossum355491d2013-10-18 15:17:11 -0700229
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        self._reader = reader  # May be None; drain() then skips its check.
        self._loop = loop

    @property
    def transport(self):
        """The wrapped transport object."""
        return self._transport

    def write(self, data):
        """Pass data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Call the transport's write_eof() and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the transport's can_write_eof() result."""
        return self._transport.can_write_eof()

    def close(self):
        """Call the transport's close() and return its result."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Return the transport's get_extra_info(name, default) result."""
        return self._transport.get_extra_info(name, default)

    def drain(self):
        """This method has an unusual return value.

        The intended use is to write

            w.write(data)
            yield from w.drain()

        When there's nothing to wait for, drain() returns (), and the
        yield-from continues immediately.  When the transport buffer
        is full (the protocol is paused), drain() creates and returns
        a Future and the yield-from will block until that Future is
        completed, which will happen when the buffer is (partially)
        drained and the protocol is resumed.

        Raises the reader's stored exception if it has one, and
        ConnectionResetError if the transport reports a lost connection.
        """
        reader = self._reader
        if reader is not None and reader._exception is not None:
            raise reader._exception
        # _conn_lost is a private transport attribute.  Read it defensively
        # so that a transport which does not define it is treated as still
        # connected instead of making drain() fail with AttributeError.
        if getattr(self._transport, '_conn_lost', False):
            raise ConnectionResetError('Connection lost')
        return self._protocol._make_drain_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288
289
class StreamReader:
    """Byte buffer that is filled by feed_data() and drained by coroutines.

    Data is appended with feed_data() and consumed with the read(),
    readline() and readexactly() coroutines.  When a transport has been
    set, it is paused once the buffer grows beyond twice the limit and
    resumed once the buffer is back at or below the limit.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False  # Whether we're done.
        self._waiter = None  # A future.
        self._exception = None
        self._transport = None
        self._paused = False

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and pass it to the pending waiter, if any."""
        self._exception = exc

        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_exception(exc)

    def set_transport(self, transport):
        """Remember the transport used for pausing and resuming reads."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) > self._limit:
            return
        self._paused = False
        self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake the pending waiter, if any."""
        self._eof = True
        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_result(True)

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake the pending waiter, if any."""
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)

        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_result(False)

        # Pause the transport once more than twice the limit is buffered.
        transport = self._transport
        if transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return
        try:
            transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

    def _create_waiter(self, func_name):
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)
        return futures.Future(loop=self._loop)

    @tasks.coroutine
    def readline(self):
        """Read up to and including the next newline byte.

        Returns what was collected so far if end-of-file arrives first.
        Raises ValueError when the collected line exceeds the limit.
        """
        if self._exception is not None:
            raise self._exception

        collected = bytearray()
        found_newline = False

        while not found_newline:
            if self._buffer:
                newline_at = self._buffer.find(b'\n')
                if newline_at >= 0:
                    end = newline_at + 1
                    collected += self._buffer[:end]
                    del self._buffer[:end]
                    found_newline = True
                else:
                    # No newline yet: take everything that is buffered.
                    collected += self._buffer
                    self._buffer.clear()

                if len(collected) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if self._eof:
                break

            if not found_newline:
                self._waiter = self._create_waiter('readline')
                try:
                    yield from self._waiter
                finally:
                    self._waiter = None

        self._maybe_resume_transport()
        return bytes(collected)

    @tasks.coroutine
    def read(self, n=-1):
        """Read up to n bytes; a negative n reads until end-of-file.

        Returns b'' when n is zero, or when nothing is buffered and
        end-of-file has been fed.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self._limit
            # bytes.  So just call self.read(self._limit) until EOF.
            chunks = []
            while True:
                chunk = yield from self.read(self._limit)
                if not chunk:
                    break
                chunks.append(chunk)
            return b''.join(chunks)

        # From here on n is positive.
        if not self._buffer and not self._eof:
            self._waiter = self._create_waiter('read')
            try:
                yield from self._waiter
            finally:
                self._waiter = None

        if len(self._buffer) > n:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        else:
            data = bytes(self._buffer)
            self._buffer.clear()

        self._maybe_resume_transport()
        return data

    @tasks.coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError, carrying the bytes read so far, if
        read() returns no data before n bytes were collected.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        chunks = []
        remaining = n
        while remaining > 0:
            chunk = yield from self.read(remaining)
            if not chunk:
                partial = b''.join(chunks)
                raise IncompleteReadError(partial, len(partial) + remaining)
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)
Guido van Rossum38455212014-01-06 16:09:18 -0800473 return b''.join(blocks)