blob: 98fdddeec98c67745766c1dc050298df37625c50 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080024__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
25 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29STDIN = 0
30STDOUT = 1
31STDERR = 2
32
33
34if sys.platform == 'win32': # pragma: no cover
35 raise ImportError('Signals are not really supported on Windows')
36
37
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080038class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039 """Unix event loop
40
41 Adds signal handling to SelectorEventLoop
42 """
43
44 def __init__(self, selector=None):
45 super().__init__(selector)
46 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48 def _socketpair(self):
49 return socket.socketpair()
50
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
52 for sig in list(self._signal_handlers):
53 self.remove_signal_handler(sig)
54 super().close()
55
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 def add_signal_handler(self, sig, callback, *args):
57 """Add a handler for a signal. UNIX only.
58
59 Raise ValueError if the signal number is invalid or uncatchable.
60 Raise RuntimeError if there is a problem setting up the handler.
61 """
62 self._check_signal(sig)
63 try:
64 # set_wakeup_fd() raises ValueError if this is not the
65 # main thread. By calling it early we ensure that an
66 # event loop running in another thread cannot add a signal
67 # handler.
68 signal.set_wakeup_fd(self._csock.fileno())
69 except ValueError as exc:
70 raise RuntimeError(str(exc))
71
72 handle = events.make_handle(callback, args)
73 self._signal_handlers[sig] = handle
74
75 try:
76 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010077 # Set SA_RESTART to limit EINTR occurrences.
78 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 except OSError as exc:
80 del self._signal_handlers[sig]
81 if not self._signal_handlers:
82 try:
83 signal.set_wakeup_fd(-1)
84 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070085 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
87 if exc.errno == errno.EINVAL:
88 raise RuntimeError('sig {} cannot be caught'.format(sig))
89 else:
90 raise
91
92 def _handle_signal(self, sig, arg):
93 """Internal helper that is the actual signal handler."""
94 handle = self._signal_handlers.get(sig)
95 if handle is None:
96 return # Assume it's some race condition.
97 if handle._cancelled:
98 self.remove_signal_handler(sig) # Remove it properly.
99 else:
100 self._add_callback_signalsafe(handle)
101
102 def remove_signal_handler(self, sig):
103 """Remove a handler for a signal. UNIX only.
104
105 Return True if a signal handler was removed, False if not.
106 """
107 self._check_signal(sig)
108 try:
109 del self._signal_handlers[sig]
110 except KeyError:
111 return False
112
113 if sig == signal.SIGINT:
114 handler = signal.default_int_handler
115 else:
116 handler = signal.SIG_DFL
117
118 try:
119 signal.signal(sig, handler)
120 except OSError as exc:
121 if exc.errno == errno.EINVAL:
122 raise RuntimeError('sig {} cannot be caught'.format(sig))
123 else:
124 raise
125
126 if not self._signal_handlers:
127 try:
128 signal.set_wakeup_fd(-1)
129 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700130 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
132 return True
133
134 def _check_signal(self, sig):
135 """Internal helper to validate a signal.
136
137 Raise ValueError if the signal number is invalid or uncatchable.
138 Raise RuntimeError if there is a problem setting up the handler.
139 """
140 if not isinstance(sig, int):
141 raise TypeError('sig must be an int, not {!r}'.format(sig))
142
143 if not (1 <= sig < signal.NSIG):
144 raise ValueError(
145 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
146
147 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
150
151 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
152 extra=None):
153 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
154
155 @tasks.coroutine
156 def _make_subprocess_transport(self, protocol, args, shell,
157 stdin, stdout, stderr, bufsize,
158 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800159 with events.get_child_watcher() as watcher:
160 transp = _UnixSubprocessTransport(self, protocol, args, shell,
161 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800162 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800163 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800164 watcher.add_child_handler(transp.get_pid(),
165 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800166
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 return transp
168
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800169 def _child_watcher_callback(self, pid, returncode, transp):
170 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172
173def _set_nonblocking(fd):
174 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
175 flags = flags | os.O_NONBLOCK
176 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
177
178
179class _UnixReadPipeTransport(transports.ReadTransport):
180
181 max_size = 256 * 1024 # max bytes we read in one eventloop iteration
182
183 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
184 super().__init__(extra)
185 self._extra['pipe'] = pipe
186 self._loop = loop
187 self._pipe = pipe
188 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700189 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800190 if not (stat.S_ISFIFO(mode) or
191 stat.S_ISSOCK(mode) or
192 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700193 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 _set_nonblocking(self._fileno)
195 self._protocol = protocol
196 self._closing = False
197 self._loop.add_reader(self._fileno, self._read_ready)
198 self._loop.call_soon(self._protocol.connection_made, self)
199 if waiter is not None:
200 self._loop.call_soon(waiter.set_result, None)
201
202 def _read_ready(self):
203 try:
204 data = os.read(self._fileno, self.max_size)
205 except (BlockingIOError, InterruptedError):
206 pass
207 except OSError as exc:
208 self._fatal_error(exc)
209 else:
210 if data:
211 self._protocol.data_received(data)
212 else:
213 self._closing = True
214 self._loop.remove_reader(self._fileno)
215 self._loop.call_soon(self._protocol.eof_received)
216 self._loop.call_soon(self._call_connection_lost, None)
217
Guido van Rossum57497ad2013-10-18 07:58:20 -0700218 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 self._loop.remove_reader(self._fileno)
220
Guido van Rossum57497ad2013-10-18 07:58:20 -0700221 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 self._loop.add_reader(self._fileno, self._read_ready)
223
224 def close(self):
225 if not self._closing:
226 self._close(None)
227
228 def _fatal_error(self, exc):
229 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800230 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
231 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232 self._close(exc)
233
234 def _close(self, exc):
235 self._closing = True
236 self._loop.remove_reader(self._fileno)
237 self._loop.call_soon(self._call_connection_lost, exc)
238
239 def _call_connection_lost(self, exc):
240 try:
241 self._protocol.connection_lost(exc)
242 finally:
243 self._pipe.close()
244 self._pipe = None
245 self._protocol = None
246 self._loop = None
247
248
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800249class _UnixWritePipeTransport(selector_events._FlowControlMixin,
250 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251
252 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
253 super().__init__(extra)
254 self._extra['pipe'] = pipe
255 self._loop = loop
256 self._pipe = pipe
257 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700258 mode = os.fstat(self._fileno).st_mode
259 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100260 if not (is_socket or
261 stat.S_ISFIFO(mode) or
262 stat.S_ISCHR(mode)):
263 raise ValueError("Pipe transport is only for "
264 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 _set_nonblocking(self._fileno)
266 self._protocol = protocol
267 self._buffer = []
268 self._conn_lost = 0
269 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700270
271 # On AIX, the reader trick only works for sockets.
272 # On other platforms it works for pipes and sockets.
273 # (Exception: OS X 10.4? Issue #19294.)
274 if is_socket or not sys.platform.startswith("aix"):
275 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
277 self._loop.call_soon(self._protocol.connection_made, self)
278 if waiter is not None:
279 self._loop.call_soon(waiter.set_result, None)
280
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800281 def get_write_buffer_size(self):
282 return sum(len(data) for data in self._buffer)
283
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700285 # Pipe was closed by peer.
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100286 if self._buffer:
287 self._close(BrokenPipeError())
288 else:
289 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290
291 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800292 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
293 if isinstance(data, bytearray):
294 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295 if not data:
296 return
297
298 if self._conn_lost or self._closing:
299 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700300 logger.warning('pipe closed by peer or '
301 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 self._conn_lost += 1
303 return
304
305 if not self._buffer:
306 # Attempt to send it right away first.
307 try:
308 n = os.write(self._fileno, data)
309 except (BlockingIOError, InterruptedError):
310 n = 0
311 except Exception as exc:
312 self._conn_lost += 1
313 self._fatal_error(exc)
314 return
315 if n == len(data):
316 return
317 elif n > 0:
318 data = data[n:]
319 self._loop.add_writer(self._fileno, self._write_ready)
320
321 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800322 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323
324 def _write_ready(self):
325 data = b''.join(self._buffer)
326 assert data, 'Data should not be empty'
327
328 self._buffer.clear()
329 try:
330 n = os.write(self._fileno, data)
331 except (BlockingIOError, InterruptedError):
332 self._buffer.append(data)
333 except Exception as exc:
334 self._conn_lost += 1
335 # Remove writer here, _fatal_error() doesn't it
336 # because _buffer is empty.
337 self._loop.remove_writer(self._fileno)
338 self._fatal_error(exc)
339 else:
340 if n == len(data):
341 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800342 self._maybe_resume_protocol() # May append to buffer.
343 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 self._loop.remove_reader(self._fileno)
345 self._call_connection_lost(None)
346 return
347 elif n > 0:
348 data = data[n:]
349
350 self._buffer.append(data) # Try again later.
351
352 def can_write_eof(self):
353 return True
354
355 # TODO: Make the relationships between write_eof(), close(),
356 # abort(), _fatal_error() and _close() more straightforward.
357
358 def write_eof(self):
359 if self._closing:
360 return
361 assert self._pipe
362 self._closing = True
363 if not self._buffer:
364 self._loop.remove_reader(self._fileno)
365 self._loop.call_soon(self._call_connection_lost, None)
366
367 def close(self):
368 if not self._closing:
369 # write_eof is all what we needed to close the write pipe
370 self.write_eof()
371
372 def abort(self):
373 self._close(None)
374
375 def _fatal_error(self, exc):
376 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800377 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
378 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 self._close(exc)
380
381 def _close(self, exc=None):
382 self._closing = True
383 if self._buffer:
384 self._loop.remove_writer(self._fileno)
385 self._buffer.clear()
386 self._loop.remove_reader(self._fileno)
387 self._loop.call_soon(self._call_connection_lost, exc)
388
389 def _call_connection_lost(self, exc):
390 try:
391 self._protocol.connection_lost(exc)
392 finally:
393 self._pipe.close()
394 self._pipe = None
395 self._protocol = None
396 self._loop = None
397
398
Guido van Rossum59691282013-10-30 14:52:03 -0700399class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400
Guido van Rossum59691282013-10-30 14:52:03 -0700401 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700402 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700404 # Use a socket pair for stdin, since not all platforms
405 # support selecting read events on the write end of a
406 # socket (which we use in order to detect closing of the
407 # other end). Notably this is needed on AIX, and works
408 # just fine on other platforms.
409 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 self._proc = subprocess.Popen(
411 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
412 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700413 if stdin_w is not None:
414 stdin.close()
415 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800416
417
418class AbstractChildWatcher:
419 """Abstract base class for monitoring child processes.
420
421 Objects derived from this class monitor a collection of subprocesses and
422 report their termination or interruption by a signal.
423
424 New callbacks are registered with .add_child_handler(). Starting a new
425 process must be done within a 'with' block to allow the watcher to suspend
426 its activity until the new process if fully registered (this is needed to
427 prevent a race condition in some implementations).
428
429 Example:
430 with watcher:
431 proc = subprocess.Popen("sleep 1")
432 watcher.add_child_handler(proc.pid, callback)
433
434 Notes:
435 Implementations of this class must be thread-safe.
436
437 Since child watcher objects may catch the SIGCHLD signal and call
438 waitpid(-1), there should be only one active object per process.
439 """
440
441 def add_child_handler(self, pid, callback, *args):
442 """Register a new child handler.
443
444 Arrange for callback(pid, returncode, *args) to be called when
445 process 'pid' terminates. Specifying another callback for the same
446 process replaces the previous handler.
447
448 Note: callback() must be thread-safe
449 """
450 raise NotImplementedError()
451
452 def remove_child_handler(self, pid):
453 """Removes the handler for process 'pid'.
454
455 The function returns True if the handler was successfully removed,
456 False if there was nothing to remove."""
457
458 raise NotImplementedError()
459
Guido van Rossum2bcae702013-11-13 15:50:08 -0800460 def attach_loop(self, loop):
461 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800462
Guido van Rossum2bcae702013-11-13 15:50:08 -0800463 If the watcher was previously attached to an event loop, then it is
464 first detached before attaching to the new loop.
465
466 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800467 """
468 raise NotImplementedError()
469
470 def close(self):
471 """Close the watcher.
472
473 This must be called to make sure that any underlying resource is freed.
474 """
475 raise NotImplementedError()
476
477 def __enter__(self):
478 """Enter the watcher's context and allow starting new processes
479
480 This function must return self"""
481 raise NotImplementedError()
482
483 def __exit__(self, a, b, c):
484 """Exit the watcher's context"""
485 raise NotImplementedError()
486
487
488class BaseChildWatcher(AbstractChildWatcher):
489
Guido van Rossum2bcae702013-11-13 15:50:08 -0800490 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800491 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800492
493 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800494 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800495
496 def _do_waitpid(self, expected_pid):
497 raise NotImplementedError()
498
499 def _do_waitpid_all(self):
500 raise NotImplementedError()
501
Guido van Rossum2bcae702013-11-13 15:50:08 -0800502 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800503 assert loop is None or isinstance(loop, events.AbstractEventLoop)
504
505 if self._loop is not None:
506 self._loop.remove_signal_handler(signal.SIGCHLD)
507
508 self._loop = loop
509 if loop is not None:
510 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
511
512 # Prevent a race condition in case a child terminated
513 # during the switch.
514 self._do_waitpid_all()
515
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800516 def _sig_chld(self):
517 try:
518 self._do_waitpid_all()
519 except Exception:
520 logger.exception('Unknown exception in SIGCHLD handler')
521
522 def _compute_returncode(self, status):
523 if os.WIFSIGNALED(status):
524 # The child process died because of a signal.
525 return -os.WTERMSIG(status)
526 elif os.WIFEXITED(status):
527 # The child process exited (e.g sys.exit()).
528 return os.WEXITSTATUS(status)
529 else:
530 # The child exited, but we don't understand its status.
531 # This shouldn't happen, but if it does, let's just
532 # return that status; perhaps that helps debug it.
533 return status
534
535
536class SafeChildWatcher(BaseChildWatcher):
537 """'Safe' child watcher implementation.
538
539 This implementation avoids disrupting other code spawning processes by
540 polling explicitly each process in the SIGCHLD handler instead of calling
541 os.waitpid(-1).
542
543 This is a safe solution but it has a significant overhead when handling a
544 big number of children (O(n) each time SIGCHLD is raised)
545 """
546
Guido van Rossum2bcae702013-11-13 15:50:08 -0800547 def __init__(self):
548 super().__init__()
549 self._callbacks = {}
550
551 def close(self):
552 self._callbacks.clear()
553 super().close()
554
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800555 def __enter__(self):
556 return self
557
558 def __exit__(self, a, b, c):
559 pass
560
561 def add_child_handler(self, pid, callback, *args):
562 self._callbacks[pid] = callback, args
563
564 # Prevent a race condition in case the child is already terminated.
565 self._do_waitpid(pid)
566
Guido van Rossum2bcae702013-11-13 15:50:08 -0800567 def remove_child_handler(self, pid):
568 try:
569 del self._callbacks[pid]
570 return True
571 except KeyError:
572 return False
573
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800574 def _do_waitpid_all(self):
575
576 for pid in list(self._callbacks):
577 self._do_waitpid(pid)
578
579 def _do_waitpid(self, expected_pid):
580 assert expected_pid > 0
581
582 try:
583 pid, status = os.waitpid(expected_pid, os.WNOHANG)
584 except ChildProcessError:
585 # The child process is already reaped
586 # (may happen if waitpid() is called elsewhere).
587 pid = expected_pid
588 returncode = 255
589 logger.warning(
590 "Unknown child process pid %d, will report returncode 255",
591 pid)
592 else:
593 if pid == 0:
594 # The child process is still alive.
595 return
596
597 returncode = self._compute_returncode(status)
598
599 try:
600 callback, args = self._callbacks.pop(pid)
601 except KeyError: # pragma: no cover
602 # May happen if .remove_child_handler() is called
603 # after os.waitpid() returns.
604 pass
605 else:
606 callback(pid, returncode, *args)
607
608
609class FastChildWatcher(BaseChildWatcher):
610 """'Fast' child watcher implementation.
611
612 This implementation reaps every terminated processes by calling
613 os.waitpid(-1) directly, possibly breaking other code spawning processes
614 and waiting for their termination.
615
616 There is no noticeable overhead when handling a big number of children
617 (O(1) each time a child terminates).
618 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800619 def __init__(self):
620 super().__init__()
621 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800622 self._lock = threading.Lock()
623 self._zombies = {}
624 self._forks = 0
625
626 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800627 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800628 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800629 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800630
631 def __enter__(self):
632 with self._lock:
633 self._forks += 1
634
635 return self
636
637 def __exit__(self, a, b, c):
638 with self._lock:
639 self._forks -= 1
640
641 if self._forks or not self._zombies:
642 return
643
644 collateral_victims = str(self._zombies)
645 self._zombies.clear()
646
647 logger.warning(
648 "Caught subprocesses termination from unknown pids: %s",
649 collateral_victims)
650
651 def add_child_handler(self, pid, callback, *args):
652 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800653 with self._lock:
654 try:
655 returncode = self._zombies.pop(pid)
656 except KeyError:
657 # The child is running.
658 self._callbacks[pid] = callback, args
659 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800660
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800661 # The child is dead already. We can fire the callback.
662 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800663
Guido van Rossum2bcae702013-11-13 15:50:08 -0800664 def remove_child_handler(self, pid):
665 try:
666 del self._callbacks[pid]
667 return True
668 except KeyError:
669 return False
670
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800671 def _do_waitpid_all(self):
672 # Because of signal coalescing, we must keep calling waitpid() as
673 # long as we're able to reap a child.
674 while True:
675 try:
676 pid, status = os.waitpid(-1, os.WNOHANG)
677 except ChildProcessError:
678 # No more child processes exist.
679 return
680 else:
681 if pid == 0:
682 # A child process is still alive.
683 return
684
685 returncode = self._compute_returncode(status)
686
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800687 with self._lock:
688 try:
689 callback, args = self._callbacks.pop(pid)
690 except KeyError:
691 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800692 if self._forks:
693 # It may not be registered yet.
694 self._zombies[pid] = returncode
695 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800696 callback = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800697
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800698 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800699 logger.warning(
700 "Caught subprocess termination from unknown pid: "
701 "%d -> %d", pid, returncode)
702 else:
703 callback(pid, returncode, *args)
704
705
706class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
707 """XXX"""
708 _loop_factory = _UnixSelectorEventLoop
709
710 def __init__(self):
711 super().__init__()
712 self._watcher = None
713
714 def _init_watcher(self):
715 with events._lock:
716 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800717 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800718 if isinstance(threading.current_thread(),
719 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800720 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800721
722 def set_event_loop(self, loop):
723 """Set the event loop.
724
725 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800726 .set_event_loop() from the main thread will call .attach_loop(loop) on
727 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800728 """
729
730 super().set_event_loop(loop)
731
732 if self._watcher is not None and \
733 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800734 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800735
736 def get_child_watcher(self):
737 """Get the child watcher
738
739 If not yet set, a SafeChildWatcher object is automatically created.
740 """
741 if self._watcher is None:
742 self._init_watcher()
743
744 return self._watcher
745
746 def set_child_watcher(self, watcher):
747 """Set the child watcher"""
748
749 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
750
751 if self._watcher is not None:
752 self._watcher.close()
753
754 self._watcher = watcher
755
756SelectorEventLoop = _UnixSelectorEventLoop
757DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy