blob: d09e9faa1b580423d77d2fb7cbba0babe604a73f [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer. Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseProactorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import socket
10
11from . import base_events
12from . import constants
13from . import futures
14from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
class _ProactorBasePipeTransport(transports._FlowControlMixin,
                                 transports.BaseTransport):
    """State and shutdown logic shared by pipe and socket transports."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Record the endpoints and schedule protocol.connection_made().

        If *server* is given it is attached to; if *waiter* is given its
        result is set (unless cancelled) by a callback scheduled after the
        connection_made() callback.
        """
        super().__init__(extra)
        self._set_extra(sock)
        self._loop = loop
        self._sock = sock
        self._protocol = protocol
        self._server = server
        # Pending I/O state.
        self._read_fut = None
        self._write_fut = None
        self._buffer = None  # None or bytearray.
        self._pending_write = 0
        # Shutdown state.
        self._conn_lost = 0
        self._closing = False  # Set when close() called.
        self._eof_written = False
        if server is not None:
            server._attach()
        loop.call_soon(protocol.connection_made, self)
        if waiter is not None:
            # Scheduled after connection_made() so the waiter only wakes
            # up once the protocol has been told about the connection.
            loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        """Return a summary of the fd and the read/write state."""
        parts = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
        read_fut = self._read_fut
        if read_fut is not None:
            if read_fut.ov.pending:
                state = "pending"
            else:
                state = "completed"
            parts.append('read=%s' % state)
        write_fut = self._write_fut
        if write_fut is not None:
            if not write_fut.ov.pending:
                parts.append("write=completed")
            else:
                parts.append("write=pending=%s" % self._pending_write)
        if self._buffer:
            parts.append('write_bufsize=%s' % len(self._buffer))
        if self._eof_written:
            parts.append('EOF written')
        return '<%s>' % ' '.join(parts)

    def _set_extra(self, sock):
        """Store *sock* in the extra-info dict under the 'pipe' key."""
        self._extra['pipe'] = sock

    def close(self):
        """Start closing the transport; a second call is a no-op.

        connection_lost(None) is scheduled right away only when there is
        no buffered data and no write in flight.  Any pending read is
        cancelled.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        nothing_to_flush = self._write_fut is None and not self._buffer
        if nothing_to_flush:
            self._loop.call_soon(self._call_connection_lost, None)
        pending_read = self._read_fut
        if pending_read is not None:
            pending_read.cancel()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and force the transport closed.

        BrokenPipeError and ConnectionResetError are only logged, and only
        when the loop is in debug mode; any other exception goes to the
        loop's exception handler.
        """
        peer_went_away = isinstance(
            exc, (BrokenPipeError, ConnectionResetError))
        if not peer_went_away:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._force_close(exc)

    def _force_close(self, exc):
        """Close immediately: cancel pending I/O and drop buffered data.

        Does nothing if the transport is already closing.  Otherwise
        connection_lost(exc) is scheduled on the loop.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        for pending in (self._write_fut, self._read_fut):
            if pending:
                pending.cancel()
        self._write_fut = None
        self._read_fut = None
        self._pending_write = 0
        self._buffer = None
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket and the server."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            sock = self._sock
            # XXX Just closing our end may make a pending overlapped read
            # on the other end fail with ERROR_NETNAME_DELETED.  Calling
            # shutdown() first seems to cure it, but maybe DisconnectEx()
            # would be better.
            if hasattr(sock, 'shutdown'):
                sock.shutdown(socket.SHUT_RDWR)
            sock.close()
            owner = self._server
            if owner is not None:
                owner._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the in-flight write size plus the buffered byte count."""
        buffered = 0 if self._buffer is None else len(self._buffer)
        return self._pending_write + buffered
Guido van Rossumebb8e582013-12-04 12:12:07 -0800122
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                                 transports.ReadTransport):
    """Transport for read pipes."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the base transport and schedule the first read."""
        super().__init__(loop, sock, protocol, waiter, extra, server)
        self._paused = False
        self._loop.call_soon(self._loop_reading)

    def pause_reading(self):
        """Stop issuing reads until resume_reading() is called.

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Restart the read loop after pause_reading().

        Raises RuntimeError if the transport is not paused.  Nothing is
        rescheduled when the transport is closing.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        # Hand the current read future (possibly None) back to the loop.
        self._loop.call_soon(self._loop_reading, self._read_fut)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _loop_reading(self, fut=None):
        """Consume the result of *fut* (if any) and start the next read.

        Runs once from the constructor with no future and afterwards as the
        done-callback of each receive future.  Received bytes or EOF are
        handed to the protocol in the ``finally`` clause.
        """
        if self._paused:
            return
        chunk = None

        try:
            if fut is not None:
                assert self._read_fut is fut or (self._read_fut is None and
                                                 self._closing)
                self._read_fut = None
                # Delivered to the protocol in the "finally" clause below.
                chunk = fut.result()

            if self._closing:
                # close() was called: whatever was read is discarded.
                chunk = None
                return

            if chunk == b'':
                # End-of-file: there is nothing more to read.
                return

            # Start the next read.
            self._read_fut = self._loop._proactor.recv(self._sock, 4096)
        except ConnectionAbortedError as exc:
            if not self._closing:
                self._fatal_error(exc, 'Fatal read error on pipe transport')
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        except futures.CancelledError:
            if not self._closing:
                raise
        else:
            # Only reached when a new read was started without error.
            self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if chunk:
                self._protocol.data_received(chunk)
            elif chunk is not None:
                # An empty, non-None result means EOF.
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if not keep_open:
                    self.close()
197
198
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    """Transport for write pipes."""

    def write(self, data):
        """Queue *data* for sending.

        The data is always copied, so the caller may modify its buffer
        afterwards.  Empty data is ignored.  Once the connection is lost
        the data is dropped, and a warning is logged when the drop count
        reaches LOG_THRESHOLD_FOR_CONNLOST_WRITES.

        Raises:
            TypeError: if *data* is not bytes, bytearray or memoryview.
            RuntimeError: if write_eof() was already called.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Interpolate the type into the message; passing it as a second
            # argument to TypeError would leave a literal "%r" in the text.
            raise TypeError('data argument must be byte-ish (%r)'
                            % type(data))
        if self._eof_written:
            raise RuntimeError('write_eof() already called')

        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
            # XXX Should we pause the protocol at this point
            # if len(data) > self._high_water?  (That would
            # require keeping track of the number of bytes passed
            # to a send() that hasn't finished yet.)
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Finish the previous send (if any) and start the next one.

        Called directly by write() with *data*, and as the done-callback of
        each send future with *f*.  When *data* is None the buffered bytes
        are sent instead.  ConnectionResetError force-closes the transport;
        any other OSError is reported through _fatal_error().
        """
        try:
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                # Re-raises the error of the completed send, if any.
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                # Everything has been flushed.
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    # The send is still in flight: count its bytes in the
                    # write buffer size.
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def write_eof(self):
        """Signal end-of-file by closing the transport."""
        self.close()

    def abort(self):
        """Close the transport immediately via _force_close(None)."""
        self._force_close(None)
285
286
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    """Write pipe transport that also watches for the pipe being closed."""

    def __init__(self, *args, **kw):
        """Build the write transport and start a small watcher read."""
        super().__init__(*args, **kw)
        watcher = self._loop._proactor.recv(self._sock, 16)
        self._read_fut = watcher
        watcher.add_done_callback(self._pipe_closed)

    def _pipe_closed(self, fut):
        """Done-callback of the watcher read started in __init__."""
        if fut.cancelled():
            # The transport has been closed.
            return
        assert fut.result() == b''
        if self._closing:
            assert self._read_fut is None
            return
        assert fut is self._read_fut, (fut, self._read_fut)
        self._read_fut = None
        if self._write_fut is None:
            # No write in flight: an ordinary close is enough.
            self.close()
        else:
            self._force_close(BrokenPipeError())
307
308
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
                                   _ProactorBaseWritePipeTransport,
                                   transports.Transport):
    """Transport for duplex pipes."""

    def can_write_eof(self):
        """Return False: write_eof() is not implemented for duplex pipes."""
        return False

    def write_eof(self):
        """Always raise NotImplementedError."""
        raise NotImplementedError()
319
320
class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets."""

    def _set_extra(self, sock):
        """Fill the extra-info dict with the socket and its addresses.

        'sockname' and 'peername' are best-effort: a lookup that fails with
        socket.error or AttributeError leaves the key unset.  An existing
        'peername' entry is not overwritten.
        """
        info = self._extra
        info['socket'] = sock
        try:
            info['sockname'] = sock.getsockname()
        except (socket.error, AttributeError):
            pass
        if 'peername' in info:
            return
        try:
            info['peername'] = sock.getpeername()
        except (socket.error, AttributeError):
            pass

    def can_write_eof(self):
        """Return True: the write side can be shut down separately."""
        return True

    def write_eof(self):
        """Mark EOF as written and shut down the write side when idle.

        Does nothing if the transport is closing or EOF was already
        written.  The shutdown happens here only when no write is in
        flight.
        """
        already_done = self._closing or self._eof_written
        if already_done:
            return
        self._eof_written = True
        if self._write_fut is not None:
            return
        self._sock.shutdown(socket.SHUT_WR)
347
348
class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop that delegates I/O to a proactor object.

    Subclasses must implement _socketpair(), which is used to build the
    self-pipe.
    """

    def __init__(self, proactor):
        """Bind *proactor* to this loop and create the self-pipe."""
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Return a _ProactorSocketTransport for *sock*."""
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        """Return a _ProactorDuplexPipeTransport for *sock*."""
        return _ProactorDuplexPipeTransport(self,
                                            sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        """Return a _ProactorReadPipeTransport for *sock*."""
        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        """Return a _ProactorWritePipeTransport for *sock*."""
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(self,
                                           sock, protocol, waiter, extra)

    def close(self):
        """Close the loop, its accept futures, self-pipe and proactor.

        Calling close() on an already closed loop does nothing.
        """
        if self.is_closed():
            return
        super().close()
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = None
        self._selector = None

    def sock_recv(self, sock, n):
        """Return the proactor's receive operation for up to *n* bytes."""
        return self._proactor.recv(sock, n)

    def sock_sendall(self, sock, data):
        """Return the proactor's send operation for *data*."""
        return self._proactor.send(sock, data)

    def sock_connect(self, sock, address):
        """Return the proactor's connect operation for *address*.

        If the address check raises ValueError, a future already failed
        with that error is returned instead of raising.
        """
        try:
            base_events._check_resolved_address(sock, address)
        except ValueError as err:
            fut = futures.Future(loop=self)
            fut.set_exception(err)
            return fut
        else:
            return self._proactor.connect(sock, address)

    def sock_accept(self, sock):
        """Return the proactor's accept operation for *sock*."""
        return self._proactor.accept(sock)

    def _socketpair(self):
        """Return a pair of connected sockets; must be overridden."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Cancel the self-pipe read and close both of its sockets."""
        if self._self_reading_future is not None:
            self._self_reading_future.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and schedule its read loop."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        # don't check the current loop because _make_self_pipe() is called
        # from the event loop constructor
        self._call_soon(self._loop_self_reading, (), check_loop=False)

    def _loop_self_reading(self, f=None):
        """Consume the result of *f* (if any) and start the next read.

        Any exception closes the loop and is re-raised.
        """
        try:
            if f is not None:
                f.result()  # may raise
            f = self._proactor.recv(self._ssock, 4096)
        except:
            # Deliberately catches everything: the loop is closed and the
            # exception is propagated unchanged.
            self.close()
            raise
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        """Send one byte on the client side of the self-pipe."""
        self._csock.send(b'x')

    def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
        """Start accepting connections on the listening socket *sock*.

        Each accepted connection gets a protocol from *protocol_factory*
        and a socket transport attached to *server*.

        Raises:
            ValueError: if *ssl* is truthy.
        """
        if ssl:
            raise ValueError('IocpEventLoop is incompatible with SSL.')

        def loop(f=None):
            # Runs once without a future to start accepting, then as the
            # done-callback of each accept future.
            try:
                if f is not None:
                    conn, addr = f.result()
                    if self._debug:
                        logger.debug("%r got a new connection from %r: %r",
                                     server, addr, conn)
                    protocol = protocol_factory()
                    self._make_socket_transport(
                        conn, protocol,
                        extra={'peername': addr}, server=server)
                if self.is_closed():
                    return
                f = self._proactor.accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    # Only report the error while the socket is still open.
                    self.call_exception_handler({
                        'message': 'Accept failed',
                        'exception': exc,
                        'socket': sock,
                    })
                    sock.close()
            except futures.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        """Do nothing; event processing happens elsewhere."""
        pass    # XXX hard work currently done in poll

    def _stop_accept_futures(self):
        """Cancel and forget every pending accept future."""
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        """Stop accepting connections on *sock* and close it.

        Only the accept future registered for *sock* is cancelled.
        Cancelling all of them would make the accept callback of every
        other listening socket close that socket too.
        """
        future = self._accept_futures.pop(sock.fileno(), None)
        if future is not None:
            future.cancel()
        self._proactor._stop_serving(sock)
        sock.close()