blob: f0c08c281d26acd38d8ed7643e9b1153e1dcc380 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer. Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseProactorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import socket
10
11from . import base_events
12from . import constants
13from . import futures
14from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
18class _ProactorBasePipeTransport(transports.BaseTransport):
19 """Base class for pipe and socket transports."""
20
21 def __init__(self, loop, sock, protocol, waiter=None,
22 extra=None, server=None):
23 super().__init__(extra)
24 self._set_extra(sock)
25 self._loop = loop
26 self._sock = sock
27 self._protocol = protocol
28 self._server = server
Guido van Rossumebb8e582013-12-04 12:12:07 -080029 self._buffer = None # None or bytearray.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030 self._read_fut = None
31 self._write_fut = None
32 self._conn_lost = 0
33 self._closing = False # Set when close() called.
34 self._eof_written = False
Guido van Rossumebb8e582013-12-04 12:12:07 -080035 self._protocol_paused = False
36 self.set_write_buffer_limits()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037 if self._server is not None:
38 self._server.attach(self)
39 self._loop.call_soon(self._protocol.connection_made, self)
40 if waiter is not None:
41 self._loop.call_soon(waiter.set_result, None)
42
43 def _set_extra(self, sock):
44 self._extra['pipe'] = sock
45
46 def close(self):
47 if self._closing:
48 return
49 self._closing = True
50 self._conn_lost += 1
51 if not self._buffer and self._write_fut is None:
52 self._loop.call_soon(self._call_connection_lost, None)
53 if self._read_fut is not None:
54 self._read_fut.cancel()
55
56 def _fatal_error(self, exc):
Victor Stinner63b4d4b2014-01-29 13:12:03 -080057 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
58 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 self._force_close(exc)
60
61 def _force_close(self, exc):
62 if self._closing:
63 return
64 self._closing = True
65 self._conn_lost += 1
66 if self._write_fut:
67 self._write_fut.cancel()
68 if self._read_fut:
69 self._read_fut.cancel()
70 self._write_fut = self._read_fut = None
Guido van Rossumebb8e582013-12-04 12:12:07 -080071 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070072 self._loop.call_soon(self._call_connection_lost, exc)
73
74 def _call_connection_lost(self, exc):
75 try:
76 self._protocol.connection_lost(exc)
77 finally:
78 # XXX If there is a pending overlapped read on the other
79 # end then it may fail with ERROR_NETNAME_DELETED if we
80 # just close our end. First calling shutdown() seems to
81 # cure it, but maybe using DisconnectEx() would be better.
82 if hasattr(self._sock, 'shutdown'):
83 self._sock.shutdown(socket.SHUT_RDWR)
84 self._sock.close()
85 server = self._server
86 if server is not None:
87 server.detach(self)
88 self._server = None
89
Guido van Rossumebb8e582013-12-04 12:12:07 -080090 # XXX The next four methods are nearly identical to corresponding
91 # ones in _SelectorTransport. Maybe refactor buffer management to
92 # share the implementations? (Also these are really only needed
93 # by _ProactorWritePipeTransport but since _buffer is defined on
94 # the base class I am putting it here for now.)
95
96 def _maybe_pause_protocol(self):
97 size = self.get_write_buffer_size()
98 if size <= self._high_water:
99 return
100 if not self._protocol_paused:
101 self._protocol_paused = True
102 try:
103 self._protocol.pause_writing()
104 except Exception:
105 logger.exception('pause_writing() failed')
106
107 def _maybe_resume_protocol(self):
108 if (self._protocol_paused and
109 self.get_write_buffer_size() <= self._low_water):
110 self._protocol_paused = False
111 try:
112 self._protocol.resume_writing()
113 except Exception:
114 logger.exception('resume_writing() failed')
115
116 def set_write_buffer_limits(self, high=None, low=None):
117 if high is None:
118 if low is None:
119 high = 64*1024
120 else:
121 high = 4*low
122 if low is None:
123 low = high // 4
124 if not high >= low >= 0:
125 raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
126 (high, low))
127 self._high_water = high
128 self._low_water = low
129
130 def get_write_buffer_size(self):
131 # NOTE: This doesn't take into account data already passed to
132 # send() even if send() hasn't finished yet.
133 if not self._buffer:
134 return 0
135 return len(self._buffer)
136
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700137
138class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
139 transports.ReadTransport):
140 """Transport for read pipes."""
141
142 def __init__(self, loop, sock, protocol, waiter=None,
143 extra=None, server=None):
144 super().__init__(loop, sock, protocol, waiter, extra, server)
145 self._read_fut = None
146 self._paused = False
147 self._loop.call_soon(self._loop_reading)
148
Guido van Rossum57497ad2013-10-18 07:58:20 -0700149 def pause_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800150 if self._closing:
151 raise RuntimeError('Cannot pause_reading() when closing')
152 if self._paused:
153 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154 self._paused = True
155
Guido van Rossum57497ad2013-10-18 07:58:20 -0700156 def resume_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800157 if not self._paused:
158 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700159 self._paused = False
160 if self._closing:
161 return
162 self._loop.call_soon(self._loop_reading, self._read_fut)
163
164 def _loop_reading(self, fut=None):
165 if self._paused:
166 return
167 data = None
168
169 try:
170 if fut is not None:
171 assert self._read_fut is fut or (self._read_fut is None and
172 self._closing)
173 self._read_fut = None
174 data = fut.result() # deliver data later in "finally" clause
175
176 if self._closing:
177 # since close() has been called we ignore any read data
178 data = None
179 return
180
181 if data == b'':
182 # we got end-of-file so no need to reschedule a new read
183 return
184
185 # reschedule a new read
186 self._read_fut = self._loop._proactor.recv(self._sock, 4096)
187 except ConnectionAbortedError as exc:
188 if not self._closing:
189 self._fatal_error(exc)
190 except ConnectionResetError as exc:
191 self._force_close(exc)
192 except OSError as exc:
193 self._fatal_error(exc)
194 except futures.CancelledError:
195 if not self._closing:
196 raise
197 else:
198 self._read_fut.add_done_callback(self._loop_reading)
199 finally:
200 if data:
201 self._protocol.data_received(data)
202 elif data is not None:
203 keep_open = self._protocol.eof_received()
204 if not keep_open:
205 self.close()
206
207
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100208class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700209 transports.WriteTransport):
210 """Transport for write pipes."""
211
212 def write(self, data):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800213 if not isinstance(data, (bytes, bytearray, memoryview)):
214 raise TypeError('data argument must be byte-ish (%r)',
215 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700216 if self._eof_written:
Guido van Rossumebb8e582013-12-04 12:12:07 -0800217 raise RuntimeError('write_eof() already called')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218
219 if not data:
220 return
221
222 if self._conn_lost:
223 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700224 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225 self._conn_lost += 1
226 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227
Guido van Rossumebb8e582013-12-04 12:12:07 -0800228 # Observable states:
229 # 1. IDLE: _write_fut and _buffer both None
230 # 2. WRITING: _write_fut set; _buffer None
231 # 3. BACKED UP: _write_fut set; _buffer a bytearray
232 # We always copy the data, so the caller can't modify it
233 # while we're still waiting for the I/O to happen.
234 if self._write_fut is None: # IDLE -> WRITING
235 assert self._buffer is None
236 # Pass a copy, except if it's already immutable.
237 self._loop_writing(data=bytes(data))
238 # XXX Should we pause the protocol at this point
239 # if len(data) > self._high_water? (That would
240 # require keeping track of the number of bytes passed
241 # to a send() that hasn't finished yet.)
242 elif not self._buffer: # WRITING -> BACKED UP
243 # Make a mutable copy which we can extend.
244 self._buffer = bytearray(data)
245 self._maybe_pause_protocol()
246 else: # BACKED UP
247 # Append to buffer (also copies).
248 self._buffer.extend(data)
249 self._maybe_pause_protocol()
250
251 def _loop_writing(self, f=None, data=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 try:
253 assert f is self._write_fut
254 self._write_fut = None
255 if f:
256 f.result()
Guido van Rossumebb8e582013-12-04 12:12:07 -0800257 if data is None:
258 data = self._buffer
259 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 if not data:
261 if self._closing:
262 self._loop.call_soon(self._call_connection_lost, None)
263 if self._eof_written:
264 self._sock.shutdown(socket.SHUT_WR)
Guido van Rossumebb8e582013-12-04 12:12:07 -0800265 else:
266 self._write_fut = self._loop._proactor.send(self._sock, data)
267 self._write_fut.add_done_callback(self._loop_writing)
268 # Now that we've reduced the buffer size, tell the
269 # protocol to resume writing if it was paused. Note that
270 # we do this last since the callback is called immediately
271 # and it may add more data to the buffer (even causing the
272 # protocol to be paused again).
273 self._maybe_resume_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 except ConnectionResetError as exc:
275 self._force_close(exc)
276 except OSError as exc:
277 self._fatal_error(exc)
278
279 def can_write_eof(self):
280 return True
281
282 def write_eof(self):
283 self.close()
284
285 def abort(self):
286 self._force_close(None)
287
288
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100289class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
290 def __init__(self, *args, **kw):
291 super().__init__(*args, **kw)
292 self._read_fut = self._loop._proactor.recv(self._sock, 16)
293 self._read_fut.add_done_callback(self._pipe_closed)
294
295 def _pipe_closed(self, fut):
296 if fut.cancelled():
297 # the transport has been closed
298 return
299 assert fut is self._read_fut, (fut, self._read_fut)
300 self._read_fut = None
301 assert fut.result() == b''
302 if self._write_fut is not None:
303 self._force_close(exc)
304 else:
305 self.close()
306
307
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100309 _ProactorBaseWritePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700310 transports.Transport):
311 """Transport for duplex pipes."""
312
313 def can_write_eof(self):
314 return False
315
316 def write_eof(self):
317 raise NotImplementedError
318
319
320class _ProactorSocketTransport(_ProactorReadPipeTransport,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100321 _ProactorBaseWritePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 transports.Transport):
323 """Transport for connected sockets."""
324
325 def _set_extra(self, sock):
326 self._extra['socket'] = sock
327 try:
328 self._extra['sockname'] = sock.getsockname()
329 except (socket.error, AttributeError):
330 pass
331 if 'peername' not in self._extra:
332 try:
333 self._extra['peername'] = sock.getpeername()
334 except (socket.error, AttributeError):
335 pass
336
337 def can_write_eof(self):
338 return True
339
340 def write_eof(self):
341 if self._closing or self._eof_written:
342 return
343 self._eof_written = True
344 if self._write_fut is None:
345 self._sock.shutdown(socket.SHUT_WR)
346
347
348class BaseProactorEventLoop(base_events.BaseEventLoop):
349
350 def __init__(self, proactor):
351 super().__init__()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700352 logger.debug('Using proactor: %s', proactor.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353 self._proactor = proactor
354 self._selector = proactor # convenient alias
Victor Stinner7de26462014-01-11 00:03:21 +0100355 self._self_reading_future = None
356 self._accept_futures = {} # socket file descriptor => Future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357 proactor.set_loop(self)
358 self._make_self_pipe()
359
360 def _make_socket_transport(self, sock, protocol, waiter=None,
361 extra=None, server=None):
362 return _ProactorSocketTransport(self, sock, protocol, waiter,
363 extra, server)
364
365 def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
366 extra=None):
367 return _ProactorDuplexPipeTransport(self,
368 sock, protocol, waiter, extra)
369
370 def _make_read_pipe_transport(self, sock, protocol, waiter=None,
371 extra=None):
372 return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
373
374 def _make_write_pipe_transport(self, sock, protocol, waiter=None,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100375 extra=None):
376 # We want connection_lost() to be called when other end closes
377 return _ProactorWritePipeTransport(self,
378 sock, protocol, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379
380 def close(self):
381 if self._proactor is not None:
382 self._close_self_pipe()
383 self._proactor.close()
384 self._proactor = None
385 self._selector = None
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200386 super().close()
Victor Stinner7de26462014-01-11 00:03:21 +0100387 self._accept_futures.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
389 def sock_recv(self, sock, n):
390 return self._proactor.recv(sock, n)
391
392 def sock_sendall(self, sock, data):
393 return self._proactor.send(sock, data)
394
395 def sock_connect(self, sock, address):
396 return self._proactor.connect(sock, address)
397
398 def sock_accept(self, sock):
399 return self._proactor.accept(sock)
400
401 def _socketpair(self):
402 raise NotImplementedError
403
404 def _close_self_pipe(self):
Victor Stinner7de26462014-01-11 00:03:21 +0100405 if self._self_reading_future is not None:
406 self._self_reading_future.cancel()
407 self._self_reading_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700408 self._ssock.close()
409 self._ssock = None
410 self._csock.close()
411 self._csock = None
412 self._internal_fds -= 1
413
414 def _make_self_pipe(self):
415 # A self-socket, really. :-)
416 self._ssock, self._csock = self._socketpair()
417 self._ssock.setblocking(False)
418 self._csock.setblocking(False)
419 self._internal_fds += 1
420 self.call_soon(self._loop_self_reading)
421
422 def _loop_self_reading(self, f=None):
423 try:
424 if f is not None:
425 f.result() # may raise
426 f = self._proactor.recv(self._ssock, 4096)
427 except:
428 self.close()
429 raise
430 else:
Victor Stinner7de26462014-01-11 00:03:21 +0100431 self._self_reading_future = f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432 f.add_done_callback(self._loop_self_reading)
433
434 def _write_to_self(self):
435 self._csock.send(b'x')
436
437 def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800438 if ssl:
439 raise ValueError('IocpEventLoop is incompatible with SSL.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
441 def loop(f=None):
442 try:
443 if f is not None:
444 conn, addr = f.result()
445 protocol = protocol_factory()
446 self._make_socket_transport(
447 conn, protocol,
448 extra={'peername': addr}, server=server)
449 f = self._proactor.accept(sock)
450 except OSError:
451 if sock.fileno() != -1:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700452 logger.exception('Accept failed')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 sock.close()
454 except futures.CancelledError:
455 sock.close()
456 else:
Victor Stinner7de26462014-01-11 00:03:21 +0100457 self._accept_futures[sock.fileno()] = f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 f.add_done_callback(loop)
459
460 self.call_soon(loop)
461
462 def _process_events(self, event_list):
463 pass # XXX hard work currently done in poll
464
465 def _stop_serving(self, sock):
Victor Stinner7de26462014-01-11 00:03:21 +0100466 for future in self._accept_futures.values():
467 future.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 self._proactor._stop_serving(sock)
469 sock.close()