blob: d0b601d7c0b148dab01886bcc0e4bb194c46f29d [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer. Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseProactorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import socket
10
11from . import base_events
12from . import constants
13from . import futures
14from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070015from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016
17
Yury Selivanov3cb99142014-02-18 18:41:13 -050018class _ProactorBasePipeTransport(transports._FlowControlMixin,
19 transports.BaseTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020 """Base class for pipe and socket transports."""
21
22 def __init__(self, loop, sock, protocol, waiter=None,
23 extra=None, server=None):
24 super().__init__(extra)
25 self._set_extra(sock)
26 self._loop = loop
27 self._sock = sock
28 self._protocol = protocol
29 self._server = server
Guido van Rossumebb8e582013-12-04 12:12:07 -080030 self._buffer = None # None or bytearray.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031 self._read_fut = None
32 self._write_fut = None
Victor Stinner915bcb02014-02-01 22:49:59 +010033 self._pending_write = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034 self._conn_lost = 0
35 self._closing = False # Set when close() called.
36 self._eof_written = False
37 if self._server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +020038 self._server._attach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039 self._loop.call_soon(self._protocol.connection_made, self)
40 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +020041 # wait until protocol.connection_made() has been called
Victor Stinner799a60c2014-07-07 18:08:22 +020042 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043
44 def _set_extra(self, sock):
45 self._extra['pipe'] = sock
46
47 def close(self):
48 if self._closing:
49 return
50 self._closing = True
51 self._conn_lost += 1
52 if not self._buffer and self._write_fut is None:
53 self._loop.call_soon(self._call_connection_lost, None)
54 if self._read_fut is not None:
55 self._read_fut.cancel()
56
Victor Stinner0ee29c22014-02-19 01:40:41 +010057 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Victor Stinner63b4d4b2014-01-29 13:12:03 -080058 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
Yury Selivanov569efa22014-02-18 18:02:19 -050059 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +010060 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -050061 'exception': exc,
62 'transport': self,
63 'protocol': self._protocol,
64 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070065 self._force_close(exc)
66
67 def _force_close(self, exc):
68 if self._closing:
69 return
70 self._closing = True
71 self._conn_lost += 1
72 if self._write_fut:
73 self._write_fut.cancel()
74 if self._read_fut:
75 self._read_fut.cancel()
76 self._write_fut = self._read_fut = None
Victor Stinner915bcb02014-02-01 22:49:59 +010077 self._pending_write = 0
Guido van Rossumebb8e582013-12-04 12:12:07 -080078 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 self._loop.call_soon(self._call_connection_lost, exc)
80
81 def _call_connection_lost(self, exc):
82 try:
83 self._protocol.connection_lost(exc)
84 finally:
85 # XXX If there is a pending overlapped read on the other
86 # end then it may fail with ERROR_NETNAME_DELETED if we
87 # just close our end. First calling shutdown() seems to
88 # cure it, but maybe using DisconnectEx() would be better.
89 if hasattr(self._sock, 'shutdown'):
90 self._sock.shutdown(socket.SHUT_RDWR)
91 self._sock.close()
92 server = self._server
93 if server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +020094 server._detach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 self._server = None
96
Guido van Rossumebb8e582013-12-04 12:12:07 -080097 def get_write_buffer_size(self):
Victor Stinner915bcb02014-02-01 22:49:59 +010098 size = self._pending_write
99 if self._buffer is not None:
100 size += len(self._buffer)
101 return size
Guido van Rossumebb8e582013-12-04 12:12:07 -0800102
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103
104class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
105 transports.ReadTransport):
106 """Transport for read pipes."""
107
108 def __init__(self, loop, sock, protocol, waiter=None,
109 extra=None, server=None):
110 super().__init__(loop, sock, protocol, waiter, extra, server)
111 self._read_fut = None
112 self._paused = False
113 self._loop.call_soon(self._loop_reading)
114
Guido van Rossum57497ad2013-10-18 07:58:20 -0700115 def pause_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800116 if self._closing:
117 raise RuntimeError('Cannot pause_reading() when closing')
118 if self._paused:
119 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120 self._paused = True
121
Guido van Rossum57497ad2013-10-18 07:58:20 -0700122 def resume_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800123 if not self._paused:
124 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125 self._paused = False
126 if self._closing:
127 return
128 self._loop.call_soon(self._loop_reading, self._read_fut)
129
130 def _loop_reading(self, fut=None):
131 if self._paused:
132 return
133 data = None
134
135 try:
136 if fut is not None:
137 assert self._read_fut is fut or (self._read_fut is None and
138 self._closing)
139 self._read_fut = None
140 data = fut.result() # deliver data later in "finally" clause
141
142 if self._closing:
143 # since close() has been called we ignore any read data
144 data = None
145 return
146
147 if data == b'':
148 # we got end-of-file so no need to reschedule a new read
149 return
150
151 # reschedule a new read
152 self._read_fut = self._loop._proactor.recv(self._sock, 4096)
153 except ConnectionAbortedError as exc:
154 if not self._closing:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100155 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156 except ConnectionResetError as exc:
157 self._force_close(exc)
158 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100159 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 except futures.CancelledError:
161 if not self._closing:
162 raise
163 else:
164 self._read_fut.add_done_callback(self._loop_reading)
165 finally:
166 if data:
167 self._protocol.data_received(data)
168 elif data is not None:
169 keep_open = self._protocol.eof_received()
170 if not keep_open:
171 self.close()
172
173
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100174class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
Victor Stinner915bcb02014-02-01 22:49:59 +0100175 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176 """Transport for write pipes."""
177
178 def write(self, data):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800179 if not isinstance(data, (bytes, bytearray, memoryview)):
180 raise TypeError('data argument must be byte-ish (%r)',
181 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182 if self._eof_written:
Guido van Rossumebb8e582013-12-04 12:12:07 -0800183 raise RuntimeError('write_eof() already called')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
185 if not data:
186 return
187
188 if self._conn_lost:
189 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700190 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700191 self._conn_lost += 1
192 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700193
Guido van Rossumebb8e582013-12-04 12:12:07 -0800194 # Observable states:
195 # 1. IDLE: _write_fut and _buffer both None
196 # 2. WRITING: _write_fut set; _buffer None
197 # 3. BACKED UP: _write_fut set; _buffer a bytearray
198 # We always copy the data, so the caller can't modify it
199 # while we're still waiting for the I/O to happen.
200 if self._write_fut is None: # IDLE -> WRITING
201 assert self._buffer is None
202 # Pass a copy, except if it's already immutable.
203 self._loop_writing(data=bytes(data))
204 # XXX Should we pause the protocol at this point
205 # if len(data) > self._high_water? (That would
206 # require keeping track of the number of bytes passed
207 # to a send() that hasn't finished yet.)
208 elif not self._buffer: # WRITING -> BACKED UP
209 # Make a mutable copy which we can extend.
210 self._buffer = bytearray(data)
211 self._maybe_pause_protocol()
212 else: # BACKED UP
213 # Append to buffer (also copies).
214 self._buffer.extend(data)
215 self._maybe_pause_protocol()
216
217 def _loop_writing(self, f=None, data=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 try:
219 assert f is self._write_fut
220 self._write_fut = None
Victor Stinner915bcb02014-02-01 22:49:59 +0100221 self._pending_write = 0
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 if f:
223 f.result()
Guido van Rossumebb8e582013-12-04 12:12:07 -0800224 if data is None:
225 data = self._buffer
226 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 if not data:
228 if self._closing:
229 self._loop.call_soon(self._call_connection_lost, None)
230 if self._eof_written:
231 self._sock.shutdown(socket.SHUT_WR)
Victor Stinner915bcb02014-02-01 22:49:59 +0100232 # Now that we've reduced the buffer size, tell the
233 # protocol to resume writing if it was paused. Note that
234 # we do this last since the callback is called immediately
235 # and it may add more data to the buffer (even causing the
236 # protocol to be paused again).
237 self._maybe_resume_protocol()
Guido van Rossumebb8e582013-12-04 12:12:07 -0800238 else:
239 self._write_fut = self._loop._proactor.send(self._sock, data)
Victor Stinner915bcb02014-02-01 22:49:59 +0100240 if not self._write_fut.done():
241 assert self._pending_write == 0
242 self._pending_write = len(data)
243 self._write_fut.add_done_callback(self._loop_writing)
244 self._maybe_pause_protocol()
245 else:
246 self._write_fut.add_done_callback(self._loop_writing)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247 except ConnectionResetError as exc:
248 self._force_close(exc)
249 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100250 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251
252 def can_write_eof(self):
253 return True
254
255 def write_eof(self):
256 self.close()
257
258 def abort(self):
259 self._force_close(None)
260
261
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100262class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
263 def __init__(self, *args, **kw):
264 super().__init__(*args, **kw)
265 self._read_fut = self._loop._proactor.recv(self._sock, 16)
266 self._read_fut.add_done_callback(self._pipe_closed)
267
268 def _pipe_closed(self, fut):
269 if fut.cancelled():
270 # the transport has been closed
271 return
Victor Stinner83bdfa02014-02-04 08:57:48 +0100272 assert fut.result() == b''
273 if self._closing:
274 assert self._read_fut is None
275 return
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100276 assert fut is self._read_fut, (fut, self._read_fut)
277 self._read_fut = None
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100278 if self._write_fut is not None:
Victor Stinner6f24d832014-02-20 10:33:01 +0100279 self._force_close(BrokenPipeError())
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100280 else:
281 self.close()
282
283
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100285 _ProactorBaseWritePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 transports.Transport):
287 """Transport for duplex pipes."""
288
289 def can_write_eof(self):
290 return False
291
292 def write_eof(self):
293 raise NotImplementedError
294
295
296class _ProactorSocketTransport(_ProactorReadPipeTransport,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100297 _ProactorBaseWritePipeTransport,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 transports.Transport):
299 """Transport for connected sockets."""
300
301 def _set_extra(self, sock):
302 self._extra['socket'] = sock
303 try:
304 self._extra['sockname'] = sock.getsockname()
305 except (socket.error, AttributeError):
306 pass
307 if 'peername' not in self._extra:
308 try:
309 self._extra['peername'] = sock.getpeername()
310 except (socket.error, AttributeError):
311 pass
312
313 def can_write_eof(self):
314 return True
315
316 def write_eof(self):
317 if self._closing or self._eof_written:
318 return
319 self._eof_written = True
320 if self._write_fut is None:
321 self._sock.shutdown(socket.SHUT_WR)
322
323
324class BaseProactorEventLoop(base_events.BaseEventLoop):
325
326 def __init__(self, proactor):
327 super().__init__()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700328 logger.debug('Using proactor: %s', proactor.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 self._proactor = proactor
330 self._selector = proactor # convenient alias
Victor Stinner7de26462014-01-11 00:03:21 +0100331 self._self_reading_future = None
332 self._accept_futures = {} # socket file descriptor => Future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333 proactor.set_loop(self)
334 self._make_self_pipe()
335
336 def _make_socket_transport(self, sock, protocol, waiter=None,
337 extra=None, server=None):
338 return _ProactorSocketTransport(self, sock, protocol, waiter,
339 extra, server)
340
341 def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
342 extra=None):
343 return _ProactorDuplexPipeTransport(self,
344 sock, protocol, waiter, extra)
345
346 def _make_read_pipe_transport(self, sock, protocol, waiter=None,
347 extra=None):
348 return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
349
350 def _make_write_pipe_transport(self, sock, protocol, waiter=None,
Victor Stinnerb60e9ca2014-01-31 14:18:18 +0100351 extra=None):
352 # We want connection_lost() to be called when other end closes
353 return _ProactorWritePipeTransport(self,
354 sock, protocol, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355
356 def close(self):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200357 if self.is_closed():
358 return
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200359 super().close()
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200360 self._stop_accept_futures()
361 self._close_self_pipe()
362 self._proactor.close()
363 self._proactor = None
364 self._selector = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
366 def sock_recv(self, sock, n):
367 return self._proactor.recv(sock, n)
368
369 def sock_sendall(self, sock, data):
370 return self._proactor.send(sock, data)
371
372 def sock_connect(self, sock, address):
Victor Stinner1b0580b2014-02-13 09:24:37 +0100373 try:
374 base_events._check_resolved_address(sock, address)
375 except ValueError as err:
376 fut = futures.Future(loop=self)
377 fut.set_exception(err)
378 return fut
379 else:
380 return self._proactor.connect(sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381
382 def sock_accept(self, sock):
383 return self._proactor.accept(sock)
384
385 def _socketpair(self):
386 raise NotImplementedError
387
388 def _close_self_pipe(self):
Victor Stinner7de26462014-01-11 00:03:21 +0100389 if self._self_reading_future is not None:
390 self._self_reading_future.cancel()
391 self._self_reading_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 self._ssock.close()
393 self._ssock = None
394 self._csock.close()
395 self._csock = None
396 self._internal_fds -= 1
397
398 def _make_self_pipe(self):
399 # A self-socket, really. :-)
400 self._ssock, self._csock = self._socketpair()
401 self._ssock.setblocking(False)
402 self._csock.setblocking(False)
403 self._internal_fds += 1
404 self.call_soon(self._loop_self_reading)
405
406 def _loop_self_reading(self, f=None):
407 try:
408 if f is not None:
409 f.result() # may raise
410 f = self._proactor.recv(self._ssock, 4096)
411 except:
412 self.close()
413 raise
414 else:
Victor Stinner7de26462014-01-11 00:03:21 +0100415 self._self_reading_future = f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416 f.add_done_callback(self._loop_self_reading)
417
418 def _write_to_self(self):
419 self._csock.send(b'x')
420
421 def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800422 if ssl:
423 raise ValueError('IocpEventLoop is incompatible with SSL.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424
425 def loop(f=None):
426 try:
427 if f is not None:
428 conn, addr = f.result()
429 protocol = protocol_factory()
430 self._make_socket_transport(
431 conn, protocol,
432 extra={'peername': addr}, server=server)
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200433 if self.is_closed():
434 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 f = self._proactor.accept(sock)
Yury Selivanov569efa22014-02-18 18:02:19 -0500436 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437 if sock.fileno() != -1:
Yury Selivanov569efa22014-02-18 18:02:19 -0500438 self.call_exception_handler({
439 'message': 'Accept failed',
440 'exception': exc,
441 'socket': sock,
442 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 sock.close()
444 except futures.CancelledError:
445 sock.close()
446 else:
Victor Stinner7de26462014-01-11 00:03:21 +0100447 self._accept_futures[sock.fileno()] = f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 f.add_done_callback(loop)
449
450 self.call_soon(loop)
451
452 def _process_events(self, event_list):
453 pass # XXX hard work currently done in poll
454
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200455 def _stop_accept_futures(self):
Victor Stinner7de26462014-01-11 00:03:21 +0100456 for future in self._accept_futures.values():
457 future.cancel()
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200458 self._accept_futures.clear()
459
460 def _stop_serving(self, sock):
461 self._stop_accept_futures()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 self._proactor._stop_serving(sock)
463 sock.close()