blob: 979bc25fed99ced86890b60521f1f9a097f6a358 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer. Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
7import socket
8
9from . import base_events
10from . import constants
11from . import futures
12from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070013from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
16class _ProactorBasePipeTransport(transports.BaseTransport):
17 """Base class for pipe and socket transports."""
18
19 def __init__(self, loop, sock, protocol, waiter=None,
20 extra=None, server=None):
21 super().__init__(extra)
22 self._set_extra(sock)
23 self._loop = loop
24 self._sock = sock
25 self._protocol = protocol
26 self._server = server
Guido van Rossumebb8e582013-12-04 12:12:07 -080027 self._buffer = None # None or bytearray.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028 self._read_fut = None
29 self._write_fut = None
30 self._conn_lost = 0
31 self._closing = False # Set when close() called.
32 self._eof_written = False
Guido van Rossumebb8e582013-12-04 12:12:07 -080033 self._protocol_paused = False
34 self.set_write_buffer_limits()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035 if self._server is not None:
36 self._server.attach(self)
37 self._loop.call_soon(self._protocol.connection_made, self)
38 if waiter is not None:
39 self._loop.call_soon(waiter.set_result, None)
40
41 def _set_extra(self, sock):
42 self._extra['pipe'] = sock
43
44 def close(self):
45 if self._closing:
46 return
47 self._closing = True
48 self._conn_lost += 1
49 if not self._buffer and self._write_fut is None:
50 self._loop.call_soon(self._call_connection_lost, None)
51 if self._read_fut is not None:
52 self._read_fut.cancel()
53
54 def _fatal_error(self, exc):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070055 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 self._force_close(exc)
57
58 def _force_close(self, exc):
59 if self._closing:
60 return
61 self._closing = True
62 self._conn_lost += 1
63 if self._write_fut:
64 self._write_fut.cancel()
65 if self._read_fut:
66 self._read_fut.cancel()
67 self._write_fut = self._read_fut = None
Guido van Rossumebb8e582013-12-04 12:12:07 -080068 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069 self._loop.call_soon(self._call_connection_lost, exc)
70
71 def _call_connection_lost(self, exc):
72 try:
73 self._protocol.connection_lost(exc)
74 finally:
75 # XXX If there is a pending overlapped read on the other
76 # end then it may fail with ERROR_NETNAME_DELETED if we
77 # just close our end. First calling shutdown() seems to
78 # cure it, but maybe using DisconnectEx() would be better.
79 if hasattr(self._sock, 'shutdown'):
80 self._sock.shutdown(socket.SHUT_RDWR)
81 self._sock.close()
82 server = self._server
83 if server is not None:
84 server.detach(self)
85 self._server = None
86
Guido van Rossumebb8e582013-12-04 12:12:07 -080087 # XXX The next four methods are nearly identical to corresponding
88 # ones in _SelectorTransport. Maybe refactor buffer management to
89 # share the implementations? (Also these are really only needed
90 # by _ProactorWritePipeTransport but since _buffer is defined on
91 # the base class I am putting it here for now.)
92
93 def _maybe_pause_protocol(self):
94 size = self.get_write_buffer_size()
95 if size <= self._high_water:
96 return
97 if not self._protocol_paused:
98 self._protocol_paused = True
99 try:
100 self._protocol.pause_writing()
101 except Exception:
102 logger.exception('pause_writing() failed')
103
104 def _maybe_resume_protocol(self):
105 if (self._protocol_paused and
106 self.get_write_buffer_size() <= self._low_water):
107 self._protocol_paused = False
108 try:
109 self._protocol.resume_writing()
110 except Exception:
111 logger.exception('resume_writing() failed')
112
113 def set_write_buffer_limits(self, high=None, low=None):
114 if high is None:
115 if low is None:
116 high = 64*1024
117 else:
118 high = 4*low
119 if low is None:
120 low = high // 4
121 if not high >= low >= 0:
122 raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
123 (high, low))
124 self._high_water = high
125 self._low_water = low
126
127 def get_write_buffer_size(self):
128 # NOTE: This doesn't take into account data already passed to
129 # send() even if send() hasn't finished yet.
130 if not self._buffer:
131 return 0
132 return len(self._buffer)
133
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700134
135class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
136 transports.ReadTransport):
137 """Transport for read pipes."""
138
139 def __init__(self, loop, sock, protocol, waiter=None,
140 extra=None, server=None):
141 super().__init__(loop, sock, protocol, waiter, extra, server)
142 self._read_fut = None
143 self._paused = False
144 self._loop.call_soon(self._loop_reading)
145
Guido van Rossum57497ad2013-10-18 07:58:20 -0700146 def pause_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800147 if self._closing:
148 raise RuntimeError('Cannot pause_reading() when closing')
149 if self._paused:
150 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 self._paused = True
152
Guido van Rossum57497ad2013-10-18 07:58:20 -0700153 def resume_reading(self):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800154 if not self._paused:
155 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156 self._paused = False
157 if self._closing:
158 return
159 self._loop.call_soon(self._loop_reading, self._read_fut)
160
161 def _loop_reading(self, fut=None):
162 if self._paused:
163 return
164 data = None
165
166 try:
167 if fut is not None:
168 assert self._read_fut is fut or (self._read_fut is None and
169 self._closing)
170 self._read_fut = None
171 data = fut.result() # deliver data later in "finally" clause
172
173 if self._closing:
174 # since close() has been called we ignore any read data
175 data = None
176 return
177
178 if data == b'':
179 # we got end-of-file so no need to reschedule a new read
180 return
181
182 # reschedule a new read
183 self._read_fut = self._loop._proactor.recv(self._sock, 4096)
184 except ConnectionAbortedError as exc:
185 if not self._closing:
186 self._fatal_error(exc)
187 except ConnectionResetError as exc:
188 self._force_close(exc)
189 except OSError as exc:
190 self._fatal_error(exc)
191 except futures.CancelledError:
192 if not self._closing:
193 raise
194 else:
195 self._read_fut.add_done_callback(self._loop_reading)
196 finally:
197 if data:
198 self._protocol.data_received(data)
199 elif data is not None:
200 keep_open = self._protocol.eof_received()
201 if not keep_open:
202 self.close()
203
204
205class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
206 transports.WriteTransport):
207 """Transport for write pipes."""
208
209 def write(self, data):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800210 if not isinstance(data, (bytes, bytearray, memoryview)):
211 raise TypeError('data argument must be byte-ish (%r)',
212 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 if self._eof_written:
Guido van Rossumebb8e582013-12-04 12:12:07 -0800214 raise RuntimeError('write_eof() already called')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215
216 if not data:
217 return
218
219 if self._conn_lost:
220 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700221 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 self._conn_lost += 1
223 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224
Guido van Rossumebb8e582013-12-04 12:12:07 -0800225 # Observable states:
226 # 1. IDLE: _write_fut and _buffer both None
227 # 2. WRITING: _write_fut set; _buffer None
228 # 3. BACKED UP: _write_fut set; _buffer a bytearray
229 # We always copy the data, so the caller can't modify it
230 # while we're still waiting for the I/O to happen.
231 if self._write_fut is None: # IDLE -> WRITING
232 assert self._buffer is None
233 # Pass a copy, except if it's already immutable.
234 self._loop_writing(data=bytes(data))
235 # XXX Should we pause the protocol at this point
236 # if len(data) > self._high_water? (That would
237 # require keeping track of the number of bytes passed
238 # to a send() that hasn't finished yet.)
239 elif not self._buffer: # WRITING -> BACKED UP
240 # Make a mutable copy which we can extend.
241 self._buffer = bytearray(data)
242 self._maybe_pause_protocol()
243 else: # BACKED UP
244 # Append to buffer (also copies).
245 self._buffer.extend(data)
246 self._maybe_pause_protocol()
247
248 def _loop_writing(self, f=None, data=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 try:
250 assert f is self._write_fut
251 self._write_fut = None
252 if f:
253 f.result()
Guido van Rossumebb8e582013-12-04 12:12:07 -0800254 if data is None:
255 data = self._buffer
256 self._buffer = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 if not data:
258 if self._closing:
259 self._loop.call_soon(self._call_connection_lost, None)
260 if self._eof_written:
261 self._sock.shutdown(socket.SHUT_WR)
Guido van Rossumebb8e582013-12-04 12:12:07 -0800262 else:
263 self._write_fut = self._loop._proactor.send(self._sock, data)
264 self._write_fut.add_done_callback(self._loop_writing)
265 # Now that we've reduced the buffer size, tell the
266 # protocol to resume writing if it was paused. Note that
267 # we do this last since the callback is called immediately
268 # and it may add more data to the buffer (even causing the
269 # protocol to be paused again).
270 self._maybe_resume_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 except ConnectionResetError as exc:
272 self._force_close(exc)
273 except OSError as exc:
274 self._fatal_error(exc)
275
276 def can_write_eof(self):
277 return True
278
279 def write_eof(self):
280 self.close()
281
282 def abort(self):
283 self._force_close(None)
284
285
286class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
287 _ProactorWritePipeTransport,
288 transports.Transport):
289 """Transport for duplex pipes."""
290
291 def can_write_eof(self):
292 return False
293
294 def write_eof(self):
295 raise NotImplementedError
296
297
298class _ProactorSocketTransport(_ProactorReadPipeTransport,
299 _ProactorWritePipeTransport,
300 transports.Transport):
301 """Transport for connected sockets."""
302
303 def _set_extra(self, sock):
304 self._extra['socket'] = sock
305 try:
306 self._extra['sockname'] = sock.getsockname()
307 except (socket.error, AttributeError):
308 pass
309 if 'peername' not in self._extra:
310 try:
311 self._extra['peername'] = sock.getpeername()
312 except (socket.error, AttributeError):
313 pass
314
315 def can_write_eof(self):
316 return True
317
318 def write_eof(self):
319 if self._closing or self._eof_written:
320 return
321 self._eof_written = True
322 if self._write_fut is None:
323 self._sock.shutdown(socket.SHUT_WR)
324
325
326class BaseProactorEventLoop(base_events.BaseEventLoop):
327
328 def __init__(self, proactor):
329 super().__init__()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700330 logger.debug('Using proactor: %s', proactor.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 self._proactor = proactor
332 self._selector = proactor # convenient alias
333 proactor.set_loop(self)
334 self._make_self_pipe()
335
336 def _make_socket_transport(self, sock, protocol, waiter=None,
337 extra=None, server=None):
338 return _ProactorSocketTransport(self, sock, protocol, waiter,
339 extra, server)
340
341 def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
342 extra=None):
343 return _ProactorDuplexPipeTransport(self,
344 sock, protocol, waiter, extra)
345
346 def _make_read_pipe_transport(self, sock, protocol, waiter=None,
347 extra=None):
348 return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
349
350 def _make_write_pipe_transport(self, sock, protocol, waiter=None,
Guido van Rossum59691282013-10-30 14:52:03 -0700351 extra=None, check_for_hangup=True):
352 if check_for_hangup:
353 # We want connection_lost() to be called when other end closes
354 return _ProactorDuplexPipeTransport(self,
355 sock, protocol, waiter, extra)
356 else:
357 # If other end closes we may not notice for a long time
358 return _ProactorWritePipeTransport(self, sock, protocol, waiter,
359 extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
361 def close(self):
362 if self._proactor is not None:
363 self._close_self_pipe()
364 self._proactor.close()
365 self._proactor = None
366 self._selector = None
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200367 super().close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368
369 def sock_recv(self, sock, n):
370 return self._proactor.recv(sock, n)
371
372 def sock_sendall(self, sock, data):
373 return self._proactor.send(sock, data)
374
375 def sock_connect(self, sock, address):
376 return self._proactor.connect(sock, address)
377
378 def sock_accept(self, sock):
379 return self._proactor.accept(sock)
380
381 def _socketpair(self):
382 raise NotImplementedError
383
384 def _close_self_pipe(self):
385 self._ssock.close()
386 self._ssock = None
387 self._csock.close()
388 self._csock = None
389 self._internal_fds -= 1
390
391 def _make_self_pipe(self):
392 # A self-socket, really. :-)
393 self._ssock, self._csock = self._socketpair()
394 self._ssock.setblocking(False)
395 self._csock.setblocking(False)
396 self._internal_fds += 1
397 self.call_soon(self._loop_self_reading)
398
399 def _loop_self_reading(self, f=None):
400 try:
401 if f is not None:
402 f.result() # may raise
403 f = self._proactor.recv(self._ssock, 4096)
404 except:
405 self.close()
406 raise
407 else:
408 f.add_done_callback(self._loop_self_reading)
409
410 def _write_to_self(self):
411 self._csock.send(b'x')
412
413 def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
Guido van Rossumebb8e582013-12-04 12:12:07 -0800414 if ssl:
415 raise ValueError('IocpEventLoop is incompatible with SSL.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416
417 def loop(f=None):
418 try:
419 if f is not None:
420 conn, addr = f.result()
421 protocol = protocol_factory()
422 self._make_socket_transport(
423 conn, protocol,
424 extra={'peername': addr}, server=server)
425 f = self._proactor.accept(sock)
426 except OSError:
427 if sock.fileno() != -1:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700428 logger.exception('Accept failed')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 sock.close()
430 except futures.CancelledError:
431 sock.close()
432 else:
433 f.add_done_callback(loop)
434
435 self.call_soon(loop)
436
437 def _process_events(self, event_list):
438 pass # XXX hard work currently done in poll
439
440 def _stop_serving(self, sock):
441 self._proactor._stop_serving(sock)
442 sock.close()