blob: ac764f8ad56c15580d345274c52546b90a9f5003 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Public names of this module (what a star-import exposes).
__all__ = [
    'SelectorEventLoop',
    'STDIN', 'STDOUT', 'STDERR',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
# Integer constants naming the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2
32
33
# This module must not be importable on Windows: fail the import early.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Extends the selector event loop with signal handlers and with
    factories for pipe and subprocess transports.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return the pair of sockets created by socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Remove every registered signal handler, then close the loop."""
        # Walk a snapshot: remove_signal_handler() mutates the mapping.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if sig is not an int and ValueError if it is out
        of range (see _check_signal()).
        Raise RuntimeError if signal.set_wakeup_fd() refuses the wakeup
        file descriptor, or if installing the handler fails with EINVAL.
        Any other OSError raised while installing the handler propagates.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        self._signal_handlers[sig] = events.make_handle(callback, args)

        try:
            signal.signal(sig, self._handle_signal)
            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made just above.
            del self._signal_handlers[sig]
            self._reset_wakeup_fd_if_idle()

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            raise

    def _reset_wakeup_fd_if_idle(self):
        """Disable the wakeup fd once no signal handler is registered.

        A ValueError raised by signal.set_wakeup_fd(-1) is only logged.
        """
        if self._signal_handlers:
            return
        try:
            signal.set_wakeup_fd(-1)
        except ValueError as exc:
            logger.info('set_wakeup_fd(-1) failed: %s', exc)

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
            return
        self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL; any other OSError propagates.
        """
        self._check_signal(sig)
        try:
            self._signal_handlers.pop(sig)
        except KeyError:
            return False

        # SIGINT gets Python's default handler back; every other signal
        # gets SIG_DFL.
        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            raise

        self._reset_wakeup_fd_if_idle()
        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create a _UnixReadPipeTransport for pipe bound to this loop."""
        transport = _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
        return transport

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create a _UnixWritePipeTransport for pipe bound to this loop."""
        transport = _UnixWritePipeTransport(self, pipe, protocol, waiter,
                                            extra)
        return transport

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is created, initialized and registered inside the
        child watcher's 'with' block.
        """
        with events.get_child_watcher() as watcher:
            transport = _UnixSubprocessTransport(
                self, protocol, args, shell,
                stdin, stdout, stderr, bufsize,
                extra=extra, **kwargs)
            yield from transport._post_init()
            watcher.add_child_handler(transport.get_pid(),
                                      self._child_watcher_callback,
                                      transport)

        return transport

    def _child_watcher_callback(self, pid, returncode, transp):
        """Schedule transp._process_exited(returncode) on this loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172
def _set_nonblocking(fd):
    """Add O_NONBLOCK to the file status flags of descriptor fd."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
177
178
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport on top of a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        # Only these three kinds of file are accepted.
        mode = os.fstat(self._fileno).st_mode
        accepted = (stat.S_ISFIFO, stat.S_ISSOCK, stat.S_ISCHR)
        if not any(is_kind(mode) for is_kind in accepted):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Read up to max_size bytes and pass them to the protocol.

        An empty read is treated as end of file: the reader is removed
        and eof_received() then connection_lost(None) are scheduled.
        """
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the file descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        """Log exc (unless it is an OSError with errno EIO) and close."""
        # should be called by exception handler only
        if not isinstance(exc, OSError) or exc.errno != errno.EIO:
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
247
248
class _UnixWritePipeTransport(selector_events._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport on top of a FIFO, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        supported = is_socket or stat.S_ISFIFO(mode) or stat.S_ISCHR(mode)
        if not supported:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []  # Chunks accepted by write() but not yet written.
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception:  OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or not on_aix:
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def get_write_buffer_size(self):
        """Return the total length of the chunks waiting in the buffer."""
        return sum(map(len, self._buffer))

    def _read_ready(self):
        """Reader callback: close the transport."""
        # Pipe was closed by peer.
        self._close()

    def write(self, data):
        """Write data, buffering whatever cannot be written right away.

        Empty data is ignored.  Once the transport is closing or the
        connection is lost, data is dropped and only counted.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if written == len(data):
                return
            if written > 0:
                data = data[written:]
            # The remainder is flushed by _write_ready().
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            written = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _close() only removes it when the
            # buffer is non-empty, and the buffer was just cleared.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if written == len(pending):
            self._loop.remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer and self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if written > 0:
            pending = pending[written:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing.

        With an empty buffer connection_lost(None) is scheduled at once;
        otherwise _write_ready() finishes the job after the last flush.
        """
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc):
        """Log exc (unless it is a broken pipe/connection reset), close."""
        # should be called by exception handler only
        expected = (BrokenPipeError, ConnectionResetError)
        if not isinstance(exc, expected):
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop the buffer, unregister and schedule connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
394
395
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store the Popen object in _proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        one end is handed to the child and the other end is wrapped in a
        binary file object opened for writing, stored as _proc.stdin.
        Both sockets are closed if the process cannot be started.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # pipe (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child owns its own copy of this end now.
                stdin.close()
                # The parent writes to the child's stdin, so the file
                # object must be opened for writing ('wb'), not reading.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Starting the process failed: do not leak the sockets.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800413
414
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses
    and report their termination or interruption by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (this prevents a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates.  Registering another callback for the
        same process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, it is
        first detached before being attached to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()
483
484
class BaseChildWatcher(AbstractChildWatcher):
    """Common base of the SIGCHLD-driven child watchers.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop the SIGCHLD handler is installed on, or None.
        self._loop = None

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Check one child process; implemented by subclasses."""
        raise NotImplementedError()

    def _do_waitpid_all(self):
        """Check the watched child processes; implemented by subclasses."""
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to loop.

        loop may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: check the children, logging any exception."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code.

        Return minus the signal number if the child was killed by a
        signal, the exit status if it exited, and the raw status
        otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
531
532
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes
    by explicitly polling each watched process in the SIGCHLD handler
    instead of calling os.waitpid(-1).

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def __init__(self):
        super().__init__()
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        """Forget every registered handler and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid."""
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler of pid; return whether there was one."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every watched process."""
        # Iterate over a snapshot: _do_waitpid() may remove entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        entry = self._callbacks.pop(pid, None)
        if entry is None:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            return
        callback, args = entry
        callback(pid, returncode, *args)
604
605
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning
    processes and waiting for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}
        self._lock = threading.Lock()
        # Return codes of reaped children that have no handler yet.
        self._zombies = {}
        # Number of 'with' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # Last 'with' block left and some reaped children were never
            # claimed: report them and forget them.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid.

        If the child was already reaped, the callback is called right
        away with the recorded return code.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler of pid; return whether there was one."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None and self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    continue

            if entry is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback, args = entry
                callback(pid, returncode, *args)
701
702
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Unix.

    Uses _UnixSelectorEventLoop as loop factory and keeps track of the
    child watcher.
    """
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created on demand by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        the loop stored in the policy's thread-local state.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                if in_main_thread:
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call
        .attach_loop(loop) on the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the previous one if any."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
751 self._watcher = watcher
752
# Public aliases for the Unix implementations (both are listed in __all__).
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy