blob: eb3fb9f91c080a4249b1c09fbfe697e6f5a79976 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080024__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
25 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29STDIN = 0
30STDOUT = 1
31STDERR = 2
32
33
34if sys.platform == 'win32': # pragma: no cover
35 raise ImportError('Signals are not really supported on Windows')
36
37
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080038class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039 """Unix event loop
40
41 Adds signal handling to SelectorEventLoop
42 """
43
44 def __init__(self, selector=None):
45 super().__init__(selector)
46 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48 def _socketpair(self):
49 return socket.socketpair()
50
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
52 for sig in list(self._signal_handlers):
53 self.remove_signal_handler(sig)
54 super().close()
55
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 def add_signal_handler(self, sig, callback, *args):
57 """Add a handler for a signal. UNIX only.
58
59 Raise ValueError if the signal number is invalid or uncatchable.
60 Raise RuntimeError if there is a problem setting up the handler.
61 """
62 self._check_signal(sig)
63 try:
64 # set_wakeup_fd() raises ValueError if this is not the
65 # main thread. By calling it early we ensure that an
66 # event loop running in another thread cannot add a signal
67 # handler.
68 signal.set_wakeup_fd(self._csock.fileno())
69 except ValueError as exc:
70 raise RuntimeError(str(exc))
71
72 handle = events.make_handle(callback, args)
73 self._signal_handlers[sig] = handle
74
75 try:
76 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010077 # Set SA_RESTART to limit EINTR occurrences.
78 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 except OSError as exc:
80 del self._signal_handlers[sig]
81 if not self._signal_handlers:
82 try:
83 signal.set_wakeup_fd(-1)
84 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070085 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
87 if exc.errno == errno.EINVAL:
88 raise RuntimeError('sig {} cannot be caught'.format(sig))
89 else:
90 raise
91
92 def _handle_signal(self, sig, arg):
93 """Internal helper that is the actual signal handler."""
94 handle = self._signal_handlers.get(sig)
95 if handle is None:
96 return # Assume it's some race condition.
97 if handle._cancelled:
98 self.remove_signal_handler(sig) # Remove it properly.
99 else:
100 self._add_callback_signalsafe(handle)
101
102 def remove_signal_handler(self, sig):
103 """Remove a handler for a signal. UNIX only.
104
105 Return True if a signal handler was removed, False if not.
106 """
107 self._check_signal(sig)
108 try:
109 del self._signal_handlers[sig]
110 except KeyError:
111 return False
112
113 if sig == signal.SIGINT:
114 handler = signal.default_int_handler
115 else:
116 handler = signal.SIG_DFL
117
118 try:
119 signal.signal(sig, handler)
120 except OSError as exc:
121 if exc.errno == errno.EINVAL:
122 raise RuntimeError('sig {} cannot be caught'.format(sig))
123 else:
124 raise
125
126 if not self._signal_handlers:
127 try:
128 signal.set_wakeup_fd(-1)
129 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700130 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
132 return True
133
134 def _check_signal(self, sig):
135 """Internal helper to validate a signal.
136
137 Raise ValueError if the signal number is invalid or uncatchable.
138 Raise RuntimeError if there is a problem setting up the handler.
139 """
140 if not isinstance(sig, int):
141 raise TypeError('sig must be an int, not {!r}'.format(sig))
142
143 if not (1 <= sig < signal.NSIG):
144 raise ValueError(
145 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
146
147 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
150
151 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
152 extra=None):
153 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
154
155 @tasks.coroutine
156 def _make_subprocess_transport(self, protocol, args, shell,
157 stdin, stdout, stderr, bufsize,
158 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800159 with events.get_child_watcher() as watcher:
160 transp = _UnixSubprocessTransport(self, protocol, args, shell,
161 stdin, stdout, stderr, bufsize,
162 extra=None, **kwargs)
163 watcher.add_child_handler(transp.get_pid(),
164 self._child_watcher_callback, transp)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165 yield from transp._post_init()
166 return transp
167
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800168 def _child_watcher_callback(self, pid, returncode, transp):
169 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800171 def _subprocess_closed(self, transp):
172 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700173
174
175def _set_nonblocking(fd):
176 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
177 flags = flags | os.O_NONBLOCK
178 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
179
180
181class _UnixReadPipeTransport(transports.ReadTransport):
182
183 max_size = 256 * 1024 # max bytes we read in one eventloop iteration
184
185 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
186 super().__init__(extra)
187 self._extra['pipe'] = pipe
188 self._loop = loop
189 self._pipe = pipe
190 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700191 mode = os.fstat(self._fileno).st_mode
192 if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode)):
193 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 _set_nonblocking(self._fileno)
195 self._protocol = protocol
196 self._closing = False
197 self._loop.add_reader(self._fileno, self._read_ready)
198 self._loop.call_soon(self._protocol.connection_made, self)
199 if waiter is not None:
200 self._loop.call_soon(waiter.set_result, None)
201
202 def _read_ready(self):
203 try:
204 data = os.read(self._fileno, self.max_size)
205 except (BlockingIOError, InterruptedError):
206 pass
207 except OSError as exc:
208 self._fatal_error(exc)
209 else:
210 if data:
211 self._protocol.data_received(data)
212 else:
213 self._closing = True
214 self._loop.remove_reader(self._fileno)
215 self._loop.call_soon(self._protocol.eof_received)
216 self._loop.call_soon(self._call_connection_lost, None)
217
Guido van Rossum57497ad2013-10-18 07:58:20 -0700218 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 self._loop.remove_reader(self._fileno)
220
Guido van Rossum57497ad2013-10-18 07:58:20 -0700221 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 self._loop.add_reader(self._fileno, self._read_ready)
223
224 def close(self):
225 if not self._closing:
226 self._close(None)
227
228 def _fatal_error(self, exc):
229 # should be called by exception handler only
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700230 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700231 self._close(exc)
232
233 def _close(self, exc):
234 self._closing = True
235 self._loop.remove_reader(self._fileno)
236 self._loop.call_soon(self._call_connection_lost, exc)
237
238 def _call_connection_lost(self, exc):
239 try:
240 self._protocol.connection_lost(exc)
241 finally:
242 self._pipe.close()
243 self._pipe = None
244 self._protocol = None
245 self._loop = None
246
247
248class _UnixWritePipeTransport(transports.WriteTransport):
249
250 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
251 super().__init__(extra)
252 self._extra['pipe'] = pipe
253 self._loop = loop
254 self._pipe = pipe
255 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700256 mode = os.fstat(self._fileno).st_mode
257 is_socket = stat.S_ISSOCK(mode)
258 is_pipe = stat.S_ISFIFO(mode)
259 if not (is_socket or is_pipe):
260 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261 _set_nonblocking(self._fileno)
262 self._protocol = protocol
263 self._buffer = []
264 self._conn_lost = 0
265 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700266
267 # On AIX, the reader trick only works for sockets.
268 # On other platforms it works for pipes and sockets.
269 # (Exception: OS X 10.4? Issue #19294.)
270 if is_socket or not sys.platform.startswith("aix"):
271 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272
273 self._loop.call_soon(self._protocol.connection_made, self)
274 if waiter is not None:
275 self._loop.call_soon(waiter.set_result, None)
276
277 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700278 # Pipe was closed by peer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700279 self._close()
280
281 def write(self, data):
282 assert isinstance(data, bytes), repr(data)
283 if not data:
284 return
285
286 if self._conn_lost or self._closing:
287 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700288 logger.warning('pipe closed by peer or '
289 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290 self._conn_lost += 1
291 return
292
293 if not self._buffer:
294 # Attempt to send it right away first.
295 try:
296 n = os.write(self._fileno, data)
297 except (BlockingIOError, InterruptedError):
298 n = 0
299 except Exception as exc:
300 self._conn_lost += 1
301 self._fatal_error(exc)
302 return
303 if n == len(data):
304 return
305 elif n > 0:
306 data = data[n:]
307 self._loop.add_writer(self._fileno, self._write_ready)
308
309 self._buffer.append(data)
310
311 def _write_ready(self):
312 data = b''.join(self._buffer)
313 assert data, 'Data should not be empty'
314
315 self._buffer.clear()
316 try:
317 n = os.write(self._fileno, data)
318 except (BlockingIOError, InterruptedError):
319 self._buffer.append(data)
320 except Exception as exc:
321 self._conn_lost += 1
322 # Remove writer here, _fatal_error() doesn't it
323 # because _buffer is empty.
324 self._loop.remove_writer(self._fileno)
325 self._fatal_error(exc)
326 else:
327 if n == len(data):
328 self._loop.remove_writer(self._fileno)
329 if self._closing:
330 self._loop.remove_reader(self._fileno)
331 self._call_connection_lost(None)
332 return
333 elif n > 0:
334 data = data[n:]
335
336 self._buffer.append(data) # Try again later.
337
338 def can_write_eof(self):
339 return True
340
341 # TODO: Make the relationships between write_eof(), close(),
342 # abort(), _fatal_error() and _close() more straightforward.
343
344 def write_eof(self):
345 if self._closing:
346 return
347 assert self._pipe
348 self._closing = True
349 if not self._buffer:
350 self._loop.remove_reader(self._fileno)
351 self._loop.call_soon(self._call_connection_lost, None)
352
353 def close(self):
354 if not self._closing:
355 # write_eof is all what we needed to close the write pipe
356 self.write_eof()
357
358 def abort(self):
359 self._close(None)
360
361 def _fatal_error(self, exc):
362 # should be called by exception handler only
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700363 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364 self._close(exc)
365
366 def _close(self, exc=None):
367 self._closing = True
368 if self._buffer:
369 self._loop.remove_writer(self._fileno)
370 self._buffer.clear()
371 self._loop.remove_reader(self._fileno)
372 self._loop.call_soon(self._call_connection_lost, exc)
373
374 def _call_connection_lost(self, exc):
375 try:
376 self._protocol.connection_lost(exc)
377 finally:
378 self._pipe.close()
379 self._pipe = None
380 self._protocol = None
381 self._loop = None
382
383
Guido van Rossum59691282013-10-30 14:52:03 -0700384class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385
Guido van Rossum59691282013-10-30 14:52:03 -0700386 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700387 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700389 # Use a socket pair for stdin, since not all platforms
390 # support selecting read events on the write end of a
391 # socket (which we use in order to detect closing of the
392 # other end). Notably this is needed on AIX, and works
393 # just fine on other platforms.
394 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 self._proc = subprocess.Popen(
396 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
397 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700398 if stdin_w is not None:
399 stdin.close()
400 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800401
402
403class AbstractChildWatcher:
404 """Abstract base class for monitoring child processes.
405
406 Objects derived from this class monitor a collection of subprocesses and
407 report their termination or interruption by a signal.
408
409 New callbacks are registered with .add_child_handler(). Starting a new
410 process must be done within a 'with' block to allow the watcher to suspend
411 its activity until the new process if fully registered (this is needed to
412 prevent a race condition in some implementations).
413
414 Example:
415 with watcher:
416 proc = subprocess.Popen("sleep 1")
417 watcher.add_child_handler(proc.pid, callback)
418
419 Notes:
420 Implementations of this class must be thread-safe.
421
422 Since child watcher objects may catch the SIGCHLD signal and call
423 waitpid(-1), there should be only one active object per process.
424 """
425
426 def add_child_handler(self, pid, callback, *args):
427 """Register a new child handler.
428
429 Arrange for callback(pid, returncode, *args) to be called when
430 process 'pid' terminates. Specifying another callback for the same
431 process replaces the previous handler.
432
433 Note: callback() must be thread-safe
434 """
435 raise NotImplementedError()
436
437 def remove_child_handler(self, pid):
438 """Removes the handler for process 'pid'.
439
440 The function returns True if the handler was successfully removed,
441 False if there was nothing to remove."""
442
443 raise NotImplementedError()
444
Guido van Rossum2bcae702013-11-13 15:50:08 -0800445 def attach_loop(self, loop):
446 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800447
Guido van Rossum2bcae702013-11-13 15:50:08 -0800448 If the watcher was previously attached to an event loop, then it is
449 first detached before attaching to the new loop.
450
451 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800452 """
453 raise NotImplementedError()
454
455 def close(self):
456 """Close the watcher.
457
458 This must be called to make sure that any underlying resource is freed.
459 """
460 raise NotImplementedError()
461
462 def __enter__(self):
463 """Enter the watcher's context and allow starting new processes
464
465 This function must return self"""
466 raise NotImplementedError()
467
468 def __exit__(self, a, b, c):
469 """Exit the watcher's context"""
470 raise NotImplementedError()
471
472
473class BaseChildWatcher(AbstractChildWatcher):
474
Guido van Rossum2bcae702013-11-13 15:50:08 -0800475 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800476 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800477
478 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800479 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800480
481 def _do_waitpid(self, expected_pid):
482 raise NotImplementedError()
483
484 def _do_waitpid_all(self):
485 raise NotImplementedError()
486
Guido van Rossum2bcae702013-11-13 15:50:08 -0800487 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800488 assert loop is None or isinstance(loop, events.AbstractEventLoop)
489
490 if self._loop is not None:
491 self._loop.remove_signal_handler(signal.SIGCHLD)
492
493 self._loop = loop
494 if loop is not None:
495 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
496
497 # Prevent a race condition in case a child terminated
498 # during the switch.
499 self._do_waitpid_all()
500
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800501 def _sig_chld(self):
502 try:
503 self._do_waitpid_all()
504 except Exception:
505 logger.exception('Unknown exception in SIGCHLD handler')
506
507 def _compute_returncode(self, status):
508 if os.WIFSIGNALED(status):
509 # The child process died because of a signal.
510 return -os.WTERMSIG(status)
511 elif os.WIFEXITED(status):
512 # The child process exited (e.g sys.exit()).
513 return os.WEXITSTATUS(status)
514 else:
515 # The child exited, but we don't understand its status.
516 # This shouldn't happen, but if it does, let's just
517 # return that status; perhaps that helps debug it.
518 return status
519
520
521class SafeChildWatcher(BaseChildWatcher):
522 """'Safe' child watcher implementation.
523
524 This implementation avoids disrupting other code spawning processes by
525 polling explicitly each process in the SIGCHLD handler instead of calling
526 os.waitpid(-1).
527
528 This is a safe solution but it has a significant overhead when handling a
529 big number of children (O(n) each time SIGCHLD is raised)
530 """
531
Guido van Rossum2bcae702013-11-13 15:50:08 -0800532 def __init__(self):
533 super().__init__()
534 self._callbacks = {}
535
536 def close(self):
537 self._callbacks.clear()
538 super().close()
539
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800540 def __enter__(self):
541 return self
542
543 def __exit__(self, a, b, c):
544 pass
545
546 def add_child_handler(self, pid, callback, *args):
547 self._callbacks[pid] = callback, args
548
549 # Prevent a race condition in case the child is already terminated.
550 self._do_waitpid(pid)
551
Guido van Rossum2bcae702013-11-13 15:50:08 -0800552 def remove_child_handler(self, pid):
553 try:
554 del self._callbacks[pid]
555 return True
556 except KeyError:
557 return False
558
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800559 def _do_waitpid_all(self):
560
561 for pid in list(self._callbacks):
562 self._do_waitpid(pid)
563
564 def _do_waitpid(self, expected_pid):
565 assert expected_pid > 0
566
567 try:
568 pid, status = os.waitpid(expected_pid, os.WNOHANG)
569 except ChildProcessError:
570 # The child process is already reaped
571 # (may happen if waitpid() is called elsewhere).
572 pid = expected_pid
573 returncode = 255
574 logger.warning(
575 "Unknown child process pid %d, will report returncode 255",
576 pid)
577 else:
578 if pid == 0:
579 # The child process is still alive.
580 return
581
582 returncode = self._compute_returncode(status)
583
584 try:
585 callback, args = self._callbacks.pop(pid)
586 except KeyError: # pragma: no cover
587 # May happen if .remove_child_handler() is called
588 # after os.waitpid() returns.
589 pass
590 else:
591 callback(pid, returncode, *args)
592
593
594class FastChildWatcher(BaseChildWatcher):
595 """'Fast' child watcher implementation.
596
597 This implementation reaps every terminated processes by calling
598 os.waitpid(-1) directly, possibly breaking other code spawning processes
599 and waiting for their termination.
600
601 There is no noticeable overhead when handling a big number of children
602 (O(1) each time a child terminates).
603 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800604 def __init__(self):
605 super().__init__()
606 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800607 self._lock = threading.Lock()
608 self._zombies = {}
609 self._forks = 0
610
611 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800612 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800613 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800614 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800615
616 def __enter__(self):
617 with self._lock:
618 self._forks += 1
619
620 return self
621
622 def __exit__(self, a, b, c):
623 with self._lock:
624 self._forks -= 1
625
626 if self._forks or not self._zombies:
627 return
628
629 collateral_victims = str(self._zombies)
630 self._zombies.clear()
631
632 logger.warning(
633 "Caught subprocesses termination from unknown pids: %s",
634 collateral_victims)
635
636 def add_child_handler(self, pid, callback, *args):
637 assert self._forks, "Must use the context manager"
638
639 self._callbacks[pid] = callback, args
640
641 try:
642 # Ensure that the child is not already terminated.
643 # (raise KeyError if still alive)
644 returncode = self._zombies.pop(pid)
645
646 # Child is dead, therefore we can fire the callback immediately.
647 # First we remove it from the dict.
648 # (raise KeyError if .remove_child_handler() was called in-between)
649 del self._callbacks[pid]
650 except KeyError:
651 pass
652 else:
653 callback(pid, returncode, *args)
654
Guido van Rossum2bcae702013-11-13 15:50:08 -0800655 def remove_child_handler(self, pid):
656 try:
657 del self._callbacks[pid]
658 return True
659 except KeyError:
660 return False
661
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800662 def _do_waitpid_all(self):
663 # Because of signal coalescing, we must keep calling waitpid() as
664 # long as we're able to reap a child.
665 while True:
666 try:
667 pid, status = os.waitpid(-1, os.WNOHANG)
668 except ChildProcessError:
669 # No more child processes exist.
670 return
671 else:
672 if pid == 0:
673 # A child process is still alive.
674 return
675
676 returncode = self._compute_returncode(status)
677
678 try:
679 callback, args = self._callbacks.pop(pid)
680 except KeyError:
681 # unknown child
682 with self._lock:
683 if self._forks:
684 # It may not be registered yet.
685 self._zombies[pid] = returncode
686 continue
687
688 logger.warning(
689 "Caught subprocess termination from unknown pid: "
690 "%d -> %d", pid, returncode)
691 else:
692 callback(pid, returncode, *args)
693
694
695class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
696 """XXX"""
697 _loop_factory = _UnixSelectorEventLoop
698
699 def __init__(self):
700 super().__init__()
701 self._watcher = None
702
703 def _init_watcher(self):
704 with events._lock:
705 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800706 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800707 if isinstance(threading.current_thread(),
708 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800709 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800710
711 def set_event_loop(self, loop):
712 """Set the event loop.
713
714 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800715 .set_event_loop() from the main thread will call .attach_loop(loop) on
716 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800717 """
718
719 super().set_event_loop(loop)
720
721 if self._watcher is not None and \
722 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800723 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800724
725 def get_child_watcher(self):
726 """Get the child watcher
727
728 If not yet set, a SafeChildWatcher object is automatically created.
729 """
730 if self._watcher is None:
731 self._init_watcher()
732
733 return self._watcher
734
735 def set_child_watcher(self, watcher):
736 """Set the child watcher"""
737
738 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
739
740 if self._watcher is not None:
741 self._watcher.close()
742
743 self._watcher = watcher
744
745SelectorEventLoop = _UnixSelectorEventLoop
746DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy