blob: ad4c229438cd17822f5326cb38ac8e874c13769e [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Victor Stinner915bcb02014-02-01 22:49:59 +010024__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080025 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029if sys.platform == 'win32': # pragma: no cover
30 raise ImportError('Signals are not really supported on Windows')
31
32
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080033class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050034 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
Yury Selivanovb057c522014-02-18 12:15:06 -050036 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037 """
38
39 def __init__(self, selector=None):
40 super().__init__(selector)
41 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042
43 def _socketpair(self):
44 return socket.socketpair()
45
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080046 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020047 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080048 for sig in list(self._signal_handlers):
49 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080050
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051 def add_signal_handler(self, sig, callback, *args):
52 """Add a handler for a signal. UNIX only.
53
54 Raise ValueError if the signal number is invalid or uncatchable.
55 Raise RuntimeError if there is a problem setting up the handler.
56 """
57 self._check_signal(sig)
58 try:
59 # set_wakeup_fd() raises ValueError if this is not the
60 # main thread. By calling it early we ensure that an
61 # event loop running in another thread cannot add a signal
62 # handler.
63 signal.set_wakeup_fd(self._csock.fileno())
64 except ValueError as exc:
65 raise RuntimeError(str(exc))
66
Yury Selivanov569efa22014-02-18 18:02:19 -050067 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068 self._signal_handlers[sig] = handle
69
70 try:
71 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010072 # Set SA_RESTART to limit EINTR occurrences.
73 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 except OSError as exc:
75 del self._signal_handlers[sig]
76 if not self._signal_handlers:
77 try:
78 signal.set_wakeup_fd(-1)
79 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070080 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081
82 if exc.errno == errno.EINVAL:
83 raise RuntimeError('sig {} cannot be caught'.format(sig))
84 else:
85 raise
86
87 def _handle_signal(self, sig, arg):
88 """Internal helper that is the actual signal handler."""
89 handle = self._signal_handlers.get(sig)
90 if handle is None:
91 return # Assume it's some race condition.
92 if handle._cancelled:
93 self.remove_signal_handler(sig) # Remove it properly.
94 else:
95 self._add_callback_signalsafe(handle)
96
97 def remove_signal_handler(self, sig):
98 """Remove a handler for a signal. UNIX only.
99
100 Return True if a signal handler was removed, False if not.
101 """
102 self._check_signal(sig)
103 try:
104 del self._signal_handlers[sig]
105 except KeyError:
106 return False
107
108 if sig == signal.SIGINT:
109 handler = signal.default_int_handler
110 else:
111 handler = signal.SIG_DFL
112
113 try:
114 signal.signal(sig, handler)
115 except OSError as exc:
116 if exc.errno == errno.EINVAL:
117 raise RuntimeError('sig {} cannot be caught'.format(sig))
118 else:
119 raise
120
121 if not self._signal_handlers:
122 try:
123 signal.set_wakeup_fd(-1)
124 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700125 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700126
127 return True
128
129 def _check_signal(self, sig):
130 """Internal helper to validate a signal.
131
132 Raise ValueError if the signal number is invalid or uncatchable.
133 Raise RuntimeError if there is a problem setting up the handler.
134 """
135 if not isinstance(sig, int):
136 raise TypeError('sig must be an int, not {!r}'.format(sig))
137
138 if not (1 <= sig < signal.NSIG):
139 raise ValueError(
140 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
141
142 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
143 extra=None):
144 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
145
146 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
147 extra=None):
148 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
149
150 @tasks.coroutine
151 def _make_subprocess_transport(self, protocol, args, shell,
152 stdin, stdout, stderr, bufsize,
153 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800154 with events.get_child_watcher() as watcher:
155 transp = _UnixSubprocessTransport(self, protocol, args, shell,
156 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800157 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800158 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800159 watcher.add_child_handler(transp.get_pid(),
160 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800161
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162 return transp
163
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800164 def _child_watcher_callback(self, pid, returncode, transp):
165 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166
Yury Selivanovb057c522014-02-18 12:15:06 -0500167 @tasks.coroutine
168 def create_unix_connection(self, protocol_factory, path, *,
169 ssl=None, sock=None,
170 server_hostname=None):
171 assert server_hostname is None or isinstance(server_hostname, str)
172 if ssl:
173 if server_hostname is None:
174 raise ValueError(
175 'you have to pass server_hostname when using ssl')
176 else:
177 if server_hostname is not None:
178 raise ValueError('server_hostname is only meaningful with ssl')
179
180 if path is not None:
181 if sock is not None:
182 raise ValueError(
183 'path and sock can not be specified at the same time')
184
Victor Stinner79a29522014-02-19 01:45:59 +0100185 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500186 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500187 sock.setblocking(False)
188 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100189 except:
190 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500191 raise
192
193 else:
194 if sock is None:
195 raise ValueError('no path and sock were specified')
196 sock.setblocking(False)
197
198 transport, protocol = yield from self._create_connection_transport(
199 sock, protocol_factory, ssl, server_hostname)
200 return transport, protocol
201
202 @tasks.coroutine
203 def create_unix_server(self, protocol_factory, path=None, *,
204 sock=None, backlog=100, ssl=None):
205 if isinstance(ssl, bool):
206 raise TypeError('ssl argument must be an SSLContext or None')
207
208 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200209 if sock is not None:
210 raise ValueError(
211 'path and sock can not be specified at the same time')
212
Yury Selivanovb057c522014-02-18 12:15:06 -0500213 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
214
215 try:
216 sock.bind(path)
217 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100218 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500219 if exc.errno == errno.EADDRINUSE:
220 # Let's improve the error message by adding
221 # with what exact address it occurs.
222 msg = 'Address {!r} is already in use'.format(path)
223 raise OSError(errno.EADDRINUSE, msg) from None
224 else:
225 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200226 except:
227 sock.close()
228 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 else:
230 if sock is None:
231 raise ValueError(
232 'path was not specified, and no sock specified')
233
234 if sock.family != socket.AF_UNIX:
235 raise ValueError(
236 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
237
238 server = base_events.Server(self, [sock])
239 sock.listen(backlog)
240 sock.setblocking(False)
241 self._start_serving(protocol_factory, sock, ssl, server)
242 return server
243
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700244
245def _set_nonblocking(fd):
246 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
247 flags = flags | os.O_NONBLOCK
248 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
249
250
251class _UnixReadPipeTransport(transports.ReadTransport):
252
Yury Selivanovdec1a452014-02-18 22:27:48 -0500253 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254
255 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
256 super().__init__(extra)
257 self._extra['pipe'] = pipe
258 self._loop = loop
259 self._pipe = pipe
260 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700261 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800262 if not (stat.S_ISFIFO(mode) or
263 stat.S_ISSOCK(mode) or
264 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700265 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 _set_nonblocking(self._fileno)
267 self._protocol = protocol
268 self._closing = False
269 self._loop.add_reader(self._fileno, self._read_ready)
270 self._loop.call_soon(self._protocol.connection_made, self)
271 if waiter is not None:
272 self._loop.call_soon(waiter.set_result, None)
273
274 def _read_ready(self):
275 try:
276 data = os.read(self._fileno, self.max_size)
277 except (BlockingIOError, InterruptedError):
278 pass
279 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100280 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 else:
282 if data:
283 self._protocol.data_received(data)
284 else:
285 self._closing = True
286 self._loop.remove_reader(self._fileno)
287 self._loop.call_soon(self._protocol.eof_received)
288 self._loop.call_soon(self._call_connection_lost, None)
289
Guido van Rossum57497ad2013-10-18 07:58:20 -0700290 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291 self._loop.remove_reader(self._fileno)
292
Guido van Rossum57497ad2013-10-18 07:58:20 -0700293 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 self._loop.add_reader(self._fileno, self._read_ready)
295
296 def close(self):
297 if not self._closing:
298 self._close(None)
299
Victor Stinner0ee29c22014-02-19 01:40:41 +0100300 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800302 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
Yury Selivanov569efa22014-02-18 18:02:19 -0500303 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100304 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500305 'exception': exc,
306 'transport': self,
307 'protocol': self._protocol,
308 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 self._close(exc)
310
311 def _close(self, exc):
312 self._closing = True
313 self._loop.remove_reader(self._fileno)
314 self._loop.call_soon(self._call_connection_lost, exc)
315
316 def _call_connection_lost(self, exc):
317 try:
318 self._protocol.connection_lost(exc)
319 finally:
320 self._pipe.close()
321 self._pipe = None
322 self._protocol = None
323 self._loop = None
324
325
Yury Selivanov3cb99142014-02-18 18:41:13 -0500326class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800327 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328
329 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
330 super().__init__(extra)
331 self._extra['pipe'] = pipe
332 self._loop = loop
333 self._pipe = pipe
334 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700335 mode = os.fstat(self._fileno).st_mode
336 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100337 if not (is_socket or
338 stat.S_ISFIFO(mode) or
339 stat.S_ISCHR(mode)):
340 raise ValueError("Pipe transport is only for "
341 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 _set_nonblocking(self._fileno)
343 self._protocol = protocol
344 self._buffer = []
345 self._conn_lost = 0
346 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700347
348 # On AIX, the reader trick only works for sockets.
349 # On other platforms it works for pipes and sockets.
350 # (Exception: OS X 10.4? Issue #19294.)
351 if is_socket or not sys.platform.startswith("aix"):
352 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353
354 self._loop.call_soon(self._protocol.connection_made, self)
355 if waiter is not None:
356 self._loop.call_soon(waiter.set_result, None)
357
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800358 def get_write_buffer_size(self):
359 return sum(len(data) for data in self._buffer)
360
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700362 # Pipe was closed by peer.
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100363 if self._buffer:
364 self._close(BrokenPipeError())
365 else:
366 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367
368 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800369 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
370 if isinstance(data, bytearray):
371 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 if not data:
373 return
374
375 if self._conn_lost or self._closing:
376 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700377 logger.warning('pipe closed by peer or '
378 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 self._conn_lost += 1
380 return
381
382 if not self._buffer:
383 # Attempt to send it right away first.
384 try:
385 n = os.write(self._fileno, data)
386 except (BlockingIOError, InterruptedError):
387 n = 0
388 except Exception as exc:
389 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100390 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 return
392 if n == len(data):
393 return
394 elif n > 0:
395 data = data[n:]
396 self._loop.add_writer(self._fileno, self._write_ready)
397
398 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800399 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
401 def _write_ready(self):
402 data = b''.join(self._buffer)
403 assert data, 'Data should not be empty'
404
405 self._buffer.clear()
406 try:
407 n = os.write(self._fileno, data)
408 except (BlockingIOError, InterruptedError):
409 self._buffer.append(data)
410 except Exception as exc:
411 self._conn_lost += 1
412 # Remove writer here, _fatal_error() doesn't it
413 # because _buffer is empty.
414 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100415 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416 else:
417 if n == len(data):
418 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800419 self._maybe_resume_protocol() # May append to buffer.
420 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 self._loop.remove_reader(self._fileno)
422 self._call_connection_lost(None)
423 return
424 elif n > 0:
425 data = data[n:]
426
427 self._buffer.append(data) # Try again later.
428
429 def can_write_eof(self):
430 return True
431
432 # TODO: Make the relationships between write_eof(), close(),
433 # abort(), _fatal_error() and _close() more straightforward.
434
435 def write_eof(self):
436 if self._closing:
437 return
438 assert self._pipe
439 self._closing = True
440 if not self._buffer:
441 self._loop.remove_reader(self._fileno)
442 self._loop.call_soon(self._call_connection_lost, None)
443
444 def close(self):
445 if not self._closing:
446 # write_eof is all what we needed to close the write pipe
447 self.write_eof()
448
449 def abort(self):
450 self._close(None)
451
Victor Stinner0ee29c22014-02-19 01:40:41 +0100452 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800454 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
Yury Selivanov569efa22014-02-18 18:02:19 -0500455 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100456 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500457 'exception': exc,
458 'transport': self,
459 'protocol': self._protocol,
460 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461 self._close(exc)
462
463 def _close(self, exc=None):
464 self._closing = True
465 if self._buffer:
466 self._loop.remove_writer(self._fileno)
467 self._buffer.clear()
468 self._loop.remove_reader(self._fileno)
469 self._loop.call_soon(self._call_connection_lost, exc)
470
471 def _call_connection_lost(self, exc):
472 try:
473 self._protocol.connection_lost(exc)
474 finally:
475 self._pipe.close()
476 self._pipe = None
477 self._protocol = None
478 self._loop = None
479
480
Guido van Rossum59691282013-10-30 14:52:03 -0700481class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482
Guido van Rossum59691282013-10-30 14:52:03 -0700483 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700484 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700486 # Use a socket pair for stdin, since not all platforms
487 # support selecting read events on the write end of a
488 # socket (which we use in order to detect closing of the
489 # other end). Notably this is needed on AIX, and works
490 # just fine on other platforms.
491 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492 self._proc = subprocess.Popen(
493 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
494 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700495 if stdin_w is not None:
496 stdin.close()
497 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800498
499
500class AbstractChildWatcher:
501 """Abstract base class for monitoring child processes.
502
503 Objects derived from this class monitor a collection of subprocesses and
504 report their termination or interruption by a signal.
505
506 New callbacks are registered with .add_child_handler(). Starting a new
507 process must be done within a 'with' block to allow the watcher to suspend
508 its activity until the new process if fully registered (this is needed to
509 prevent a race condition in some implementations).
510
511 Example:
512 with watcher:
513 proc = subprocess.Popen("sleep 1")
514 watcher.add_child_handler(proc.pid, callback)
515
516 Notes:
517 Implementations of this class must be thread-safe.
518
519 Since child watcher objects may catch the SIGCHLD signal and call
520 waitpid(-1), there should be only one active object per process.
521 """
522
523 def add_child_handler(self, pid, callback, *args):
524 """Register a new child handler.
525
526 Arrange for callback(pid, returncode, *args) to be called when
527 process 'pid' terminates. Specifying another callback for the same
528 process replaces the previous handler.
529
530 Note: callback() must be thread-safe
531 """
532 raise NotImplementedError()
533
534 def remove_child_handler(self, pid):
535 """Removes the handler for process 'pid'.
536
537 The function returns True if the handler was successfully removed,
538 False if there was nothing to remove."""
539
540 raise NotImplementedError()
541
Guido van Rossum2bcae702013-11-13 15:50:08 -0800542 def attach_loop(self, loop):
543 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800544
Guido van Rossum2bcae702013-11-13 15:50:08 -0800545 If the watcher was previously attached to an event loop, then it is
546 first detached before attaching to the new loop.
547
548 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800549 """
550 raise NotImplementedError()
551
552 def close(self):
553 """Close the watcher.
554
555 This must be called to make sure that any underlying resource is freed.
556 """
557 raise NotImplementedError()
558
559 def __enter__(self):
560 """Enter the watcher's context and allow starting new processes
561
562 This function must return self"""
563 raise NotImplementedError()
564
565 def __exit__(self, a, b, c):
566 """Exit the watcher's context"""
567 raise NotImplementedError()
568
569
570class BaseChildWatcher(AbstractChildWatcher):
571
Guido van Rossum2bcae702013-11-13 15:50:08 -0800572 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800573 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800574
575 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800576 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800577
578 def _do_waitpid(self, expected_pid):
579 raise NotImplementedError()
580
581 def _do_waitpid_all(self):
582 raise NotImplementedError()
583
Guido van Rossum2bcae702013-11-13 15:50:08 -0800584 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800585 assert loop is None or isinstance(loop, events.AbstractEventLoop)
586
587 if self._loop is not None:
588 self._loop.remove_signal_handler(signal.SIGCHLD)
589
590 self._loop = loop
591 if loop is not None:
592 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
593
594 # Prevent a race condition in case a child terminated
595 # during the switch.
596 self._do_waitpid_all()
597
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800598 def _sig_chld(self):
599 try:
600 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500601 except Exception as exc:
602 # self._loop should always be available here
603 # as '_sig_chld' is added as a signal handler
604 # in 'attach_loop'
605 self._loop.call_exception_handler({
606 'message': 'Unknown exception in SIGCHLD handler',
607 'exception': exc,
608 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800609
610 def _compute_returncode(self, status):
611 if os.WIFSIGNALED(status):
612 # The child process died because of a signal.
613 return -os.WTERMSIG(status)
614 elif os.WIFEXITED(status):
615 # The child process exited (e.g sys.exit()).
616 return os.WEXITSTATUS(status)
617 else:
618 # The child exited, but we don't understand its status.
619 # This shouldn't happen, but if it does, let's just
620 # return that status; perhaps that helps debug it.
621 return status
622
623
624class SafeChildWatcher(BaseChildWatcher):
625 """'Safe' child watcher implementation.
626
627 This implementation avoids disrupting other code spawning processes by
628 polling explicitly each process in the SIGCHLD handler instead of calling
629 os.waitpid(-1).
630
631 This is a safe solution but it has a significant overhead when handling a
632 big number of children (O(n) each time SIGCHLD is raised)
633 """
634
Guido van Rossum2bcae702013-11-13 15:50:08 -0800635 def __init__(self):
636 super().__init__()
637 self._callbacks = {}
638
639 def close(self):
640 self._callbacks.clear()
641 super().close()
642
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800643 def __enter__(self):
644 return self
645
646 def __exit__(self, a, b, c):
647 pass
648
649 def add_child_handler(self, pid, callback, *args):
650 self._callbacks[pid] = callback, args
651
652 # Prevent a race condition in case the child is already terminated.
653 self._do_waitpid(pid)
654
Guido van Rossum2bcae702013-11-13 15:50:08 -0800655 def remove_child_handler(self, pid):
656 try:
657 del self._callbacks[pid]
658 return True
659 except KeyError:
660 return False
661
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800662 def _do_waitpid_all(self):
663
664 for pid in list(self._callbacks):
665 self._do_waitpid(pid)
666
667 def _do_waitpid(self, expected_pid):
668 assert expected_pid > 0
669
670 try:
671 pid, status = os.waitpid(expected_pid, os.WNOHANG)
672 except ChildProcessError:
673 # The child process is already reaped
674 # (may happen if waitpid() is called elsewhere).
675 pid = expected_pid
676 returncode = 255
677 logger.warning(
678 "Unknown child process pid %d, will report returncode 255",
679 pid)
680 else:
681 if pid == 0:
682 # The child process is still alive.
683 return
684
685 returncode = self._compute_returncode(status)
686
687 try:
688 callback, args = self._callbacks.pop(pid)
689 except KeyError: # pragma: no cover
690 # May happen if .remove_child_handler() is called
691 # after os.waitpid() returns.
692 pass
693 else:
694 callback(pid, returncode, *args)
695
696
697class FastChildWatcher(BaseChildWatcher):
698 """'Fast' child watcher implementation.
699
700 This implementation reaps every terminated processes by calling
701 os.waitpid(-1) directly, possibly breaking other code spawning processes
702 and waiting for their termination.
703
704 There is no noticeable overhead when handling a big number of children
705 (O(1) each time a child terminates).
706 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800707 def __init__(self):
708 super().__init__()
709 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800710 self._lock = threading.Lock()
711 self._zombies = {}
712 self._forks = 0
713
714 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800715 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800716 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800717 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800718
719 def __enter__(self):
720 with self._lock:
721 self._forks += 1
722
723 return self
724
725 def __exit__(self, a, b, c):
726 with self._lock:
727 self._forks -= 1
728
729 if self._forks or not self._zombies:
730 return
731
732 collateral_victims = str(self._zombies)
733 self._zombies.clear()
734
735 logger.warning(
736 "Caught subprocesses termination from unknown pids: %s",
737 collateral_victims)
738
739 def add_child_handler(self, pid, callback, *args):
740 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800741 with self._lock:
742 try:
743 returncode = self._zombies.pop(pid)
744 except KeyError:
745 # The child is running.
746 self._callbacks[pid] = callback, args
747 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800748
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800749 # The child is dead already. We can fire the callback.
750 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800751
Guido van Rossum2bcae702013-11-13 15:50:08 -0800752 def remove_child_handler(self, pid):
753 try:
754 del self._callbacks[pid]
755 return True
756 except KeyError:
757 return False
758
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800759 def _do_waitpid_all(self):
760 # Because of signal coalescing, we must keep calling waitpid() as
761 # long as we're able to reap a child.
762 while True:
763 try:
764 pid, status = os.waitpid(-1, os.WNOHANG)
765 except ChildProcessError:
766 # No more child processes exist.
767 return
768 else:
769 if pid == 0:
770 # A child process is still alive.
771 return
772
773 returncode = self._compute_returncode(status)
774
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800775 with self._lock:
776 try:
777 callback, args = self._callbacks.pop(pid)
778 except KeyError:
779 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800780 if self._forks:
781 # It may not be registered yet.
782 self._zombies[pid] = returncode
783 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800784 callback = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800785
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800786 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800787 logger.warning(
788 "Caught subprocess termination from unknown pid: "
789 "%d -> %d", pid, returncode)
790 else:
791 callback(pid, returncode, *args)
792
793
794class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
795 """XXX"""
796 _loop_factory = _UnixSelectorEventLoop
797
798 def __init__(self):
799 super().__init__()
800 self._watcher = None
801
802 def _init_watcher(self):
803 with events._lock:
804 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800805 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800806 if isinstance(threading.current_thread(),
807 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800808 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800809
810 def set_event_loop(self, loop):
811 """Set the event loop.
812
813 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800814 .set_event_loop() from the main thread will call .attach_loop(loop) on
815 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800816 """
817
818 super().set_event_loop(loop)
819
820 if self._watcher is not None and \
821 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800822 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800823
824 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200825 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800826
827 If not yet set, a SafeChildWatcher object is automatically created.
828 """
829 if self._watcher is None:
830 self._init_watcher()
831
832 return self._watcher
833
834 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200835 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800836
837 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
838
839 if self._watcher is not None:
840 self._watcher.close()
841
842 self._watcher = watcher
843
844SelectorEventLoop = _UnixSelectorEventLoop
845DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy