blob: b611efd17d14171be8a0e512b423b653ed39a4dd [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by ``from <module> import *``.
__all__ = [
    'SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'DefaultEventLoopPolicy',
]

# File descriptor numbers of the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2


# Refuse to be imported on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Extends the selector event loop with signal handling and with
    pipe and subprocess transports.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a new pair of connected sockets."""
        return socket.socketpair()

    def close(self):
        """Remove all installed signal handlers, then close the loop."""
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        callback(*args) is scheduled on the loop each time the signal
        is received.  A handler previously registered for the same
        signal is replaced.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler
        (not called from the main thread, or the signal cannot be caught).
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: the wakeup fd is no longer needed.
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the default handler cannot be restored.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to the handler raising KeyboardInterrupt;
        # every other signal goes back to its default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: stop using the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a read transport bound to this loop for pipe."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a write transport bound to this loop for pipe."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport (coroutine).

        The transport is created and registered with the child watcher
        inside the watcher's context, as the watcher API requires.
        """
        with events.get_child_watcher() as watcher:
            # Forward the caller's extra info; it used to be dropped by
            # always passing extra=None.
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
        yield from transp._post_init()
        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Schedule transp._process_exited(returncode) on this loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    def _subprocess_closed(self, transp):
        """Do nothing; no per-transport cleanup is performed here."""
        pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
172
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for file descriptor fd, keeping its other flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
177
178
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over the read end of a pipe or a socket."""

    # Upper bound on the bytes read in one event loop iteration.
    max_size = 256 * 1024

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        fd = pipe.fileno()
        self._fileno = fd
        # Only FIFOs and sockets are accepted.
        file_mode = os.fstat(fd).st_mode
        if not stat.S_ISFIFO(file_mode) and not stat.S_ISSOCK(file_mode):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(fd)
        self._protocol = protocol
        self._closing = False
        loop.add_reader(fd, self._read_ready)
        loop.call_soon(protocol.connection_made, self)
        if waiter is not None:
            loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Read available data and pass it to the protocol; handle EOF."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now.
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # Empty read means EOF: stop watching the fd, then notify the
        # protocol of EOF followed by the loss of the connection.
        self._closing = True
        loop = self._loop
        loop.remove_reader(self._fileno)
        loop.call_soon(self._protocol.eof_received)
        loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the file descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        """Log the error and close the transport, reporting exc."""
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        loop = self._loop
        loop.remove_reader(self._fileno)
        loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if the protocol's connection_lost() raises.
            self._pipe.close()
            self._pipe = self._protocol = self._loop = None
244
245
class _UnixWritePipeTransport(transports.WriteTransport):
    """Write transport over the write end of a pipe or a socket."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        fd = pipe.fileno()
        self._fileno = fd
        file_mode = os.fstat(fd).st_mode
        is_socket = stat.S_ISSOCK(file_mode)
        is_pipe = stat.S_ISFIFO(file_mode)
        if not is_socket and not is_pipe:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(fd)
        self._protocol = protocol
        self._buffer = []       # Chunks of bytes waiting to be written.
        self._conn_lost = 0     # Counts failures and writes after them.
        self._closing = False   # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or not on_aix:
            loop.add_reader(fd, self._read_ready)

        loop.call_soon(protocol.connection_made, self)
        if waiter is not None:
            loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Close the transport: readability means the peer closed."""
        # Pipe was closed by peer.
        self._close()

    def write(self, data):
        """Write bytes, buffering whatever cannot be written at once."""
        assert isinstance(data, bytes), repr(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # The data is dropped; warn once enough writes were lost.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if sent == len(data):
                return
            if sent > 0:
                data = data[sent:]
            # The rest is flushed once the fd becomes writable.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)

    def _write_ready(self):
        """Try to flush the buffered data now that the fd is writable."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            # Not writable after all; keep everything for the next try.
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _close() only removes it when
            # _buffer is non-empty, and _buffer was cleared above.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if sent == len(pending):
            # Everything was written.
            self._loop.remove_writer(self._fileno)
            if self._closing:
                # A close/EOF was waiting for the buffer to drain.
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the close once the buffer drains.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport after buffered data has been written."""
        if self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc):
        """Log the error and close the transport, reporting exc."""
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule connection_lost."""
        self._closing = True
        loop = self._loop
        if self._buffer:
            loop.remove_writer(self._fileno)
        self._buffer.clear()
        loop.remove_reader(self._fileno)
        loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if the protocol's connection_lost() raises.
            self._pipe.close()
            self._pipe = self._protocol = self._loop = None
380
381
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When stdin is subprocess.PIPE the child is given one end of a
        socket pair and the other end is exposed as self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        self._proc = subprocess.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            universal_newlines=False, bufsize=bufsize, **kwargs)
        if stdin_w is not None:
            # The child holds its own copy of this end; close ours.
            stdin.close()
            # The parent writes to the child's stdin, so the file object
            # must be opened for writing ('wb'), not for reading.
            self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800399
400
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one
    of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for process 'pid'.

        callback(pid, returncode, *args) is to be called once the
        process terminates.  Registering a second callback for the same
        process replaces the first one.

        Note: callback() must be thread-safe
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Return True if a handler was removed, False if there was
        nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher already attached to another loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
469
470
class BaseChildWatcher(AbstractChildWatcher):
    """Shared logic for watchers that react to SIGCHLD on an event loop.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Loop whose SIGCHLD handler is ours, or None when detached.
        self._loop = None

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to loop.

        loop may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll the children, logging any exception."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code.

        Return the negated signal number if the child was killed by a
        signal, its exit status if it exited, else status unchanged.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
517
518
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled individually from the SIGCHLD
    handler rather than through os.waitpid(-1), so other code that
    spawns processes is not disturbed.

    The price of this safety is a significant overhead when many
    children are watched (O(n) each time SIGCHLD is raised).
    """

    def __init__(self):
        super().__init__()
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        """Forget all registered handlers and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for pid; return True if there was one."""
        # Stored values are always tuples, so None means "no entry".
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        """Poll every registered process."""
        # Snapshot the keys: _do_waitpid() may remove entries.
        for watched_pid in list(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        # The entry may be missing if .remove_child_handler() was called
        # after os.waitpid() returned; in that case nothing is fired.
        entry = self._callbacks.pop(pid, None)
        if entry is not None:
            handler, handler_args = entry
            handler(pid, returncode, *handler_args)
590
591
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}
        # Guards _forks and _zombies, and _callbacks in add_child_handler()
        # and _do_waitpid_all().
        self._lock = threading.Lock()
        # Maps the pid of a reaped but not yet registered child to its
        # return code.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No process is being started any more, so the remaining
            # zombies were not started through this watcher.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is fired immediately instead of
        being registered.
        """
        assert self._forks, "Must use the context manager"

        # Check the zombies and register the callback under the lock so
        # that _do_waitpid_all() cannot reap the child in between and
        # leave its return code in _zombies with nobody to collect it.
        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is still running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already: fire the callback, outside the lock.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler for pid; return True if there was one."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and fire the matching callbacks."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            # Look up the callback and, if needed, record the zombie in
            # one critical section; see add_child_handler().
            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        continue
                    callback = None

            # Log or call back outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
691
692
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Unix.

    Uses _UnixSelectorEventLoop as loop factory and manages one child
    watcher.
    """
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created on demand by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return
            watcher = SafeChildWatcher()
            self._watcher = watcher
            # The watcher is attached to a loop from the main thread only.
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the one previously set, if any."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
742
# Public names for the Unix implementations; both are listed in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy