blob: 3ce2db8d42dc8f4fc72abad60e485669b9b38753 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Refuse to load on Windows: the signal handling this module is built
# around is not usable there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
31
32
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling to SelectorEventLoop, plus factories for the
    Unix pipe and subprocess transports defined in this module.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Signal number -> handle wrapping the registered callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets (socket.socketpair())."""
        return socket.socketpair()

    def close(self):
        """Remove every installed signal handler, then close the loop."""
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        A handler already registered for the same signal is replaced.

        Raise TypeError if sig is not an int, and ValueError if it is
        outside range(1, signal.NSIG).
        Raise RuntimeError if the wakeup file descriptor cannot be
        installed, or if the signal cannot be caught (EINVAL).  Any
        other OSError raised while installing the handler propagates
        unchanged.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            self._reset_wakeup_fd_if_unused()

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _reset_wakeup_fd_if_unused(self):
        """Uninstall the wakeup fd once no signal handler remains.

        A ValueError from signal.set_wakeup_fd() is logged, not raised.
        """
        if self._signal_handlers:
            return
        try:
            signal.set_wakeup_fd(-1)
        except ValueError as exc:
            logger.info('set_wakeup_fd(-1) failed: %s', exc)

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal
        number, and RuntimeError if restoring the default handler fails
        with EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to Python's default handler; every other
        # signal goes back to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        self._reset_wakeup_fd_if_unused()
        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport.

        The process is created and registered inside the child
        watcher's context so that the watcher sees the registration
        before it is left.
        """
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status to the transport through the loop's
        # thread-safe scheduling entry point.
        self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for *fd*, keeping its other status flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
172
173
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device.

    The file descriptor is switched to non-blocking mode and registered
    as a reader with the event loop.
    """

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and start reading from it.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, its result is set to None after
        connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # The message lists every accepted file type, character
            # devices included.
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Reader callback: deliver available data, or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next event.
            pass
        except OSError as exc:
            self._fatal_error(exc)
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means EOF: stop reading and schedule
                # eof_received() followed by connection_lost(None).
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc):
        # should be called by exception handler only
        # An OSError with errno EIO is not logged; the transport is
        # closed in every case.
        if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
242
243
class _UnixWritePipeTransport(selector_events._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device.

    Data that cannot be written immediately is kept in a list of chunks
    and flushed from a writer callback.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        supported = is_socket or stat.S_ISFIFO(mode) or stat.S_ISCHR(mode)
        if not supported:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def get_write_buffer_size(self):
        """Return the total length of the chunks waiting to be written."""
        return sum(map(len, self._buffer))

    def _read_ready(self):
        # Pipe was closed by peer.  Pending data means the close is
        # reported as a broken pipe.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued, so try to send right away.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if sent == len(data):
                return
            if sent > 0:
                data = data[sent:]
            # Something is left over: flush it from the writer callback.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # The writer must be removed here: _fatal_error() will not
            # do it because _buffer is empty at this point.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if sent != len(pending):
            # Partial write: keep the unsent tail and try again later.
            self._buffer.append(pending[sent:] if sent > 0 else pending)
            return

        self._loop.remove_writer(self._fileno)
        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer and self._closing:
            self._loop.remove_reader(self._fileno)
            self._call_connection_lost(None)

    def can_write_eof(self):
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing; finish at once if nothing is queued."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        if self._closing:
            return
        # write_eof() is all that is needed to close the write pipe.
        self.write_eof()

    def abort(self):
        self._close(None)

    def _fatal_error(self, exc):
        # should be called by exception handler only
        # BrokenPipeError and ConnectionResetError are not logged.
        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Discard pending data and schedule connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
392
393
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe
        and the parent's end is exposed as self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                # The parent writes to the child's stdin, so the file
                # object must be opened for writing.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            # Reached with stdin_w still set only when something above
            # raised: close both ends so the socket pair does not leak.
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800411
412
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()
481
482
class BaseChildWatcher(AbstractChildWatcher):
    """Shared machinery for watchers driven by the loop's SIGCHLD handler.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop whose SIGCHLD handler we own, or None when detached.
        self._loop = None

    def close(self):
        # Detaching from the loop removes our SIGCHLD handler.
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated while no handler was installed;
        # poll once so that it is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, logging any exception."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # Killed by a signal: report the negated signal number.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # Normal exit (e.g. sys.exit()): report the exit status.
            return os.WEXITSTATUS(status)
        # Neither case applies.  This shouldn't happen, but if it does,
        # return the raw status; perhaps that helps debug it.
        return status
529
530
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args)
        self._callbacks = {}

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        self._callbacks[pid] = callback, args

        # The child may already have terminated; poll it right away.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() pops entries.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        # The entry may be missing if .remove_child_handler() was called
        # after os.waitpid() returned.
        entry = self._callbacks.pop(pid, None)
        if entry is not None:
            callback, args = entry
            callback(pid, returncode, *args)
602
603
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args)
        self._callbacks = {}
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            # Keep the zombies while another 'with' block is still open.
            if self._forks or not self._zombies:
                return

            unclaimed = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"
        missing = object()
        with self._lock:
            returncode = self._zombies.pop(pid, missing)
            if returncode is missing:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already.  Fire the callback outside the lock.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None and self._forks:
                    # Unknown child reaped inside a 'with' block: it
                    # may not be registered yet, so remember its status.
                    self._zombies[pid] = returncode
                    continue

            callback, args = entry if entry is not None else (None, ())
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
699
700
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Unix.

    Creates _UnixSelectorEventLoop loops and owns the child watcher.
    """
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher(), or set explicitly.
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                watcher = SafeChildWatcher()
                self._watcher = watcher
                # The loop is attached only when running in the main thread.
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the previous one if any."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
750
# Public aliases for the Unix-specific implementations defined above.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop