blob: b76f69ee57107d0b07e5a63b53d6e4d1b0a1141d [file] [log] [blame]
"""Event loop using a proactor and related classes.

A proactor is a "notify-on-completion" multiplexer. Currently a
proactor is only implemented on Windows with IOCP.
"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseProactorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import socket
10
11from . import base_events
12from . import constants
13from . import futures
14from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
class _ProactorBasePipeTransport(transports._FlowControlMixin,
                                 transports.BaseTransport):
    """Base class for pipe and socket transports.

    Holds the state shared by the read and write halves: the loop, the
    underlying pipe/socket object, the protocol, the optional server the
    transport is attached to, the pending read/write futures and the
    write buffer.
    """

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport and schedule protocol.connection_made().

        If *server* is given the transport attaches itself to it.  If
        *waiter* is given, its result is set to None via call_soon()
        after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._set_extra(sock)
        self._loop = loop
        self._sock = sock
        self._protocol = protocol
        self._server = server
        self._buffer = None  # None or bytearray.
        self._read_fut = None
        self._write_fut = None
        self._pending_write = 0
        self._conn_lost = 0
        self._closing = False  # Set when close() called.
        self._eof_written = False
        # Set once _call_connection_lost() has started, so that it runs at
        # most once even if it gets scheduled by both close() and
        # _force_close().
        self._called_connection_lost = False
        if self._server is not None:
            self._server.attach(self)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _set_extra(self, sock):
        """Record the underlying object in the transport's extra info."""
        self._extra['pipe'] = sock

    def close(self):
        """Close the transport once any pending write has been flushed.

        connection_lost(None) is scheduled right away only when there is
        neither buffered data nor a write in flight.  Any pending read is
        cancelled.  Calling close() again is a no-op.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        if not self._buffer and self._write_fut is None:
            self._loop.call_soon(self._call_connection_lost, None)
        if self._read_fut is not None:
            self._read_fut.cancel()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* to the loop's exception handler and force-close.

        BrokenPipeError and ConnectionResetError are not reported; they
        only trigger the forced close.
        """
        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop all pending I/O and schedule connection_lost(exc).

        Unlike close(), buffered data is discarded and pending futures
        are cancelled.
        """
        # Only bail out when the connection has really been torn down.
        # Returning as soon as close() had been called would turn abort()
        # or a fatal I/O error into a no-op while a write is still being
        # flushed, and connection_lost() would then never be delivered.
        if self._closing and self._called_connection_lost:
            return
        self._closing = True
        self._conn_lost += 1
        if self._write_fut:
            self._write_fut.cancel()
        if self._read_fut:
            self._read_fut.cancel()
        self._write_fut = self._read_fut = None
        self._pending_write = 0
        self._buffer = None
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket and the server.

        Runs at most once per transport; later calls return immediately.
        """
        if self._called_connection_lost:
            return
        self._called_connection_lost = True
        try:
            self._protocol.connection_lost(exc)
        finally:
            try:
                # XXX If there is a pending overlapped read on the other
                # end then it may fail with ERROR_NETNAME_DELETED if we
                # just close our end.  First calling shutdown() seems to
                # cure it, but maybe using DisconnectEx() would be better.
                if hasattr(self._sock, 'shutdown'):
                    self._sock.shutdown(socket.SHUT_RDWR)
            finally:
                # Always close and detach, even if shutdown() raised, so
                # that a failing shutdown cannot leak the handle or leave
                # the transport attached to the server.
                self._sock.close()
                server = self._server
                if server is not None:
                    server.detach(self)
                    self._server = None

    def get_write_buffer_size(self):
        """Return the bytes in flight plus the bytes still buffered."""
        size = self._pending_write
        if self._buffer is not None:
            size += len(self._buffer)
        return size
Guido van Rossumebb8e582013-12-04 12:12:07 -0800101
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                                 transports.ReadTransport):
    """Transport for read pipes.

    Keeps one overlapped read in flight at a time; each completed read
    is handed to the protocol and a new read is issued.
    """

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Initialise the base transport and schedule the first read."""
        super().__init__(loop, sock, protocol, waiter, extra, server)
        self._read_fut = None
        self._paused = False
        self._loop.call_soon(self._loop_reading)

    def pause_reading(self):
        """Stop delivering data to the protocol until resume_reading().

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True

    def resume_reading(self):
        """Resume delivering data to the protocol.

        Raises RuntimeError if the transport is not paused.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        fut = self._read_fut
        if fut is not None and not fut.done():
            # The read is still in flight.  Its done callback will invoke
            # _loop_reading() when it completes; calling it now would make
            # fut.result() raise InvalidStateError.
            return
        self._loop.call_soon(self._loop_reading, fut)

    def _loop_reading(self, fut=None):
        """Consume the completed read *fut* (if any) and start the next.

        Called without argument to start reading, and as the done
        callback of every read future.
        """
        if self._paused:
            return
        if fut is None and self._read_fut is not None:
            # A "start reading" request arrived although a read has
            # already been issued (e.g. pause/resume happened before the
            # initial scheduled call ran).  Do not start a second read.
            return
        data = None

        try:
            if fut is not None:
                assert self._read_fut is fut or (self._read_fut is None and
                                                 self._closing)
                self._read_fut = None
                data = fut.result()  # deliver data later in "finally" clause

            if self._closing:
                # since close() has been called we ignore any read data
                data = None
                return

            if data == b'':
                # we got end-of-file so no need to reschedule a new read
                return

            # reschedule a new read
            self._read_fut = self._loop._proactor.recv(self._sock, 4096)
        except ConnectionAbortedError as exc:
            if not self._closing:
                self._fatal_error(exc, 'Fatal read error on pipe transport')
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        except futures.CancelledError:
            if not self._closing:
                raise
        else:
            self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if data:
                self._protocol.data_received(data)
            elif data is not None:
                # Empty read: end-of-file.  The protocol decides whether
                # the transport stays open.
                keep_open = self._protocol.eof_received()
                if not keep_open:
                    self.close()
171
172
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    """Transport for write pipes.

    At most one overlapped send is in flight; data written meanwhile is
    accumulated in a bytearray and sent when the current send completes.
    """

    def write(self, data):
        """Queue *data* for sending.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and RuntimeError if write_eof() has already been called.  Empty
        data is ignored.  After the connection has been lost the data is
        dropped and, past a threshold of such writes, a warning is logged.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Format the message; passing type(data) as a second exception
            # argument would leave the "%r" placeholder unexpanded.
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        if self._eof_written:
            raise RuntimeError('write_eof() already called')

        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
            # XXX Should we pause the protocol at this point
            # if len(data) > self._high_water?  (That would
            # require keeping track of the number of bytes passed
            # to a send() that hasn't finished yet.)
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Finish the completed send *f* (if any) and start the next one.

        Called directly by write() with *data*, and as the done callback
        of every send future with *f*.
        """
        try:
            if f is not None and self._write_fut is None and self._closing:
                # _force_close() cancelled this future and already reset
                # the write state; there is nothing left to do, and the
                # identity assertion below would fail.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True

    def write_eof(self):
        """Signal end-of-file by closing the transport."""
        self.close()

    def abort(self):
        """Close the transport immediately, discarding pending data."""
        self._force_close(None)
259
260
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    """Write-pipe transport that keeps a read pending on the pipe.

    The read is only used as a signal: when it completes, the transport
    is closed (or force-closed if a write is still in flight).
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        watcher = self._loop._proactor.recv(self._sock, 16)
        self._read_fut = watcher
        watcher.add_done_callback(self._pipe_closed)

    def _pipe_closed(self, fut):
        """Done callback of the pending read started in __init__()."""
        if fut.cancelled():
            # the transport has been closed
            return
        # A read that completes normally is expected to be empty.
        assert fut.result() == b''
        if self._closing:
            assert self._read_fut is None
            return
        assert fut is self._read_fut, (fut, self._read_fut)
        self._read_fut = None
        if self._write_fut is None:
            # Nothing in flight: an orderly close is enough.
            self.close()
        else:
            # A write is still pending and cannot complete any more.
            self._force_close(BrokenPipeError())
281
282
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
                                   _ProactorBaseWritePipeTransport,
                                   transports.Transport):
    """Transport for duplex pipes."""

    def can_write_eof(self):
        """Return False: write_eof() is not available on duplex pipes."""
        return False

    def write_eof(self):
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError()
293
294
class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets."""

    def _set_extra(self, sock):
        """Fill in 'socket', 'sockname' and 'peername' extra info.

        Address lookups that fail are skipped, and an already present
        'peername' entry is left untouched.
        """
        info = self._extra
        info['socket'] = sock
        try:
            info['sockname'] = sock.getsockname()
        except (socket.error, AttributeError):
            pass
        if 'peername' in info:
            return
        try:
            info['peername'] = sock.getpeername()
        except (socket.error, AttributeError):
            pass

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True

    def write_eof(self):
        """Shut down the write side once pending data has been sent.

        Does nothing if the transport is closing or EOF was already
        written.  The socket is shut down right away only when no write
        is in flight.
        """
        if not (self._closing or self._eof_written):
            self._eof_written = True
            if self._write_fut is None:
                self._sock.shutdown(socket.SHUT_WR)
321
322
class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop that delegates all I/O to a proactor object.

    Subclasses must provide _socketpair(), which is used to create the
    self-pipe that wakes the loop up.
    """

    def __init__(self, proactor):
        """Bind the loop to *proactor* and create the self-pipe."""
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Return a transport for the connected socket *sock*."""
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        """Return a transport for the duplex pipe *sock*."""
        return _ProactorDuplexPipeTransport(self,
                                            sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        """Return a transport for the read pipe *sock*."""
        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        """Return a transport for the write pipe *sock*."""
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(self,
                                           sock, protocol, waiter, extra)

    def close(self):
        """Close the loop, its accept futures, self-pipe and proactor.

        Calling close() on an already closed loop is a no-op.
        """
        if self.is_closed():
            return
        super().close()
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = None
        self._selector = None

    def sock_recv(self, sock, n):
        """Return the proactor's future for receiving up to *n* bytes."""
        return self._proactor.recv(sock, n)

    def sock_sendall(self, sock, data):
        """Return the proactor's future for sending *data*."""
        return self._proactor.send(sock, data)

    def sock_connect(self, sock, address):
        """Return a future for connecting *sock* to *address*.

        If the address check raises ValueError, the returned future
        carries that exception instead of it being raised here.
        """
        try:
            base_events._check_resolved_address(sock, address)
        except ValueError as err:
            fut = futures.Future(loop=self)
            fut.set_exception(err)
            return fut
        else:
            return self._proactor.connect(sock, address)

    def sock_accept(self, sock):
        """Return the proactor's future for accepting on *sock*."""
        return self._proactor.accept(sock)

    def _socketpair(self):
        """Return a pair of connected sockets; provided by subclasses."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Cancel the self-pipe read and close both ends of the pipe."""
        if self._self_reading_future is not None:
            self._self_reading_future.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the self-pipe and schedule the first read on it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.call_soon(self._loop_self_reading)

    def _loop_self_reading(self, f=None):
        """Drain the self-pipe: consume read *f* and issue the next one."""
        try:
            if f is not None:
                f.result()  # may raise
            f = self._proactor.recv(self._ssock, 4096)
        except futures.CancelledError:
            # _close_self_pipe() cancels the pending read; that is the
            # regular way to stop this loop, not an error.
            return
        except BaseException:
            self.close()
            raise
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        """Write one byte to the self-pipe."""
        self._csock.send(b'x')

    def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
        """Start accepting connections on the listening socket *sock*.

        Every accepted connection gets a new protocol from
        *protocol_factory* and a socket transport.  Raises ValueError if
        *ssl* is given.
        """
        if ssl:
            raise ValueError('IocpEventLoop is incompatible with SSL.')

        def loop(f=None):
            # Runs once to issue the first accept, then as the done
            # callback of every accept future.
            try:
                if f is not None:
                    conn, addr = f.result()
                    protocol = protocol_factory()
                    self._make_socket_transport(
                        conn, protocol,
                        extra={'peername': addr}, server=server)
                if self.is_closed():
                    return
                f = self._proactor.accept(sock)
            except OSError as exc:
                # A closed listening socket (fileno() == -1) is not
                # reported.
                if sock.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Accept failed',
                        'exception': exc,
                        'socket': sock,
                    })
                    sock.close()
            except futures.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        pass    # XXX hard work currently done in poll

    def _stop_accept_futures(self):
        """Cancel and forget the accept futures of all listening sockets."""
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        """Stop accepting connections on *sock* and close it."""
        # Cancel only the accept pending on this socket; other listening
        # sockets served by the same loop must keep accepting.
        future = self._accept_futures.pop(sock.fileno(), None)
        if future is not None:
            future.cancel()
        self._proactor._stop_serving(sock)
        sock.close()