blob: 24da3274f8df90064ea6891bd6fb994261f7deb6 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'DefaultEventLoopPolicy',
]

# Numeric constants for the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2


# This module cannot be used on Windows: fail at import time.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling to SelectorEventLoop, plus the factories for
    the UNIX pipe and subprocess transports defined in this module.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return the pair of connected sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Remove every registered signal handler, then close the loop."""
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup fd cannot be set or if the
        signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler is left, so the wakeup fd is not needed.
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default handler back; any other signal is
        # reset to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: stop using the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and register it for exit events.

        The transport is created inside the child watcher's context so
        that the watcher knows a new process is being started until its
        handler has been registered.
        """
        with events.get_child_watcher() as watcher:
            # Forward the caller's 'extra'; it used to be replaced by None.
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Schedule transp._process_exited(returncode) on this loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    def _subprocess_closed(self, transp):
        """Do nothing; this loop keeps no per-subprocess state."""
        pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174
175
def _set_nonblocking(fd):
    """Put file descriptor *fd* into non-blocking mode.

    The current status flags are read and written back with O_NONBLOCK
    added, so every other flag keeps its value.
    """
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
180
181
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport on top of a non-blocking file descriptor.

    The descriptor must refer to a FIFO, a socket or a character device.
    """

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        supported = (stat.S_ISFIFO(st_mode) or
                     stat.S_ISSOCK(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Read once from the descriptor and dispatch the outcome."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next event.
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means end of file: stop watching the descriptor
        # and notify the protocol.
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        """Log *exc* (except an OSError with errno EIO) and close."""
        # should be called by exception handler only
        if not isinstance(exc, OSError) or exc.errno != errno.EIO:
            logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
250
251
class _UnixWritePipeTransport(transports.WriteTransport):
    """Write transport on top of a non-blocking FIFO or socket."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(st_mode)
        is_pipe = stat.S_ISFIFO(st_mode)
        if not is_socket and not is_pipe:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # A read event on the write end is how a close by the peer is
        # noticed.  On AIX this only works for sockets; on other
        # platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Handle a read event, which means the peer closed the pipe."""
        self._close()

    def write(self, data):
        """Write *data* now if possible, otherwise queue what is left."""
        assert isinstance(data, bytes), repr(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if self._buffer:
            # Earlier data is still queued; keep the ordering.
            self._buffer.append(data)
            return

        # Nothing is queued: attempt to send it right away first.
        try:
            sent = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except Exception as exc:
            self._conn_lost += 1
            self._fatal_error(exc)
            return

        if sent == len(data):
            return
        if sent > 0:
            data = data[sent:]
        # Something is left over: wait until the descriptor is writable.
        self._loop.add_writer(self._fileno, self._write_ready)
        self._buffer.append(data)

    def _write_ready(self):
        """Try to flush the queued data in a single os.write() call."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # The writer must be removed here: _close() only removes it
            # when _buffer is non-empty, and it was just cleared.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if sent == len(pending):
            # Everything went out.
            self._loop.remove_writer(self._fileno)
            if self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is queued."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the close once the queue drains.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport after queued data has been written."""
        if self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close the transport immediately, discarding queued data."""
        self._close(None)

    def _fatal_error(self, exc):
        """Log the current exception and close the transport with *exc*."""
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop queued data, stop watching the fd, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
386
387
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store the Popen object in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        one socket is passed to the child as its stdin and the other
        becomes self._proc.stdin, opened for writing.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # pipe (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child has its own copy of this socket now.
                stdin.close()
                # The parent writes to the child's stdin, so the file
                # object is opened in write mode.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800405
406
class AbstractChildWatcher:
    """Interface of objects that monitor child processes.

    A child watcher keeps track of a set of subprocesses and reports
    when one of them terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to prevent a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for process 'pid'.

        callback(pid, returncode, *args) is to be called once the
        process terminates.  Registering a second callback for the same
        process replaces the first one.

        Note: callback() must be thread-safe
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Return True if a handler was removed, False if there was
        nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from
        it first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called to make sure any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context, allowing new processes to start.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
475
476
class BaseChildWatcher(AbstractChildWatcher):
    """Common part of the child watchers driven by SIGCHLD.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop on which the SIGCHLD handler is installed, if any.
        self._loop = None

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*."""
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll the children, logging any exception."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)

        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)

        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
523
524
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled individually in the SIGCHLD
    handler rather than through os.waitpid(-1), so that other code
    spawning processes is not disrupted.

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args).
        self._callbacks = {}

    def close(self):
        """Forget every registered handler and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'."""
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for 'pid'; return whether one existed."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered process."""
        # Work on a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        # The entry may be missing if .remove_child_handler() is called
        # after os.waitpid() returns.
        entry = self._callbacks.pop(pid, None)
        if entry is None:  # pragma: no cover
            return
        callback, args = entry
        callback(pid, returncode, *args)
596
597
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """
    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args) for the registered children.
        self._callbacks = {}
        # Protects _forks, and the hand-over of a pid between _callbacks
        # and _zombies.
        self._lock = threading.Lock()
        # Maps pid -> returncode for children reaped before a handler
        # was registered for them.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No process is being started any more, so the remaining
            # zombies do not belong to this watcher.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        If the process was already reaped, the callback is called
        immediately instead of being registered.
        """
        assert self._forks, "Must use the context manager"

        # The zombie check and the registration happen under the lock so
        # that _do_waitpid_all() cannot record the exit between the two
        # steps, which would leave the callback waiting forever.
        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is still running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already: fire the callback, outside the lock.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler for 'pid'; return whether one existed."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its return code."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        continue
                    callback = None

            # Log and call back outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
697
698
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy for UNIX that also manages a child watcher."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created on demand by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                if in_main_thread:
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the one it replaces."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        current = self._watcher
        if current is not None:
            current.close()

        self._watcher = watcher
748
# Public aliases for the UNIX loop and policy classes; both names are
# listed in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy