blob: 8fc21474e90c8bd0fc129b9f11e0fbbe64d16bac [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
Guido van Rossum49c96fb2013-11-25 15:07:18 -08003__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
Victor Stinner8dffc452014-01-25 15:32:06 +01004 'open_connection', 'start_server', 'IncompleteReadError',
Guido van Rossum1540b162013-11-19 11:43:38 -08005 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007from . import events
8from . import futures
9from . import protocols
10from . import tasks
11
12
13_DEFAULT_LIMIT = 2**16
14
Guido van Rossuma849be92014-01-30 16:05:28 -080015
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes
    """

    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default exception reduction re-creates the instance from
        # self.args, which only holds the formatted message.  __init__
        # needs (partial, expected), so copying or pickling would fail
        # with a TypeError unless the real arguments are supplied here.
        return type(self), (self.partial, self.expected)
28
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
@tasks.coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Wrap loop.create_connection() and return a (reader, writer) pair.

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    host, port and any extra keyword arguments are forwarded to
    loop.create_connection(); the protocol factory is supplied here,
    so it must not be passed.

    Two additional keyword-only arguments are accepted: loop (the
    event loop to use; the current event loop when None) and limit
    (the buffer limit handed to the StreamReader).

    (To customize the StreamReader and/or StreamReaderProtocol
    classes, copy this code -- it is only a convenience wrapper.)
    """
    if loop is None:
        loop = events.get_event_loop()

    stream_reader = StreamReader(limit=limit, loop=loop)
    reader_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return reader_protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, reader_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058
59
@tasks.coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, calling back for each client connected.

    `client_connected_cb` is called with two arguments,
    (client_reader, client_writer): a StreamReader and a StreamWriter
    for the new connection.  It may be a plain callback function or a
    coroutine; if it is a coroutine, it will be automatically
    converted into a Task.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied here, so
    it must not be passed.

    Two additional keyword-only arguments are accepted: loop (the
    event loop to use; the current event loop when None) and limit
    (the buffer limit handed to each StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Every accepted connection gets its own reader and protocol.
        client_reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(client_reader, client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(make_protocol, host, port, **kwds)
    return server
94
95
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must check for error conditions and then call
    _make_drain_waiter(), which will return either () or a Future
    depending on the paused state.
    """

    def __init__(self, loop=None):
        self._loop = loop  # May be None; we may never need it.
        self._paused = False
        # Future handed out by _make_drain_waiter() while paused, or None.
        self._drain_waiter = None

    def pause_writing(self):
        """Mark the protocol as paused; must not already be paused."""
        assert not self._paused
        self._paused = True

    def resume_writing(self):
        """Clear the paused state and complete any pending drain waiter."""
        assert self._paused
        self._paused = False
        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        """Wake up a pending drain waiter when the connection is lost.

        The waiter gets a None result when exc is None, otherwise exc
        is set on it as its exception.  Nothing happens when the
        protocol is not paused or no waiter is pending.
        """
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    def _make_drain_waiter(self):
        """Return () when not paused, else a new Future to wait on.

        The Future is remembered so resume_writing() or
        connection_lost() can complete it.
        """
        if not self._paused:
            return ()
        waiter = self._drain_waiter
        # Only one drain waiter may be outstanding at a time.
        assert waiter is None or waiter.cancelled()
        waiter = futures.Future(loop=self._loop)
        self._drain_waiter = waiter
        return waiter
149
150
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Adapter that forwards Protocol callbacks to a StreamReader.

    (This is a separate helper class rather than making StreamReader
    itself a Protocol subclass, because the StreamReader has other
    potential uses, and to keep users of the StreamReader from
    accidentally calling inappropriate protocol methods.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._client_connected_cb = client_connected_cb
        # Only created in connection_made() when a callback was supplied.
        self._stream_writer = None
        self._stream_reader = stream_reader

    def connection_made(self, transport):
        """Attach the transport and, if configured, notify the callback."""
        self._stream_reader.set_transport(transport)
        if self._client_connected_cb is None:
            return
        self._stream_writer = StreamWriter(transport, self,
                                           self._stream_reader,
                                           self._loop)
        outcome = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
        # A coroutine result is scheduled as a Task on our loop.
        if tasks.iscoroutine(outcome):
            tasks.Task(outcome, loop=self._loop)

    def connection_lost(self, exc):
        """Pass EOF or the error to the reader, then to the flow control."""
        if exc is not None:
            self._stream_reader.set_exception(exc)
        else:
            self._stream_reader.feed_eof()
        super().connection_lost(exc)

    def data_received(self, data):
        """Hand incoming data to the reader."""
        self._stream_reader.feed_data(data)

    def eof_received(self):
        """Tell the reader that no more data will arrive."""
        self._stream_reader.feed_eof()
189
Guido van Rossum355491d2013-10-18 15:17:11 -0700190
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        self._reader = reader  # May be None; only consulted by drain().
        self._loop = loop

    @property
    def transport(self):
        """The wrapped transport."""
        return self._transport

    def write(self, data):
        """Forward data to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward data to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Return the result of the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Return the result of the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Return the result of the transport's close()."""
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        """Return the transport's get_extra_info(name, default)."""
        return self._transport.get_extra_info(name, default)

    def drain(self):
        """This method has an unusual return value.

        The intended use is to write

            w.write(data)
            yield from w.drain()

        When there's nothing to wait for, drain() returns (), and the
        yield-from continues immediately.  When the transport buffer
        is full (the protocol is paused), drain() creates and returns
        a Future and the yield-from will block until that Future is
        completed, which will happen when the buffer is (partially)
        drained and the protocol is resumed.

        Raises the reader's stored exception if it has one, or
        ConnectionResetError if the transport reports a lost
        connection.
        """
        if self._reader is not None and self._reader._exception is not None:
            raise self._reader._exception
        # _conn_lost is a private attribute that a transport is not
        # required to provide; treat a missing one as "not lost"
        # instead of failing with AttributeError.
        if getattr(self._transport, '_conn_lost', 0):
            raise ConnectionResetError('Connection lost')
        return self._protocol._make_drain_waiter()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249
250
class StreamReader:
    """Buffer incoming bytes and expose them through read coroutines.

    Data is pushed in with feed_data()/feed_eof() and pulled out with
    the readline(), read() and readexactly() coroutines.  Only one
    read coroutine may be waiting for data at a time.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        self._limit = limit
        if loop is None:
            loop = events.get_event_loop()
        self._loop = loop
        self._buffer = bytearray()
        self._eof = False  # Whether we're done.
        self._waiter = None  # A future.
        self._exception = None
        self._transport = None
        self._paused = False

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and raise it in the coroutine waiting for data, if any."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def set_transport(self, transport):
        """Remember the transport used to pause and resume reading."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume a paused transport once the buffer is back within limit.
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake up the waiting coroutine."""
        self._eof = True
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(True)

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append data to the buffer and wake up the waiting coroutine.

        The transport is paused once the buffer grows beyond twice
        the limit.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(False)

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2*self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    def _create_waiter(self, func_name):
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)
        return futures.Future(loop=self._loop)

    @tasks.coroutine
    def readline(self):
        """Read one line, including the trailing b'\\n' if one was found.

        At EOF the data collected so far is returned without a
        newline.  Raises ValueError('Line is too long') when the
        collected line exceeds the limit.
        """
        if self._exception is not None:
            raise self._exception

        line = bytearray()
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                ichar = self._buffer.find(b'\n')
                if ichar < 0:
                    # No newline yet: take everything buffered so far.
                    line.extend(self._buffer)
                    self._buffer.clear()
                else:
                    ichar += 1
                    line.extend(self._buffer[:ichar])
                    del self._buffer[:ichar]
                    not_enough = False

                if len(line) > self._limit:
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if self._eof:
                break

            if not_enough:
                self._waiter = self._create_waiter('readline')
                try:
                    yield from self._waiter
                finally:
                    self._waiter = None

        self._maybe_resume_transport()
        return bytes(line)

    @tasks.coroutine
    def read(self, n=-1):
        """Read up to n bytes; with n < 0, read everything until EOF.

        With n == 0, b'' is returned immediately.  With n > 0 this
        waits until some data or EOF is available and returns at most
        n bytes; b'' means EOF was reached with an empty buffer.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # Simply waiting for EOF while everything accumulates in
            # self._buffer deadlocks as soon as more than 2*self._limit
            # bytes arrive: feed_data() pauses the transport and nothing
            # is consumed, so it is never resumed and EOF never comes.
            # Consume the stream in limit-sized chunks instead, which
            # drains the buffer and lets the transport resume.
            chunk_size = self._limit if self._limit > 0 else _DEFAULT_LIMIT
            blocks = []
            while True:
                block = yield from self.read(chunk_size)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            self._waiter = self._create_waiter('read')
            try:
                yield from self._waiter
            finally:
                self._waiter = None

        if len(self._buffer) <= n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            # n > 0 and len(self._buffer) > n
            data = bytes(self._buffer[:n])
            del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @tasks.coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError if EOF is reached first; its
        partial attribute holds the bytes read so far.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        blocks = []
        while n > 0:
            block = yield from self.read(n)
            if not block:
                partial = b''.join(blocks)
                raise IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b''.join(blocks)
Guido van Rossum38455212014-01-06 16:09:18 -0800429 return b''.join(blocks)