blob: a1aff3f146a775bfcca9f1ed6ff290270bf925cb [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by a star-import of this module.
__all__ = [
    'SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'DefaultEventLoopPolicy',
]

# Numeric identifiers for the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2


# Refuse to load on Windows: this module is built around Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling, pipe transports and subprocess transports to
    SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the handle wrapping the user callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a new pair of connected sockets."""
        return socket.socketpair()

    def close(self):
        """Remove every registered signal handler, then close the loop."""
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Arrange for callback(*args) to be scheduled on the loop when
        signal 'sig' is received.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler
        (the wakeup fd cannot be set, or the signal cannot be caught).
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration done above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: the wakeup fd is no longer needed.
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal number.
        Raise RuntimeError if the default handler cannot be restored
        because the signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore Python's default behaviour for the signal; SIGINT gets
        # signal.default_int_handler rather than SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: stop using the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a read transport for 'pipe' bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a write transport for 'pipe' bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Spawn a subprocess and return its transport.

        The process is started inside the child watcher's context and its
        pid is registered with the watcher before that context is left.
        """
        with events.get_child_watcher() as watcher:
            # Forward the caller's 'extra' (it used to be dropped by
            # passing extra=None unconditionally).
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport on this loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for file descriptor fd, keeping its other flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
177
178
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        # Only descriptor kinds that can be polled are accepted.
        file_mode = os.fstat(self._fileno).st_mode
        accepted = (stat.S_ISFIFO(file_mode) or
                    stat.S_ISSOCK(file_mode) or
                    stat.S_ISCHR(file_mode))
        if not accepted:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Reader callback: pass incoming data on, or handle end of file."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next notification.
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means end of file: stop watching and shut down.
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for incoming data."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor for incoming data again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
247
248
class _UnixWritePipeTransport(transports.WriteTransport):
    """Write transport over a FIFO, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        file_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(file_mode)
        accepted = (is_socket or
                    stat.S_ISFIFO(file_mode) or
                    stat.S_ISCHR(file_mode))
        if not accepted:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        # Pipe was closed by peer.
        self._close()

    def write(self, data):
        """Write data now if possible, buffering whatever is left over."""
        assert isinstance(data, bytes), repr(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # The transport is unusable: count the attempt, and warn
            # once the count has reached the logging threshold.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if written == len(data):
                return
            if written > 0:
                data = data[written:]
            # Let the loop call us back when the pipe accepts more data.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)

    def _write_ready(self):
        """Writer callback: try to flush the buffered data."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            written = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove writer here, _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if written == len(pending):
            # Everything was flushed.
            self._loop.remove_writer(self._fileno)
            if self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if written > 0:
            pending = pending[written:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Close the write side once buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the shutdown after flushing.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        if self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc):
        # should be called by exception handler only
        expected = isinstance(exc, (BrokenPipeError, ConnectionResetError))
        if not expected:
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data, stop watching and schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
386
387
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store the Popen object in _proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        one end is handed to the child and the other becomes the
        parent's writable self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end.
                stdin.close()
                # This is the end the parent writes to, so it must be
                # opened for writing ('wb'), not reading.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            # If anything above failed, do not leak the socket pair.
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800405
406
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one
    of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the
        process terminates.  Registering a second callback for the same
        process replaces the first one.

        Note: callback() must be thread-safe
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Return True if a handler was removed, False if there was
        nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from
        it first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context, allowing new processes to start.

        Must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
475
476
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Loop on which the SIGCHLD handler is installed, or None.
        self._loop = None

    def close(self):
        """Detach from the current event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'."""
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, logging any failure."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
523
524
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled individually from the SIGCHLD
    handler rather than reaped through os.waitpid(-1), so other code
    that spawns processes is left undisturbed.

    The price of this safety is a significant overhead when there are
    many children (O(n) each time SIGCHLD is raised).
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args).
        self._callbacks = {}

    def close(self):
        """Forget all handlers and detach from the event loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Poll every registered process."""
        # Work on a snapshot: _do_waitpid() may remove entries.
        for watched_pid in list(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        # The entry may be gone if .remove_child_handler() was called
        # after os.waitpid() returned; in that case there is nothing to do.
        entry = self._callbacks.pop(pid, None)
        if entry is not None:
            callback, args = entry
            callback(pid, returncode, *args)
596
597
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and
    waits for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args).
        self._callbacks = {}
        self._lock = threading.Lock()
        # Maps pid -> returncode for children reaped before registration.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget handlers and zombies, then detach from the event loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No fork in progress any more: the zombies recorded so far
            # will never be claimed, so report and discard them.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        If the process was already reaped, the callback is fired
        immediately instead of being registered.
        """
        assert self._forks, "Must use the context manager"
        not_reaped = object()
        with self._lock:
            returncode = self._zombies.pop(pid, not_reaped)
            if returncode is not_reaped:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None and self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    continue

            if entry is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback, args = entry
                callback(pid, returncode, *args)
693
694
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Unix, with child watcher support."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher, created on demand by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                on_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                if on_main_thread:
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the previous one if any."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
744
# Public aliases for the Unix implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy