blob: 10d3591fbaceddf71916e08fae7c1a4bad53a0b9 [file] [log] [blame]
"""Stream-related things."""

# Names exported by ``from <this module> import *``.
__all__ = [
    'StreamReader',
    'StreamWriter',
    'StreamReaderProtocol',
    'open_connection',
    'start_server',
    'IncompleteReadError',
]

import collections

from . import events
from . import futures
from . import protocols
from . import tasks


# Default value of the ``limit`` argument accepted by open_connection(),
# start_server() and StreamReader (65536 bytes).
_DEFAULT_LIMIT = 2 ** 16
16
class IncompleteReadError(EOFError):
    """Error raised when fewer bytes than expected could be read.

    Attributes:

    - partial: bytes string read before the end of stream was reached
    - expected: total number of expected bytes
    """

    def __init__(self, partial, expected):
        super().__init__("%s bytes read on a total of %s expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The default exception reduction re-creates the instance by
        # calling the class with self.args, which holds only the formatted
        # message and therefore does not match the two-argument __init__
        # (pickle.loads() and copy.copy() would raise TypeError).  Rebuild
        # from the original constructor arguments instead.
        return type(self), (self.partial, self.expected)
29
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030
@tasks.coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Wrap create_connection() and return a (reader, writer) pair.

    The reader is a StreamReader instance and the writer is a
    StreamWriter instance.

    host, port and any extra keyword arguments are forwarded to
    loop.create_connection(); the protocol factory is supplied by this
    function and must not be passed by the caller.

    Two extra keyword-only arguments are accepted: loop (the event loop
    to use; the current event loop when omitted) and limit (the buffer
    limit handed to the StreamReader).

    (To customize the StreamReader and/or StreamReaderProtocol classes,
    copy this code -- it is only a convenience.)
    """
    if loop is None:
        loop = events.get_event_loop()

    stream_reader = StreamReader(limit=limit, loop=loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the protocol that is already bound to the reader.
        return stream_protocol

    transport, _ = yield from loop.create_connection(
        protocol_factory, host, port, **kwds)
    stream_writer = StreamWriter(transport, stream_protocol,
                                 stream_reader, loop)
    return stream_reader, stream_writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059
60
@tasks.coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    `client_connected_cb` is called with two arguments, client_reader
    (a StreamReader) and client_writer (a StreamWriter).  It may be a
    plain callback function or a coroutine; a coroutine is
    automatically wrapped in a Task.

    host, port and any extra keyword arguments are forwarded to
    loop.create_server(); the protocol factory is supplied by this
    function and must not be passed by the caller.

    Two extra keyword-only arguments are accepted: loop (the event loop
    to use; the current event loop when omitted) and limit (the buffer
    limit handed to each StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        # A fresh reader/protocol pair is built for every connection.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop)

    server = yield from loop.create_server(factory, host, port, **kwds)
    return server
95
96
class StreamReaderProtocol(protocols.Protocol):
    """Adapter that forwards Protocol events to a StreamReader.

    (This is a separate helper rather than a Protocol base class of
    StreamReader because the StreamReader has other potential uses, and
    so that users of the StreamReader cannot accidentally call protocol
    methods that are inappropriate for them.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        self._loop = loop  # May be None; we may never need it.
        self._client_connected_cb = client_connected_cb
        self._stream_reader = stream_reader
        self._stream_writer = None
        # Write-side flow control state, inspected by StreamWriter.drain().
        self._paused = False
        self._drain_waiter = None

    def _wake_drain_waiter(self, exc=None):
        # Detach the pending drain() future, if any, and complete it
        # (with `exc` when given) unless it is already done.
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    def connection_made(self, transport):
        reader = self._stream_reader
        reader.set_transport(transport)
        if self._client_connected_cb is None:
            return
        # Server side: build the writer and notify the user callback.
        self._stream_writer = StreamWriter(transport, self, reader,
                                           self._loop)
        outcome = self._client_connected_cb(reader, self._stream_writer)
        if tasks.iscoroutine(outcome):
            tasks.Task(outcome, loop=self._loop)

    def connection_lost(self, exc):
        if exc is not None:
            self._stream_reader.set_exception(exc)
        else:
            self._stream_reader.feed_eof()
        # Also wake up the writing side.
        if self._paused:
            self._wake_drain_waiter(exc)

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()

    def pause_writing(self):
        assert not self._paused
        self._paused = True

    def resume_writing(self):
        assert self._paused
        self._paused = False
        self._wake_drain_waiter()
159
160
class StreamWriter:
    """Thin wrapper around a Transport.

    write(), writelines(), write_eof(), can_write_eof(),
    get_extra_info() and close() are forwarded to the transport.  On
    top of that, drain() returns an optional Future that can be waited
    on for flow control, and the transport property gives direct access
    to the wrapped Transport.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._loop = loop
        self._reader = reader
        self._protocol = protocol
        self._transport = transport

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    def drain(self):
        """Return something to ``yield from`` for write flow control.

        Typical usage:

            w.write(data)
            yield from w.drain()

        If the protocol is not paused there is nothing to wait for:
        () is returned and the yield-from continues immediately.  If
        the protocol is paused, a new Future is created, stored as the
        protocol's drain waiter and returned; the yield-from blocks
        until the protocol completes it, which happens when writing is
        resumed or the connection is lost.

        Raises the reader's stored exception if there is one, and
        ConnectionResetError if the transport reports a lost connection.
        """
        if self._reader._exception is not None:
            raise self._reader._exception
        if self._transport._conn_lost:  # Uses private variable.
            raise ConnectionResetError('Connection lost')

        protocol = self._protocol
        if not protocol._paused:
            return ()

        previous = protocol._drain_waiter
        assert previous is None or previous.cancelled()
        pending = futures.Future(loop=self._loop)
        protocol._drain_waiter = pending
        return pending
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225
226
class StreamReader:
    """Buffer incoming bytes and hand them out through read coroutines.

    Data is pushed in with feed_data() / feed_eof() / set_exception()
    and pulled out with the readline(), read() and readexactly()
    coroutines.  Only one read coroutine may be waiting for data at a
    time.

    If a transport has been attached with set_transport(), it is paused
    once more than 2*limit bytes are buffered and resumed when the
    buffered byte count is back at or below limit.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.
        self._limit = limit
        if loop is None:
            loop = events.get_event_loop()
        self._loop = loop
        # TODO: Use a bytearray for a buffer, like the transport.
        self._buffer = collections.deque()  # Deque of bytes objects.
        self._byte_count = 0  # Bytes in buffer.
        self._eof = False  # Whether we're done.
        self._waiter = None  # A future.
        self._exception = None
        self._transport = None
        self._paused = False

    def exception(self):
        """Return the exception stored by set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store exc and raise it in the coroutine waiting for data, if any."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def set_transport(self, transport):
        """Attach the transport used for read flow control (only once)."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume a paused transport once the buffer is back within limit.
        if self._paused and self._byte_count <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the end of the stream and wake up the waiting reader."""
        self._eof = True
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(True)

    def feed_data(self, data):
        """Append data to the buffer and wake up the waiting reader.

        Empty data is ignored.  When more than 2*limit bytes are
        buffered the attached transport, if any, is paused.
        """
        if not data:
            return

        self._buffer.append(data)
        self._byte_count += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(False)

        if (self._transport is not None and
                not self._paused and
                self._byte_count > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    def _create_waiter(self, func_name):
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)
        return futures.Future(loop=self._loop)

    @tasks.coroutine
    def _wait_for_data(self, func_name):
        # Block until feed_data(), feed_eof() or set_exception() wakes us
        # up.  The waiter is always cleared, even on cancellation or error.
        self._waiter = self._create_waiter(func_name)
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @tasks.coroutine
    def readline(self):
        """Read one line, including the trailing b'\\n' if present.

        If EOF is reached before a newline is found, the bytes read so
        far are returned (possibly b'').  Raises ValueError('Line is too
        long') when the collected bytes exceed the limit; those bytes
        are discarded.  Raises the stored exception, if any.
        """
        if self._exception is not None:
            raise self._exception

        parts = []
        parts_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                data = self._buffer.popleft()
                ichar = data.find(b'\n')
                if ichar < 0:
                    parts.append(data)
                    parts_size += len(data)
                else:
                    ichar += 1
                    head, tail = data[:ichar], data[ichar:]
                    if tail:
                        # Keep what follows the newline for the next read.
                        self._buffer.appendleft(tail)
                    not_enough = False
                    parts.append(head)
                    parts_size += len(head)

                if parts_size > self._limit:
                    self._byte_count -= parts_size
                    self._maybe_resume_transport()
                    raise ValueError('Line is too long')

            if self._eof:
                break

            if not_enough:
                yield from self._wait_for_data('readline')

        line = b''.join(parts)
        self._byte_count -= parts_size
        self._maybe_resume_transport()

        return line

    @tasks.coroutine
    def read(self, n=-1):
        """Read up to n bytes; read until EOF when n is negative.

        n == 0 returns b'' immediately.  For n > 0, waits until some
        data or EOF is available and returns at most n bytes; b'' means
        EOF.  For n < 0, returns everything received up to EOF.  Raises
        the stored exception, if any.
        """
        if self._exception is not None:
            raise self._exception

        if not n:
            return b''

        if n < 0:
            # Drain the buffer every time we are woken up instead of
            # letting it grow until EOF: an untouched buffer larger than
            # 2*limit makes feed_data() pause the transport, after which
            # no more data (and no EOF) can arrive and we would wait
            # forever.
            blocks = []
            while True:
                if self._buffer:
                    blocks.extend(self._buffer)
                    self._buffer.clear()
                    self._byte_count = 0
                    self._maybe_resume_transport()
                elif self._eof:
                    break
                else:
                    yield from self._wait_for_data('read')
            return b''.join(blocks)

        if not self._byte_count and not self._eof:
            yield from self._wait_for_data('read')

        if self._byte_count <= n:
            # Everything buffered fits in the request: hand it all over.
            data = b''.join(self._buffer)
            self._buffer.clear()
            self._byte_count = 0
            self._maybe_resume_transport()
            return data

        parts = []
        parts_bytes = 0
        while self._buffer and parts_bytes < n:
            data = self._buffer.popleft()
            data_bytes = len(data)
            if n < parts_bytes + data_bytes:
                # Split the chunk and push the unread remainder back.
                data_bytes = n - parts_bytes
                data, rest = data[:data_bytes], data[data_bytes:]
                self._buffer.appendleft(rest)

            parts.append(data)
            parts_bytes += data_bytes
            self._byte_count -= data_bytes
            self._maybe_resume_transport()

        return b''.join(parts)

    @tasks.coroutine
    def readexactly(self, n):
        """Read exactly n bytes.

        Raises IncompleteReadError if EOF is reached first; its partial
        attribute holds the bytes read so far and expected is the
        originally requested count.  Raises the stored exception, if any.
        """
        if self._exception is not None:
            raise self._exception

        # There used to be "optimized" code here.  It created its own
        # Future and waited until self._buffer had at least the n
        # bytes, then called read(n).  Unfortunately, this could pause
        # the transport if the argument was larger than the pause
        # limit (which is twice self._limit).  So now we just read()
        # into a local buffer.

        blocks = []
        while n > 0:
            block = yield from self.read(n)
            if not block:
                partial = b''.join(blocks)
                # n is what is still missing, so the sum is the original
                # request size.
                raise IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b''.join(blocks)
Guido van Rossum38455212014-01-06 16:09:18 -0800424 return b''.join(blocks)