blob: 5de4d3d691a1d880b35524335f6670d6eb7d6f96 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer. Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseProactorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import socket
10
11from . import base_events
12from . import constants
13from . import futures
14from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
18class _ProactorBasePipeTransport(transports.BaseTransport):
19 """Base class for pipe and socket transports."""
20
21 def __init__(self, loop, sock, protocol, waiter=None,
22 extra=None, server=None):
23 super().__init__(extra)
24 self._set_extra(sock)
25 self._loop = loop
26 self._sock = sock
27 self._protocol = protocol
28 self._server = server
Guido van Rossumebb8e582013-12-04 12:12:07 -080029 self._buffer = None # None or bytearray.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030 self._read_fut = None
31 self._write_fut = None
Victor Stinner915bcb02014-02-01 22:49:59 +010032 self._pending_write = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033 self._conn_lost = 0
34 self._closing = False # Set when close() called.
35 self._eof_written = False
Guido van Rossumebb8e582013-12-04 12:12:07 -080036 self._protocol_paused = False
37 self.set_write_buffer_limits()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038 if self._server is not None:
39 self._server.attach(self)
40 self._loop.call_soon(self._protocol.connection_made, self)
41 if waiter is not None:
42 self._loop.call_soon(waiter.set_result, None)
43
44 def _set_extra(self, sock):
45 self._extra['pipe'] = sock
46
47 def close(self):
48 if self._closing:
49 return
50 self._closing = True
51 self._conn_lost += 1
52 if not self._buffer and self._write_fut is None:
53 self._loop.call_soon(self._call_connection_lost, None)
54 if self._read_fut is not None:
55 self._read_fut.cancel()
56
57 def _fatal_error(self, exc):
Victor Stinner63b4d4b2014-01-29 13:12:03 -080058 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
59 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070060 self._force_close(exc)
61
62 def _force_close(self, exc):
63 if self._closing:
64 return
65 self._closing = True
66 self._conn_lost += 1
67 if self._write_fut:
68 self._write_fut.cancel()
69 if self._read_fut:
70 self._read_fut.cancel()
71 self._write_fut = self._read_fut = None
Victor Stinner915bcb02014-02-01 22:49:59 +010072 self._pending_write = 0
Guido van Rossumebb8e582013-12-04 12:12:07 -080073 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 self._loop.call_soon(self._call_connection_lost, exc)
75
76 def _call_connection_lost(self, exc):
77 try:
78 self._protocol.connection_lost(exc)
79 finally:
80 # XXX If there is a pending overlapped read on the other
81 # end then it may fail with ERROR_NETNAME_DELETED if we
82 # just close our end. First calling shutdown() seems to
83 # cure it, but maybe using DisconnectEx() would be better.
84 if hasattr(self._sock, 'shutdown'):
85 self._sock.shutdown(socket.SHUT_RDWR)
86 self._sock.close()
87 server = self._server
88 if server is not None:
89 server.detach(self)
90 self._server = None
91
Guido van Rossumebb8e582013-12-04 12:12:07 -080092 # XXX The next four methods are nearly identical to corresponding
93 # ones in _SelectorTransport. Maybe refactor buffer management to
94 # share the implementations? (Also these are really only needed
95 # by _ProactorWritePipeTransport but since _buffer is defined on
96 # the base class I am putting it here for now.)
97
98 def _maybe_pause_protocol(self):
99 size = self.get_write_buffer_size()
100 if size <= self._high_water:
101 return
102 if not self._protocol_paused:
103 self._protocol_paused = True
104 try:
105 self._protocol.pause_writing()
106 except Exception:
107 logger.exception('pause_writing() failed')
108
109 def _maybe_resume_protocol(self):
110 if (self._protocol_paused and
111 self.get_write_buffer_size() <= self._low_water):
112 self._protocol_paused = False
113 try:
114 self._protocol.resume_writing()
115 except Exception:
116 logger.exception('resume_writing() failed')
117
118 def set_write_buffer_limits(self, high=None, low=None):
119 if high is None:
120 if low is None:
121 high = 64*1024
122 else:
123 high = 4*low
124 if low is None:
125 low = high // 4
126 if not high >= low >= 0:
127 raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
128 (high, low))
129 self._high_water = high
130 self._low_water = low
131
132 def get_write_buffer_size(self):
Victor Stinner915bcb02014-02-01 22:49:59 +0100133 size = self._pending_write
134 if self._buffer is not None:
135 size += len(self._buffer)
136 return size
Guido van Rossumebb8e582013-12-04 12:12:07 -0800137
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700138
139class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
140 transports.ReadTransport):
141 """Transport for read pipes."""
142
143 def __init__(self, loop, sock, protocol, waiter=None,
144 extra=None, server=None):
145 super().__init__(loop, sock, protocol, waiter, extra, server)
146 self._read_fut = None
147 self._paused = False
148 self._loop.call_soon(self._loop_reading)
149
Guido van Rossum57497ad2013-10-18 07:58:20 -0700150 def pause_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800151 if self._closing:
152 raise RuntimeError('Cannot pause_reading() when closing')
153 if self._paused:
154 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155 self._paused = True
156
Guido van Rossum57497ad2013-10-18 07:58:20 -0700157 def resume_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800158 if not self._paused:
159 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 self._paused = False
161 if self._closing:
162 return
163 self._loop.call_soon(self._loop_reading, self._read_fut)
164
165 def _loop_reading(self, fut=None):
166 if self._paused:
167 return
168 data = None
169
170 try:
171 if fut is not None:
172 assert self._read_fut is fut or (self._read_fut is None and
173 self._closing)
174 self._read_fut = None
175 data = fut.result() # deliver data later in "finally" clause
176
177 if self._closing:
178 # since close() has been called we ignore any read data
179 data = None
180 return
181
182 if data == b'':
183 # we got end-of-file so no need to reschedule a new read
184 return
185
186 # reschedule a new read
187 self._read_fut = self._loop._proactor.recv(self._sock, 4096)
188 except ConnectionAbortedError as exc:
189 if not self._closing:
190 self._fatal_error(exc)
191 except ConnectionResetError as exc:
192 self._force_close(exc)
193 except OSError as exc:
194 self._fatal_error(exc)
195 except futures.CancelledError:
196 if not self._closing:
197 raise
198 else:
199 self._read_fut.add_done_callback(self._loop_reading)
200 finally:
201 if data:
202 self._protocol.data_received(data)
203 elif data is not None:
204 keep_open = self._protocol.eof_received()
205 if not keep_open:
206 self.close()
207
208
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100209class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
Victor Stinner915bcb02014-02-01 22:49:59 +0100210 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 """Transport for write pipes."""
212
213 def write(self, data):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800214 if not isinstance(data, (bytes, bytearray, memoryview)):
215 raise TypeError('data argument must be byte-ish (%r)',
216 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 if self._eof_written:
Guido van Rossumebb8e582013-12-04 12:12:07 -0800218 raise RuntimeError('write_eof() already called')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219
220 if not data:
221 return
222
223 if self._conn_lost:
224 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700225 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700226 self._conn_lost += 1
227 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228
Guido van Rossumebb8e582013-12-04 12:12:07 -0800229 # Observable states:
230 # 1. IDLE: _write_fut and _buffer both None
231 # 2. WRITING: _write_fut set; _buffer None
232 # 3. BACKED UP: _write_fut set; _buffer a bytearray
233 # We always copy the data, so the caller can't modify it
234 # while we're still waiting for the I/O to happen.
235 if self._write_fut is None: # IDLE -> WRITING
236 assert self._buffer is None
237 # Pass a copy, except if it's already immutable.
238 self._loop_writing(data=bytes(data))
239 # XXX Should we pause the protocol at this point
240 # if len(data) > self._high_water? (That would
241 # require keeping track of the number of bytes passed
242 # to a send() that hasn't finished yet.)
243 elif not self._buffer: # WRITING -> BACKED UP
244 # Make a mutable copy which we can extend.
245 self._buffer = bytearray(data)
246 self._maybe_pause_protocol()
247 else: # BACKED UP
248 # Append to buffer (also copies).
249 self._buffer.extend(data)
250 self._maybe_pause_protocol()
251
252 def _loop_writing(self, f=None, data=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700253 try:
254 assert f is self._write_fut
255 self._write_fut = None
Victor Stinner915bcb02014-02-01 22:49:59 +0100256 self._pending_write = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 if f:
258 f.result()
Guido van Rossumebb8e582013-12-04 12:12:07 -0800259 if data is None:
260 data = self._buffer
261 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262 if not data:
263 if self._closing:
264 self._loop.call_soon(self._call_connection_lost, None)
265 if self._eof_written:
266 self._sock.shutdown(socket.SHUT_WR)
Victor Stinner915bcb02014-02-01 22:49:59 +0100267 # Now that we've reduced the buffer size, tell the
268 # protocol to resume writing if it was paused. Note that
269 # we do this last since the callback is called immediately
270 # and it may add more data to the buffer (even causing the
271 # protocol to be paused again).
272 self._maybe_resume_protocol()
Guido van Rossumebb8e582013-12-04 12:12:07 -0800273 else:
274 self._write_fut = self._loop._proactor.send(self._sock, data)
Victor Stinner915bcb02014-02-01 22:49:59 +0100275 if not self._write_fut.done():
276 assert self._pending_write == 0
277 self._pending_write = len(data)
278 self._write_fut.add_done_callback(self._loop_writing)
279 self._maybe_pause_protocol()
280 else:
281 self._write_fut.add_done_callback(self._loop_writing)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 except ConnectionResetError as exc:
283 self._force_close(exc)
284 except OSError as exc:
285 self._fatal_error(exc)
286
287 def can_write_eof(self):
288 return True
289
290 def write_eof(self):
291 self.close()
292
293 def abort(self):
294 self._force_close(None)
295
296
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100297class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
298 def __init__(self, *args, **kw):
299 super().__init__(*args, **kw)
300 self._read_fut = self._loop._proactor.recv(self._sock, 16)
301 self._read_fut.add_done_callback(self._pipe_closed)
302
303 def _pipe_closed(self, fut):
304 if fut.cancelled():
305 # the transport has been closed
306 return
Victor Stinner83bdfa02014-02-04 08:57:48 +0100307 assert fut.result() == b''
308 if self._closing:
309 assert self._read_fut is None
310 return
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100311 assert fut is self._read_fut, (fut, self._read_fut)
312 self._read_fut = None
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100313 if self._write_fut is not None:
314 self._force_close(exc)
315 else:
316 self.close()
317
318
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100320 _ProactorBaseWritePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 transports.Transport):
322 """Transport for duplex pipes."""
323
324 def can_write_eof(self):
325 return False
326
327 def write_eof(self):
328 raise NotImplementedError
329
330
331class _ProactorSocketTransport(_ProactorReadPipeTransport,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100332 _ProactorBaseWritePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333 transports.Transport):
334 """Transport for connected sockets."""
335
336 def _set_extra(self, sock):
337 self._extra['socket'] = sock
338 try:
339 self._extra['sockname'] = sock.getsockname()
340 except (socket.error, AttributeError):
341 pass
342 if 'peername' not in self._extra:
343 try:
344 self._extra['peername'] = sock.getpeername()
345 except (socket.error, AttributeError):
346 pass
347
348 def can_write_eof(self):
349 return True
350
351 def write_eof(self):
352 if self._closing or self._eof_written:
353 return
354 self._eof_written = True
355 if self._write_fut is None:
356 self._sock.shutdown(socket.SHUT_WR)
357
358
359class BaseProactorEventLoop(base_events.BaseEventLoop):
360
361 def __init__(self, proactor):
362 super().__init__()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700363 logger.debug('Using proactor: %s', proactor.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364 self._proactor = proactor
365 self._selector = proactor # convenient alias
Victor Stinner7de26462014-01-11 00:03:21 +0100366 self._self_reading_future = None
367 self._accept_futures = {} # socket file descriptor => Future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 proactor.set_loop(self)
369 self._make_self_pipe()
370
371 def _make_socket_transport(self, sock, protocol, waiter=None,
372 extra=None, server=None):
373 return _ProactorSocketTransport(self, sock, protocol, waiter,
374 extra, server)
375
376 def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
377 extra=None):
378 return _ProactorDuplexPipeTransport(self,
379 sock, protocol, waiter, extra)
380
381 def _make_read_pipe_transport(self, sock, protocol, waiter=None,
382 extra=None):
383 return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
384
385 def _make_write_pipe_transport(self, sock, protocol, waiter=None,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100386 extra=None):
387 # We want connection_lost() to be called when other end closes
388 return _ProactorWritePipeTransport(self,
389 sock, protocol, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391 def close(self):
392 if self._proactor is not None:
393 self._close_self_pipe()
394 self._proactor.close()
395 self._proactor = None
396 self._selector = None
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200397 super().close()
Victor Stinner7de26462014-01-11 00:03:21 +0100398 self._accept_futures.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
400 def sock_recv(self, sock, n):
401 return self._proactor.recv(sock, n)
402
403 def sock_sendall(self, sock, data):
404 return self._proactor.send(sock, data)
405
406 def sock_connect(self, sock, address):
Victor Stinner1b0580b2014-02-13 09:24:37 +0100407 try:
408 base_events._check_resolved_address(sock, address)
409 except ValueError as err:
410 fut = futures.Future(loop=self)
411 fut.set_exception(err)
412 return fut
413 else:
414 return self._proactor.connect(sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415
416 def sock_accept(self, sock):
417 return self._proactor.accept(sock)
418
419 def _socketpair(self):
420 raise NotImplementedError
421
422 def _close_self_pipe(self):
Victor Stinner7de26462014-01-11 00:03:21 +0100423 if self._self_reading_future is not None:
424 self._self_reading_future.cancel()
425 self._self_reading_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 self._ssock.close()
427 self._ssock = None
428 self._csock.close()
429 self._csock = None
430 self._internal_fds -= 1
431
432 def _make_self_pipe(self):
433 # A self-socket, really. :-)
434 self._ssock, self._csock = self._socketpair()
435 self._ssock.setblocking(False)
436 self._csock.setblocking(False)
437 self._internal_fds += 1
438 self.call_soon(self._loop_self_reading)
439
440 def _loop_self_reading(self, f=None):
441 try:
442 if f is not None:
443 f.result() # may raise
444 f = self._proactor.recv(self._ssock, 4096)
445 except:
446 self.close()
447 raise
448 else:
Victor Stinner7de26462014-01-11 00:03:21 +0100449 self._self_reading_future = f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450 f.add_done_callback(self._loop_self_reading)
451
452 def _write_to_self(self):
453 self._csock.send(b'x')
454
455 def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800456 if ssl:
457 raise ValueError('IocpEventLoop is incompatible with SSL.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458
459 def loop(f=None):
460 try:
461 if f is not None:
462 conn, addr = f.result()
463 protocol = protocol_factory()
464 self._make_socket_transport(
465 conn, protocol,
466 extra={'peername': addr}, server=server)
467 f = self._proactor.accept(sock)
468 except OSError:
469 if sock.fileno() != -1:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700470 logger.exception('Accept failed')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471 sock.close()
472 except futures.CancelledError:
473 sock.close()
474 else:
Victor Stinner7de26462014-01-11 00:03:21 +0100475 self._accept_futures[sock.fileno()] = f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 f.add_done_callback(loop)
477
478 self.call_soon(loop)
479
480 def _process_events(self, event_list):
481 pass # XXX hard work currently done in poll
482
483 def _stop_serving(self, sock):
Victor Stinner7de26462014-01-11 00:03:21 +0100484 for future in self._accept_futures.values():
485 future.cancel()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 self._proactor._stop_serving(sock)
487 sock.close()