blob: 698c5c6b184201903897b791287ddb9b4a0d4758 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
# Public names of this module, i.e. what "from <module> import *" binds.
__all__ = [
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server',
    'open_unix_connection', 'start_unix_server',
    'IncompleteReadError',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
Yury Selivanovb057c522014-02-18 12:15:06 -05009import socket
10
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011from . import events
12from . import futures
13from . import protocols
14from . import tasks
15
16
# Default value of the ``limit`` keyword accepted by StreamReader and by the
# connection/server helpers in this module (2**16 == 65536 bytes).
_DEFAULT_LIMIT = 2 ** 16
18
Guido van Rossuma849be92014-01-30 16:05:28 -080019
class IncompleteReadError(EOFError):
    """
    Incomplete read error, raised when the stream ends too early.

    Attributes:

    - partial: bytes read before the end of stream was reached
    - expected: total number of expected bytes
    """
    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default BaseException reduction replays self.args -- here the
        # single formatted message -- into __init__, which requires two
        # arguments, so copy.copy() and pickle round-trips would fail with
        # TypeError.  Rebuild from the original constructor arguments.
        return type(self), (self.partial, self.expected)
32
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
@tasks.coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect via loop.create_connection() and return (reader, writer).

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance wrapping the transport of the new connection.

    host, port and any extra keyword arguments are forwarded to
    loop.create_connection(); the protocol factory is supplied here
    and must not be passed by the caller.

    Two keyword-only arguments are consumed by this function itself:

    - loop: the event loop to use (events.get_event_loop() when None)
    - limit: the buffer limit handed to the StreamReader

    (To customize the StreamReader and/or StreamReaderProtocol
    classes, copy this code -- it is only a convenience wrapper.)
    """
    loop = events.get_event_loop() if loop is None else loop

    stream_reader = StreamReader(limit=limit, loop=loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the single protocol created above so that it
        # stays wired to stream_reader.
        return stream_protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062
63
@tasks.coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server that calls back for each connected client.

    `client_connected_cb` is invoked with two arguments, client_reader
    (a StreamReader) and client_writer (a StreamWriter).  It may be a
    plain callable or a coroutine function; when calling it yields a
    coroutine object, that object is wrapped in a Task.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied here and
    must not be passed by the caller.

    Two keyword-only arguments are consumed by this function itself:

    - loop: the event loop to use (events.get_event_loop() when None)
    - limit: the buffer limit handed to each StreamReader

    The return value is whatever loop.create_server() returns, i.e. a
    Server object which can be used to stop the service.
    """
    loop = events.get_event_loop() if loop is None else loop

    def make_protocol():
        # Every accepted connection gets its own reader/protocol pair.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(make_protocol, host, port, **kwds)
    return server
98
99
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @tasks.coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_connection().  Returns a (reader, writer) pair
        made of a StreamReader and a StreamWriter.
        """
        if loop is None:
            loop = events.get_event_loop()
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = yield from loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @tasks.coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets.

        path and any extra keyword arguments are forwarded to
        loop.create_unix_server(), whose result is returned.
        """
        if loop is None:
            loop = events.get_event_loop()

        def factory():
            # Every accepted connection gets its own reader/protocol pair.
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return (yield from loop.create_unix_server(factory, path, **kwds))

else:
    # Without AF_UNIX the two helpers above are never defined.  Stop
    # advertising them, otherwise a star-import of this module fails with
    # AttributeError because __all__ names attributes that do not exist.
    __all__ = [_name for _name in __all__
               if _name not in ('open_unix_connection', 'start_unix_server')]
131
132
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must check for error conditions and then call
    _make_drain_waiter(), which will return either () or a Future
    depending on the paused state.
    """

    def __init__(self, loop=None):
        self._loop = loop  # May be None; we may never need it.
        self._paused = False
        # Future handed out by _make_drain_waiter() while paused, if any.
        self._drain_waiter = None

    def pause_writing(self):
        """Record that writing is paused; must not already be paused."""
        assert not self._paused
        self._paused = True

    def resume_writing(self):
        """Leave the paused state and complete the pending drain waiter."""
        assert self._paused
        self._paused = False

        pending, self._drain_waiter = self._drain_waiter, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    def connection_lost(self, exc):
        """Wake up the writer if it is currently blocked in drain().

        The pending waiter gets a None result when exc is None and is
        failed with exc otherwise.  Nothing happens when not paused or
        when no waiter is outstanding.
        """
        pending = self._drain_waiter
        if not self._paused or pending is None:
            return

        self._drain_waiter = None
        if pending.done():
            return

        if exc is None:
            pending.set_result(None)
        else:
            pending.set_exception(exc)

    def _make_drain_waiter(self):
        """Return () when not paused, else a new Future to wait on."""
        if not self._paused:
            return ()

        # Any previous waiter must have been consumed or cancelled.
        previous = self._drain_waiter
        assert previous is None or previous.cancelled()

        self._drain_waiter = futures.Future(loop=self._loop)
        return self._drain_waiter
186
187
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        # Created in connection_made(), and only when a callback was given.
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb

    def connection_made(self, transport):
        """Attach the transport to the reader and notify the callback."""
        reader = self._stream_reader
        reader.set_transport(transport)

        callback = self._client_connected_cb
        if callback is None:
            return

        self._stream_writer = StreamWriter(transport, self,
                                           reader,
                                           self._loop)
        outcome = callback(reader, self._stream_writer)
        # A coroutine callback only produces a coroutine object; schedule it.
        if tasks.iscoroutine(outcome):
            tasks.Task(outcome, loop=self._loop)

    def connection_lost(self, exc):
        """Forward EOF (exc is None) or the error to the reader."""
        reader = self._stream_reader
        if exc is None:
            reader.feed_eof()
        else:
            reader.set_exception(exc)
        # Let FlowControlMixin wake up a writer blocked in drain().
        super().connection_lost(exc)

    def data_received(self, data):
        """Append incoming data to the reader's buffer."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Tell the reader that no more data will arrive."""
        self._stream_reader.feed_eof()
226
Guido van Rossum355491d2013-10-18 15:17:11 -0700227
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        self._reader = reader
        self._loop = loop

    @property
    def transport(self):
        """The wrapped transport object."""
        return self._transport

    def write(self, data):
        """Pass data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Pass data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Delegate to the transport's write_eof() and return its result."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Delegate to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Delegate to the transport's close() and return its result."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Delegate to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    def drain(self):
        """This method has an unusual return value.

        The intended use is to write

            w.write(data)
            yield from w.drain()

        When there's nothing to wait for, drain() returns (), and the
        yield-from continues immediately.  When the transport buffer
        is full (the protocol is paused), drain() creates and returns
        a Future and the yield-from will block until that Future is
        completed, which will happen when the buffer is (partially)
        drained and the protocol is resumed.

        Raises the exception stored on the reader, if any, or
        ConnectionResetError when the transport reports a lost
        connection.
        """
        reader = self._reader
        if reader is not None:
            if reader._exception is not None:
                raise reader._exception
        if self._transport._conn_lost:  # Uses private variable.
            raise ConnectionResetError('Connection lost')
        return self._protocol._make_drain_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286
287
class StreamReader:
    """Byte buffer filled through feed_*() and drained by read coroutines.

    Data is appended with feed_data(), the end of the stream is
    signalled with feed_eof() and errors with set_exception().  The
    readline(), read() and readexactly() coroutines consume the buffer,
    waiting on an internal Future when more data is needed.  When a
    transport has been attached with set_transport(), it is paused once
    the buffer exceeds twice the limit and resumed once the buffer is
    back at or below the limit.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False        # Whether we're done.
        self._waiter = None      # A future.
        self._exception = None
        self._transport = None
        self._paused = False

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and fail the pending waiter with it, if there is one."""
        self._exception = exc

        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_exception(exc)

    def set_transport(self, transport):
        """Attach the transport used for pausing and resuming reads."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        """Resume a paused transport once the buffer is within the limit."""
        if not self._paused:
            return
        if len(self._buffer) > self._limit:
            return
        self._paused = False
        self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as finished and wake the pending waiter."""
        self._eof = True

        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_result(True)

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake the pending waiter.

        Empty data is ignored.  When a transport is attached and the
        buffer grows beyond twice the limit, the transport is asked to
        pause reading.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)

        pending, self._waiter = self._waiter, None
        if pending is not None and not pending.cancelled():
            pending.set_result(False)

        transport = self._transport
        if transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return

        try:
            transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True

    def _create_waiter(self, func_name):
        """Return a new Future for a read coroutine to wait on.

        Raises RuntimeError, naming func_name, when another waiter is
        already registered.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)
        return futures.Future(loop=self._loop)

    @tasks.coroutine
    def readline(self):
        """Read one line, including its trailing b'\\n', as bytes.

        When EOF is reached before a newline is seen, whatever was
        collected so far (possibly b'') is returned.  Raises the stored
        exception if one is set, and ValueError when the collected line
        grows beyond the limit.
        """
        if self._exception is not None:
            raise self._exception

        collected = bytearray()
        complete = False

        while True:
            # One pass either drains the whole buffer (no newline in it) or
            # takes everything up to and including the first newline.
            if self._buffer:
                newline_at = self._buffer.find(b'\n')
                if newline_at < 0:
                    collected.extend(self._buffer)
                    self._buffer.clear()
                else:
                    end = newline_at + 1
                    collected.extend(self._buffer[:end])
                    del self._buffer[:end]
                    complete = True

                if len(collected) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if complete or self._eof:
                break

            self._waiter = self._create_waiter('readline')
            try:
                yield from self._waiter
            finally:
                self._waiter = None

        self._maybe_resume_transport()
        return bytes(collected)

    @tasks.coroutine
    def read(self, n=-1):
        """Read up to n bytes and return them as bytes.

        n == 0 returns b'' immediately.  A negative n waits until EOF
        has been fed and returns everything buffered.  A positive n
        waits at most once, and only when the buffer is empty and EOF
        has not been fed, then returns at most n buffered bytes.
        Raises the stored exception if one is set.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            while not self._eof:
                self._waiter = self._create_waiter('read')
                try:
                    yield from self._waiter
                finally:
                    self._waiter = None
        elif not (self._buffer or self._eof):
            self._waiter = self._create_waiter('read')
            try:
                yield from self._waiter
            finally:
                self._waiter = None

        buffered = self._buffer
        if n < 0 or len(buffered) <= n:
            data = bytes(buffered)
            buffered.clear()
        else:
            # n > 0 and len(buffered) > n
            data = bytes(buffered[:n])
            del buffered[:n]

        self._maybe_resume_transport()
        return data

    @tasks.coroutine
    def readexactly(self, n):
        """Read exactly n bytes and return them as bytes.

        Raises IncompleteReadError, carrying the bytes read so far, when
        read() returns no data before n bytes were collected.  Raises
        the stored exception if one is set.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        remaining = n
        chunks = []
        while remaining > 0:
            chunk = yield from self.read(remaining)
            if not chunk:
                partial = b''.join(chunks)
                raise IncompleteReadError(partial, len(partial) + remaining)
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)
Guido van Rossum38455212014-01-06 16:09:18 -0800466 return b''.join(blocks)