blob: e99536822510345f22cb631831b7c6a9a712cb7e [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Stream-related things."""
2
3__all__ = ['StreamReader', 'StreamReaderProtocol', 'open_connection']
4
5import collections
6
7from . import events
8from . import futures
9from . import protocols
10from . import tasks
11
12
13_DEFAULT_LIMIT = 2**16
14
15
16@tasks.coroutine
17def open_connection(host=None, port=None, *,
18 loop=None, limit=_DEFAULT_LIMIT, **kwds):
19 """A wrapper for create_connection() returning a (reader, writer) pair.
20
21 The reader returned is a StreamReader instance; the writer is a
22 Transport.
23
24 The arguments are all the usual arguments to create_connection()
25 except protocol_factory; most common are positional host and port,
26 with various optional keyword arguments following.
27
28 Additional optional keyword arguments are loop (to set the event loop
29 instance to use) and limit (to set the buffer limit passed to the
30 StreamReader).
31
32 (If you want to customize the StreamReader and/or
33 StreamReaderProtocol classes, just copy the code -- there's
34 really nothing special here except some convenience.)
35 """
36 if loop is None:
37 loop = events.get_event_loop()
38 reader = StreamReader(limit=limit, loop=loop)
39 protocol = StreamReaderProtocol(reader)
40 transport, _ = yield from loop.create_connection(
41 lambda: protocol, host, port, **kwds)
Guido van Rossum355491d2013-10-18 15:17:11 -070042 writer = StreamWriter(transport, protocol, reader, loop)
43 return reader, writer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
45
46class StreamReaderProtocol(protocols.Protocol):
47 """Trivial helper class to adapt between Protocol and StreamReader.
48
49 (This is a helper class instead of making StreamReader itself a
50 Protocol subclass, because the StreamReader has other potential
51 uses, and to prevent the user of the StreamReader to accidentally
52 call inappropriate methods of the protocol.)
53 """
54
55 def __init__(self, stream_reader):
Guido van Rossum355491d2013-10-18 15:17:11 -070056 self._stream_reader = stream_reader
57 self._drain_waiter = None
58 self._paused = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059
60 def connection_made(self, transport):
Guido van Rossum355491d2013-10-18 15:17:11 -070061 self._stream_reader.set_transport(transport)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070062
63 def connection_lost(self, exc):
64 if exc is None:
Guido van Rossum355491d2013-10-18 15:17:11 -070065 self._stream_reader.feed_eof()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066 else:
Guido van Rossum355491d2013-10-18 15:17:11 -070067 self._stream_reader.set_exception(exc)
68 # Also wake up the writing side.
69 if self._paused:
70 waiter = self._drain_waiter
71 if waiter is not None:
72 self._drain_waiter = None
73 if not waiter.done():
74 if exc is None:
75 waiter.set_result(None)
76 else:
77 waiter.set_exception(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078
79 def data_received(self, data):
Guido van Rossum355491d2013-10-18 15:17:11 -070080 self._stream_reader.feed_data(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081
82 def eof_received(self):
Guido van Rossum355491d2013-10-18 15:17:11 -070083 self._stream_reader.feed_eof()
84
85 def pause_writing(self):
86 assert not self._paused
87 self._paused = True
88
89 def resume_writing(self):
90 assert self._paused
91 self._paused = False
92 waiter = self._drain_waiter
93 if waiter is not None:
94 self._drain_waiter = None
95 if not waiter.done():
96 waiter.set_result(None)
97
98
99class StreamWriter:
100 """Wraps a Transport.
101
102 This exposes write(), writelines(), [can_]write_eof(),
103 get_extra_info() and close(). It adds drain() which returns an
104 optional Future on which you can wait for flow control. It also
105 adds a transport attribute which references the Transport
106 directly.
107 """
108
109 def __init__(self, transport, protocol, reader, loop):
110 self._transport = transport
111 self._protocol = protocol
112 self._reader = reader
113 self._loop = loop
114
115 @property
116 def transport(self):
117 return self._transport
118
119 def write(self, data):
120 self._transport.write(data)
121
122 def writelines(self, data):
123 self._transport.writelines(data)
124
125 def write_eof(self):
126 return self._transport.write_eof()
127
128 def can_write_eof(self):
129 return self._transport.can_write_eof()
130
131 def close(self):
132 return self._transport.close()
133
134 def get_extra_info(self, name, default=None):
135 return self._transport.get_extra_info(name, default)
136
137 def drain(self):
138 """This method has an unusual return value.
139
140 The intended use is to write
141
142 w.write(data)
143 yield from w.drain()
144
145 When there's nothing to wait for, drain() returns (), and the
146 yield-from continues immediately. When the transport buffer
147 is full (the protocol is paused), drain() creates and returns
148 a Future and the yield-from will block until that Future is
149 completed, which will happen when the buffer is (partially)
150 drained and the protocol is resumed.
151 """
152 if self._reader._exception is not None:
153 raise self._writer._exception
154 if self._transport._conn_lost: # Uses private variable.
155 raise ConnectionResetError('Connection lost')
156 if not self._protocol._paused:
157 return ()
158 waiter = self._protocol._drain_waiter
159 assert waiter is None or waiter.cancelled()
160 waiter = futures.Future(loop=self._loop)
161 self._protocol._drain_waiter = waiter
162 return waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163
164
165class StreamReader:
166
167 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
168 # The line length limit is a security feature;
169 # it also doubles as half the buffer limit.
Guido van Rossum355491d2013-10-18 15:17:11 -0700170 self._limit = limit
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 if loop is None:
172 loop = events.get_event_loop()
Guido van Rossum355491d2013-10-18 15:17:11 -0700173 self._loop = loop
174 self._buffer = collections.deque() # Deque of bytes objects.
175 self._byte_count = 0 # Bytes in buffer.
176 self._eof = False # Whether we're done.
177 self._waiter = None # A future.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 self._exception = None
179 self._transport = None
180 self._paused = False
181
182 def exception(self):
183 return self._exception
184
185 def set_exception(self, exc):
186 self._exception = exc
187
Guido van Rossum355491d2013-10-18 15:17:11 -0700188 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700190 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700191 if not waiter.cancelled():
192 waiter.set_exception(exc)
193
194 def set_transport(self, transport):
195 assert self._transport is None, 'Transport already set'
196 self._transport = transport
197
198 def _maybe_resume_transport(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700199 if self._paused and self._byte_count <= self._limit:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 self._paused = False
Guido van Rossum57497ad2013-10-18 07:58:20 -0700201 self._transport.resume_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202
203 def feed_eof(self):
Guido van Rossum355491d2013-10-18 15:17:11 -0700204 self._eof = True
205 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700207 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208 if not waiter.cancelled():
209 waiter.set_result(True)
210
211 def feed_data(self, data):
212 if not data:
213 return
214
Guido van Rossum355491d2013-10-18 15:17:11 -0700215 self._buffer.append(data)
216 self._byte_count += len(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217
Guido van Rossum355491d2013-10-18 15:17:11 -0700218 waiter = self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 if waiter is not None:
Guido van Rossum355491d2013-10-18 15:17:11 -0700220 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221 if not waiter.cancelled():
222 waiter.set_result(False)
223
224 if (self._transport is not None and
225 not self._paused and
Guido van Rossum355491d2013-10-18 15:17:11 -0700226 self._byte_count > 2*self._limit):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 try:
Guido van Rossum57497ad2013-10-18 07:58:20 -0700228 self._transport.pause_reading()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229 except NotImplementedError:
230 # The transport can't be paused.
231 # We'll just have to buffer all data.
232 # Forget the transport so we don't keep trying.
233 self._transport = None
234 else:
235 self._paused = True
236
237 @tasks.coroutine
238 def readline(self):
239 if self._exception is not None:
240 raise self._exception
241
242 parts = []
243 parts_size = 0
244 not_enough = True
245
246 while not_enough:
Guido van Rossum355491d2013-10-18 15:17:11 -0700247 while self._buffer and not_enough:
248 data = self._buffer.popleft()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 ichar = data.find(b'\n')
250 if ichar < 0:
251 parts.append(data)
252 parts_size += len(data)
253 else:
254 ichar += 1
255 head, tail = data[:ichar], data[ichar:]
256 if tail:
Guido van Rossum355491d2013-10-18 15:17:11 -0700257 self._buffer.appendleft(tail)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700258 not_enough = False
259 parts.append(head)
260 parts_size += len(head)
261
Guido van Rossum355491d2013-10-18 15:17:11 -0700262 if parts_size > self._limit:
263 self._byte_count -= parts_size
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264 self._maybe_resume_transport()
265 raise ValueError('Line is too long')
266
Guido van Rossum355491d2013-10-18 15:17:11 -0700267 if self._eof:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700268 break
269
270 if not_enough:
Guido van Rossum355491d2013-10-18 15:17:11 -0700271 assert self._waiter is None
272 self._waiter = futures.Future(loop=self._loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700274 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700276 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277
278 line = b''.join(parts)
Guido van Rossum355491d2013-10-18 15:17:11 -0700279 self._byte_count -= parts_size
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700280 self._maybe_resume_transport()
281
282 return line
283
284 @tasks.coroutine
285 def read(self, n=-1):
286 if self._exception is not None:
287 raise self._exception
288
289 if not n:
290 return b''
291
292 if n < 0:
Guido van Rossum355491d2013-10-18 15:17:11 -0700293 while not self._eof:
294 assert not self._waiter
295 self._waiter = futures.Future(loop=self._loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700297 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700299 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 else:
Guido van Rossum355491d2013-10-18 15:17:11 -0700301 if not self._byte_count and not self._eof:
302 assert not self._waiter
303 self._waiter = futures.Future(loop=self._loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700305 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700307 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308
Guido van Rossum355491d2013-10-18 15:17:11 -0700309 if n < 0 or self._byte_count <= n:
310 data = b''.join(self._buffer)
311 self._buffer.clear()
312 self._byte_count = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 self._maybe_resume_transport()
314 return data
315
316 parts = []
317 parts_bytes = 0
Guido van Rossum355491d2013-10-18 15:17:11 -0700318 while self._buffer and parts_bytes < n:
319 data = self._buffer.popleft()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 data_bytes = len(data)
321 if n < parts_bytes + data_bytes:
322 data_bytes = n - parts_bytes
323 data, rest = data[:data_bytes], data[data_bytes:]
Guido van Rossum355491d2013-10-18 15:17:11 -0700324 self._buffer.appendleft(rest)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325
326 parts.append(data)
327 parts_bytes += data_bytes
Guido van Rossum355491d2013-10-18 15:17:11 -0700328 self._byte_count -= data_bytes
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 self._maybe_resume_transport()
330
331 return b''.join(parts)
332
333 @tasks.coroutine
334 def readexactly(self, n):
335 if self._exception is not None:
336 raise self._exception
337
338 if n <= 0:
339 return b''
340
Guido van Rossum355491d2013-10-18 15:17:11 -0700341 while self._byte_count < n and not self._eof:
342 assert not self._waiter
343 self._waiter = futures.Future(loop=self._loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 try:
Guido van Rossum355491d2013-10-18 15:17:11 -0700345 yield from self._waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346 finally:
Guido van Rossum355491d2013-10-18 15:17:11 -0700347 self._waiter = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348
349 return (yield from self.read(n))