blob: 219c88a0ff9c3799765905ac985444dd2926d0e3 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080024__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
25 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29STDIN = 0
30STDOUT = 1
31STDERR = 2
32
33
34if sys.platform == 'win32': # pragma: no cover
35 raise ImportError('Signals are not really supported on Windows')
36
37
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080038class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039 """Unix event loop
40
41 Adds signal handling to SelectorEventLoop
42 """
43
44 def __init__(self, selector=None):
45 super().__init__(selector)
46 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48 def _socketpair(self):
49 return socket.socketpair()
50
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
52 for sig in list(self._signal_handlers):
53 self.remove_signal_handler(sig)
54 super().close()
55
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 def add_signal_handler(self, sig, callback, *args):
57 """Add a handler for a signal. UNIX only.
58
59 Raise ValueError if the signal number is invalid or uncatchable.
60 Raise RuntimeError if there is a problem setting up the handler.
61 """
62 self._check_signal(sig)
63 try:
64 # set_wakeup_fd() raises ValueError if this is not the
65 # main thread. By calling it early we ensure that an
66 # event loop running in another thread cannot add a signal
67 # handler.
68 signal.set_wakeup_fd(self._csock.fileno())
69 except ValueError as exc:
70 raise RuntimeError(str(exc))
71
72 handle = events.make_handle(callback, args)
73 self._signal_handlers[sig] = handle
74
75 try:
76 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010077 # Set SA_RESTART to limit EINTR occurrences.
78 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 except OSError as exc:
80 del self._signal_handlers[sig]
81 if not self._signal_handlers:
82 try:
83 signal.set_wakeup_fd(-1)
84 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070085 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
87 if exc.errno == errno.EINVAL:
88 raise RuntimeError('sig {} cannot be caught'.format(sig))
89 else:
90 raise
91
92 def _handle_signal(self, sig, arg):
93 """Internal helper that is the actual signal handler."""
94 handle = self._signal_handlers.get(sig)
95 if handle is None:
96 return # Assume it's some race condition.
97 if handle._cancelled:
98 self.remove_signal_handler(sig) # Remove it properly.
99 else:
100 self._add_callback_signalsafe(handle)
101
102 def remove_signal_handler(self, sig):
103 """Remove a handler for a signal. UNIX only.
104
105 Return True if a signal handler was removed, False if not.
106 """
107 self._check_signal(sig)
108 try:
109 del self._signal_handlers[sig]
110 except KeyError:
111 return False
112
113 if sig == signal.SIGINT:
114 handler = signal.default_int_handler
115 else:
116 handler = signal.SIG_DFL
117
118 try:
119 signal.signal(sig, handler)
120 except OSError as exc:
121 if exc.errno == errno.EINVAL:
122 raise RuntimeError('sig {} cannot be caught'.format(sig))
123 else:
124 raise
125
126 if not self._signal_handlers:
127 try:
128 signal.set_wakeup_fd(-1)
129 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700130 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
132 return True
133
134 def _check_signal(self, sig):
135 """Internal helper to validate a signal.
136
137 Raise ValueError if the signal number is invalid or uncatchable.
138 Raise RuntimeError if there is a problem setting up the handler.
139 """
140 if not isinstance(sig, int):
141 raise TypeError('sig must be an int, not {!r}'.format(sig))
142
143 if not (1 <= sig < signal.NSIG):
144 raise ValueError(
145 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
146
147 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
150
151 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
152 extra=None):
153 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
154
155 @tasks.coroutine
156 def _make_subprocess_transport(self, protocol, args, shell,
157 stdin, stdout, stderr, bufsize,
158 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800159 with events.get_child_watcher() as watcher:
160 transp = _UnixSubprocessTransport(self, protocol, args, shell,
161 stdin, stdout, stderr, bufsize,
162 extra=None, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800163 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800164 watcher.add_child_handler(transp.get_pid(),
165 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800166
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 return transp
168
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800169 def _child_watcher_callback(self, pid, returncode, transp):
170 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172
173def _set_nonblocking(fd):
174 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
175 flags = flags | os.O_NONBLOCK
176 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
177
178
179class _UnixReadPipeTransport(transports.ReadTransport):
180
181 max_size = 256 * 1024 # max bytes we read in one eventloop iteration
182
183 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
184 super().__init__(extra)
185 self._extra['pipe'] = pipe
186 self._loop = loop
187 self._pipe = pipe
188 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700189 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800190 if not (stat.S_ISFIFO(mode) or
191 stat.S_ISSOCK(mode) or
192 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700193 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194 _set_nonblocking(self._fileno)
195 self._protocol = protocol
196 self._closing = False
197 self._loop.add_reader(self._fileno, self._read_ready)
198 self._loop.call_soon(self._protocol.connection_made, self)
199 if waiter is not None:
200 self._loop.call_soon(waiter.set_result, None)
201
202 def _read_ready(self):
203 try:
204 data = os.read(self._fileno, self.max_size)
205 except (BlockingIOError, InterruptedError):
206 pass
207 except OSError as exc:
208 self._fatal_error(exc)
209 else:
210 if data:
211 self._protocol.data_received(data)
212 else:
213 self._closing = True
214 self._loop.remove_reader(self._fileno)
215 self._loop.call_soon(self._protocol.eof_received)
216 self._loop.call_soon(self._call_connection_lost, None)
217
Guido van Rossum57497ad2013-10-18 07:58:20 -0700218 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 self._loop.remove_reader(self._fileno)
220
Guido van Rossum57497ad2013-10-18 07:58:20 -0700221 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 self._loop.add_reader(self._fileno, self._read_ready)
223
224 def close(self):
225 if not self._closing:
226 self._close(None)
227
228 def _fatal_error(self, exc):
229 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800230 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
231 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232 self._close(exc)
233
234 def _close(self, exc):
235 self._closing = True
236 self._loop.remove_reader(self._fileno)
237 self._loop.call_soon(self._call_connection_lost, exc)
238
239 def _call_connection_lost(self, exc):
240 try:
241 self._protocol.connection_lost(exc)
242 finally:
243 self._pipe.close()
244 self._pipe = None
245 self._protocol = None
246 self._loop = None
247
248
249class _UnixWritePipeTransport(transports.WriteTransport):
250
251 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
252 super().__init__(extra)
253 self._extra['pipe'] = pipe
254 self._loop = loop
255 self._pipe = pipe
256 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700257 mode = os.fstat(self._fileno).st_mode
258 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100259 if not (is_socket or
260 stat.S_ISFIFO(mode) or
261 stat.S_ISCHR(mode)):
262 raise ValueError("Pipe transport is only for "
263 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264 _set_nonblocking(self._fileno)
265 self._protocol = protocol
266 self._buffer = []
267 self._conn_lost = 0
268 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700269
270 # On AIX, the reader trick only works for sockets.
271 # On other platforms it works for pipes and sockets.
272 # (Exception: OS X 10.4? Issue #19294.)
273 if is_socket or not sys.platform.startswith("aix"):
274 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275
276 self._loop.call_soon(self._protocol.connection_made, self)
277 if waiter is not None:
278 self._loop.call_soon(waiter.set_result, None)
279
280 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700281 # Pipe was closed by peer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282 self._close()
283
284 def write(self, data):
285 assert isinstance(data, bytes), repr(data)
286 if not data:
287 return
288
289 if self._conn_lost or self._closing:
290 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700291 logger.warning('pipe closed by peer or '
292 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293 self._conn_lost += 1
294 return
295
296 if not self._buffer:
297 # Attempt to send it right away first.
298 try:
299 n = os.write(self._fileno, data)
300 except (BlockingIOError, InterruptedError):
301 n = 0
302 except Exception as exc:
303 self._conn_lost += 1
304 self._fatal_error(exc)
305 return
306 if n == len(data):
307 return
308 elif n > 0:
309 data = data[n:]
310 self._loop.add_writer(self._fileno, self._write_ready)
311
312 self._buffer.append(data)
313
314 def _write_ready(self):
315 data = b''.join(self._buffer)
316 assert data, 'Data should not be empty'
317
318 self._buffer.clear()
319 try:
320 n = os.write(self._fileno, data)
321 except (BlockingIOError, InterruptedError):
322 self._buffer.append(data)
323 except Exception as exc:
324 self._conn_lost += 1
325 # Remove writer here, _fatal_error() doesn't it
326 # because _buffer is empty.
327 self._loop.remove_writer(self._fileno)
328 self._fatal_error(exc)
329 else:
330 if n == len(data):
331 self._loop.remove_writer(self._fileno)
332 if self._closing:
333 self._loop.remove_reader(self._fileno)
334 self._call_connection_lost(None)
335 return
336 elif n > 0:
337 data = data[n:]
338
339 self._buffer.append(data) # Try again later.
340
341 def can_write_eof(self):
342 return True
343
344 # TODO: Make the relationships between write_eof(), close(),
345 # abort(), _fatal_error() and _close() more straightforward.
346
347 def write_eof(self):
348 if self._closing:
349 return
350 assert self._pipe
351 self._closing = True
352 if not self._buffer:
353 self._loop.remove_reader(self._fileno)
354 self._loop.call_soon(self._call_connection_lost, None)
355
356 def close(self):
357 if not self._closing:
358 # write_eof is all what we needed to close the write pipe
359 self.write_eof()
360
361 def abort(self):
362 self._close(None)
363
364 def _fatal_error(self, exc):
365 # should be called by exception handler only
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700366 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367 self._close(exc)
368
369 def _close(self, exc=None):
370 self._closing = True
371 if self._buffer:
372 self._loop.remove_writer(self._fileno)
373 self._buffer.clear()
374 self._loop.remove_reader(self._fileno)
375 self._loop.call_soon(self._call_connection_lost, exc)
376
377 def _call_connection_lost(self, exc):
378 try:
379 self._protocol.connection_lost(exc)
380 finally:
381 self._pipe.close()
382 self._pipe = None
383 self._protocol = None
384 self._loop = None
385
386
Guido van Rossum59691282013-10-30 14:52:03 -0700387class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388
Guido van Rossum59691282013-10-30 14:52:03 -0700389 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700390 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700392 # Use a socket pair for stdin, since not all platforms
393 # support selecting read events on the write end of a
394 # socket (which we use in order to detect closing of the
395 # other end). Notably this is needed on AIX, and works
396 # just fine on other platforms.
397 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 self._proc = subprocess.Popen(
399 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
400 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700401 if stdin_w is not None:
402 stdin.close()
403 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800404
405
406class AbstractChildWatcher:
407 """Abstract base class for monitoring child processes.
408
409 Objects derived from this class monitor a collection of subprocesses and
410 report their termination or interruption by a signal.
411
412 New callbacks are registered with .add_child_handler(). Starting a new
413 process must be done within a 'with' block to allow the watcher to suspend
414 its activity until the new process if fully registered (this is needed to
415 prevent a race condition in some implementations).
416
417 Example:
418 with watcher:
419 proc = subprocess.Popen("sleep 1")
420 watcher.add_child_handler(proc.pid, callback)
421
422 Notes:
423 Implementations of this class must be thread-safe.
424
425 Since child watcher objects may catch the SIGCHLD signal and call
426 waitpid(-1), there should be only one active object per process.
427 """
428
429 def add_child_handler(self, pid, callback, *args):
430 """Register a new child handler.
431
432 Arrange for callback(pid, returncode, *args) to be called when
433 process 'pid' terminates. Specifying another callback for the same
434 process replaces the previous handler.
435
436 Note: callback() must be thread-safe
437 """
438 raise NotImplementedError()
439
440 def remove_child_handler(self, pid):
441 """Removes the handler for process 'pid'.
442
443 The function returns True if the handler was successfully removed,
444 False if there was nothing to remove."""
445
446 raise NotImplementedError()
447
Guido van Rossum2bcae702013-11-13 15:50:08 -0800448 def attach_loop(self, loop):
449 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800450
Guido van Rossum2bcae702013-11-13 15:50:08 -0800451 If the watcher was previously attached to an event loop, then it is
452 first detached before attaching to the new loop.
453
454 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800455 """
456 raise NotImplementedError()
457
458 def close(self):
459 """Close the watcher.
460
461 This must be called to make sure that any underlying resource is freed.
462 """
463 raise NotImplementedError()
464
465 def __enter__(self):
466 """Enter the watcher's context and allow starting new processes
467
468 This function must return self"""
469 raise NotImplementedError()
470
471 def __exit__(self, a, b, c):
472 """Exit the watcher's context"""
473 raise NotImplementedError()
474
475
476class BaseChildWatcher(AbstractChildWatcher):
477
Guido van Rossum2bcae702013-11-13 15:50:08 -0800478 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800479 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800480
481 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800482 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800483
484 def _do_waitpid(self, expected_pid):
485 raise NotImplementedError()
486
487 def _do_waitpid_all(self):
488 raise NotImplementedError()
489
Guido van Rossum2bcae702013-11-13 15:50:08 -0800490 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800491 assert loop is None or isinstance(loop, events.AbstractEventLoop)
492
493 if self._loop is not None:
494 self._loop.remove_signal_handler(signal.SIGCHLD)
495
496 self._loop = loop
497 if loop is not None:
498 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
499
500 # Prevent a race condition in case a child terminated
501 # during the switch.
502 self._do_waitpid_all()
503
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800504 def _sig_chld(self):
505 try:
506 self._do_waitpid_all()
507 except Exception:
508 logger.exception('Unknown exception in SIGCHLD handler')
509
510 def _compute_returncode(self, status):
511 if os.WIFSIGNALED(status):
512 # The child process died because of a signal.
513 return -os.WTERMSIG(status)
514 elif os.WIFEXITED(status):
515 # The child process exited (e.g sys.exit()).
516 return os.WEXITSTATUS(status)
517 else:
518 # The child exited, but we don't understand its status.
519 # This shouldn't happen, but if it does, let's just
520 # return that status; perhaps that helps debug it.
521 return status
522
523
524class SafeChildWatcher(BaseChildWatcher):
525 """'Safe' child watcher implementation.
526
527 This implementation avoids disrupting other code spawning processes by
528 polling explicitly each process in the SIGCHLD handler instead of calling
529 os.waitpid(-1).
530
531 This is a safe solution but it has a significant overhead when handling a
532 big number of children (O(n) each time SIGCHLD is raised)
533 """
534
Guido van Rossum2bcae702013-11-13 15:50:08 -0800535 def __init__(self):
536 super().__init__()
537 self._callbacks = {}
538
539 def close(self):
540 self._callbacks.clear()
541 super().close()
542
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800543 def __enter__(self):
544 return self
545
546 def __exit__(self, a, b, c):
547 pass
548
549 def add_child_handler(self, pid, callback, *args):
550 self._callbacks[pid] = callback, args
551
552 # Prevent a race condition in case the child is already terminated.
553 self._do_waitpid(pid)
554
Guido van Rossum2bcae702013-11-13 15:50:08 -0800555 def remove_child_handler(self, pid):
556 try:
557 del self._callbacks[pid]
558 return True
559 except KeyError:
560 return False
561
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800562 def _do_waitpid_all(self):
563
564 for pid in list(self._callbacks):
565 self._do_waitpid(pid)
566
567 def _do_waitpid(self, expected_pid):
568 assert expected_pid > 0
569
570 try:
571 pid, status = os.waitpid(expected_pid, os.WNOHANG)
572 except ChildProcessError:
573 # The child process is already reaped
574 # (may happen if waitpid() is called elsewhere).
575 pid = expected_pid
576 returncode = 255
577 logger.warning(
578 "Unknown child process pid %d, will report returncode 255",
579 pid)
580 else:
581 if pid == 0:
582 # The child process is still alive.
583 return
584
585 returncode = self._compute_returncode(status)
586
587 try:
588 callback, args = self._callbacks.pop(pid)
589 except KeyError: # pragma: no cover
590 # May happen if .remove_child_handler() is called
591 # after os.waitpid() returns.
592 pass
593 else:
594 callback(pid, returncode, *args)
595
596
597class FastChildWatcher(BaseChildWatcher):
598 """'Fast' child watcher implementation.
599
600 This implementation reaps every terminated processes by calling
601 os.waitpid(-1) directly, possibly breaking other code spawning processes
602 and waiting for their termination.
603
604 There is no noticeable overhead when handling a big number of children
605 (O(1) each time a child terminates).
606 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800607 def __init__(self):
608 super().__init__()
609 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800610 self._lock = threading.Lock()
611 self._zombies = {}
612 self._forks = 0
613
614 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800615 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800616 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800617 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800618
619 def __enter__(self):
620 with self._lock:
621 self._forks += 1
622
623 return self
624
625 def __exit__(self, a, b, c):
626 with self._lock:
627 self._forks -= 1
628
629 if self._forks or not self._zombies:
630 return
631
632 collateral_victims = str(self._zombies)
633 self._zombies.clear()
634
635 logger.warning(
636 "Caught subprocesses termination from unknown pids: %s",
637 collateral_victims)
638
639 def add_child_handler(self, pid, callback, *args):
640 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800641 with self._lock:
642 try:
643 returncode = self._zombies.pop(pid)
644 except KeyError:
645 # The child is running.
646 self._callbacks[pid] = callback, args
647 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800648
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800649 # The child is dead already. We can fire the callback.
650 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800651
Guido van Rossum2bcae702013-11-13 15:50:08 -0800652 def remove_child_handler(self, pid):
653 try:
654 del self._callbacks[pid]
655 return True
656 except KeyError:
657 return False
658
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800659 def _do_waitpid_all(self):
660 # Because of signal coalescing, we must keep calling waitpid() as
661 # long as we're able to reap a child.
662 while True:
663 try:
664 pid, status = os.waitpid(-1, os.WNOHANG)
665 except ChildProcessError:
666 # No more child processes exist.
667 return
668 else:
669 if pid == 0:
670 # A child process is still alive.
671 return
672
673 returncode = self._compute_returncode(status)
674
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800675 with self._lock:
676 try:
677 callback, args = self._callbacks.pop(pid)
678 except KeyError:
679 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800680 if self._forks:
681 # It may not be registered yet.
682 self._zombies[pid] = returncode
683 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800684 callback = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800685
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800686 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800687 logger.warning(
688 "Caught subprocess termination from unknown pid: "
689 "%d -> %d", pid, returncode)
690 else:
691 callback(pid, returncode, *args)
692
693
694class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
695 """XXX"""
696 _loop_factory = _UnixSelectorEventLoop
697
698 def __init__(self):
699 super().__init__()
700 self._watcher = None
701
702 def _init_watcher(self):
703 with events._lock:
704 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800705 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800706 if isinstance(threading.current_thread(),
707 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800708 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800709
710 def set_event_loop(self, loop):
711 """Set the event loop.
712
713 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800714 .set_event_loop() from the main thread will call .attach_loop(loop) on
715 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800716 """
717
718 super().set_event_loop(loop)
719
720 if self._watcher is not None and \
721 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800722 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800723
724 def get_child_watcher(self):
725 """Get the child watcher
726
727 If not yet set, a SafeChildWatcher object is automatically created.
728 """
729 if self._watcher is None:
730 self._init_watcher()
731
732 return self._watcher
733
734 def set_child_watcher(self, watcher):
735 """Set the child watcher"""
736
737 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
738
739 if self._watcher is not None:
740 self._watcher.close()
741
742 self._watcher = watcher
743
744SelectorEventLoop = _UnixSelectorEventLoop
745DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy