blob: 7a6546d136fdf7647d8fcc18f45b6551ef1829e8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080024__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
25 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29STDIN = 0
30STDOUT = 1
31STDERR = 2
32
33
34if sys.platform == 'win32': # pragma: no cover
35 raise ImportError('Signals are not really supported on Windows')
36
37
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080038class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039 """Unix event loop
40
41 Adds signal handling to SelectorEventLoop
42 """
43
44 def __init__(self, selector=None):
45 super().__init__(selector)
46 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48 def _socketpair(self):
49 return socket.socketpair()
50
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
52 for sig in list(self._signal_handlers):
53 self.remove_signal_handler(sig)
54 super().close()
55
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 def add_signal_handler(self, sig, callback, *args):
57 """Add a handler for a signal. UNIX only.
58
59 Raise ValueError if the signal number is invalid or uncatchable.
60 Raise RuntimeError if there is a problem setting up the handler.
61 """
62 self._check_signal(sig)
63 try:
64 # set_wakeup_fd() raises ValueError if this is not the
65 # main thread. By calling it early we ensure that an
66 # event loop running in another thread cannot add a signal
67 # handler.
68 signal.set_wakeup_fd(self._csock.fileno())
69 except ValueError as exc:
70 raise RuntimeError(str(exc))
71
72 handle = events.make_handle(callback, args)
73 self._signal_handlers[sig] = handle
74
75 try:
76 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010077 # Set SA_RESTART to limit EINTR occurrences.
78 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 except OSError as exc:
80 del self._signal_handlers[sig]
81 if not self._signal_handlers:
82 try:
83 signal.set_wakeup_fd(-1)
84 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070085 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
87 if exc.errno == errno.EINVAL:
88 raise RuntimeError('sig {} cannot be caught'.format(sig))
89 else:
90 raise
91
92 def _handle_signal(self, sig, arg):
93 """Internal helper that is the actual signal handler."""
94 handle = self._signal_handlers.get(sig)
95 if handle is None:
96 return # Assume it's some race condition.
97 if handle._cancelled:
98 self.remove_signal_handler(sig) # Remove it properly.
99 else:
100 self._add_callback_signalsafe(handle)
101
102 def remove_signal_handler(self, sig):
103 """Remove a handler for a signal. UNIX only.
104
105 Return True if a signal handler was removed, False if not.
106 """
107 self._check_signal(sig)
108 try:
109 del self._signal_handlers[sig]
110 except KeyError:
111 return False
112
113 if sig == signal.SIGINT:
114 handler = signal.default_int_handler
115 else:
116 handler = signal.SIG_DFL
117
118 try:
119 signal.signal(sig, handler)
120 except OSError as exc:
121 if exc.errno == errno.EINVAL:
122 raise RuntimeError('sig {} cannot be caught'.format(sig))
123 else:
124 raise
125
126 if not self._signal_handlers:
127 try:
128 signal.set_wakeup_fd(-1)
129 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700130 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
132 return True
133
134 def _check_signal(self, sig):
135 """Internal helper to validate a signal.
136
137 Raise ValueError if the signal number is invalid or uncatchable.
138 Raise RuntimeError if there is a problem setting up the handler.
139 """
140 if not isinstance(sig, int):
141 raise TypeError('sig must be an int, not {!r}'.format(sig))
142
143 if not (1 <= sig < signal.NSIG):
144 raise ValueError(
145 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
146
147 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
150
151 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
152 extra=None):
153 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
154
155 @tasks.coroutine
156 def _make_subprocess_transport(self, protocol, args, shell,
157 stdin, stdout, stderr, bufsize,
158 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800159 with events.get_child_watcher() as watcher:
160 transp = _UnixSubprocessTransport(self, protocol, args, shell,
161 stdin, stdout, stderr, bufsize,
162 extra=None, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800163 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800164 watcher.add_child_handler(transp.get_pid(),
165 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800166
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 return transp
168
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800169 def _child_watcher_callback(self, pid, returncode, transp):
170 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800172 def _subprocess_closed(self, transp):
173 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174
175
176def _set_nonblocking(fd):
177 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
178 flags = flags | os.O_NONBLOCK
179 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
180
181
182class _UnixReadPipeTransport(transports.ReadTransport):
183
184 max_size = 256 * 1024 # max bytes we read in one eventloop iteration
185
186 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
187 super().__init__(extra)
188 self._extra['pipe'] = pipe
189 self._loop = loop
190 self._pipe = pipe
191 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700192 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800193 if not (stat.S_ISFIFO(mode) or
194 stat.S_ISSOCK(mode) or
195 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700196 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700197 _set_nonblocking(self._fileno)
198 self._protocol = protocol
199 self._closing = False
200 self._loop.add_reader(self._fileno, self._read_ready)
201 self._loop.call_soon(self._protocol.connection_made, self)
202 if waiter is not None:
203 self._loop.call_soon(waiter.set_result, None)
204
205 def _read_ready(self):
206 try:
207 data = os.read(self._fileno, self.max_size)
208 except (BlockingIOError, InterruptedError):
209 pass
210 except OSError as exc:
211 self._fatal_error(exc)
212 else:
213 if data:
214 self._protocol.data_received(data)
215 else:
216 self._closing = True
217 self._loop.remove_reader(self._fileno)
218 self._loop.call_soon(self._protocol.eof_received)
219 self._loop.call_soon(self._call_connection_lost, None)
220
Guido van Rossum57497ad2013-10-18 07:58:20 -0700221 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 self._loop.remove_reader(self._fileno)
223
Guido van Rossum57497ad2013-10-18 07:58:20 -0700224 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225 self._loop.add_reader(self._fileno, self._read_ready)
226
227 def close(self):
228 if not self._closing:
229 self._close(None)
230
231 def _fatal_error(self, exc):
232 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800233 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
234 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 self._close(exc)
236
237 def _close(self, exc):
238 self._closing = True
239 self._loop.remove_reader(self._fileno)
240 self._loop.call_soon(self._call_connection_lost, exc)
241
242 def _call_connection_lost(self, exc):
243 try:
244 self._protocol.connection_lost(exc)
245 finally:
246 self._pipe.close()
247 self._pipe = None
248 self._protocol = None
249 self._loop = None
250
251
252class _UnixWritePipeTransport(transports.WriteTransport):
253
254 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
255 super().__init__(extra)
256 self._extra['pipe'] = pipe
257 self._loop = loop
258 self._pipe = pipe
259 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700260 mode = os.fstat(self._fileno).st_mode
261 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100262 if not (is_socket or
263 stat.S_ISFIFO(mode) or
264 stat.S_ISCHR(mode)):
265 raise ValueError("Pipe transport is only for "
266 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700267 _set_nonblocking(self._fileno)
268 self._protocol = protocol
269 self._buffer = []
270 self._conn_lost = 0
271 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700272
273 # On AIX, the reader trick only works for sockets.
274 # On other platforms it works for pipes and sockets.
275 # (Exception: OS X 10.4? Issue #19294.)
276 if is_socket or not sys.platform.startswith("aix"):
277 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278
279 self._loop.call_soon(self._protocol.connection_made, self)
280 if waiter is not None:
281 self._loop.call_soon(waiter.set_result, None)
282
283 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700284 # Pipe was closed by peer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 self._close()
286
287 def write(self, data):
288 assert isinstance(data, bytes), repr(data)
289 if not data:
290 return
291
292 if self._conn_lost or self._closing:
293 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700294 logger.warning('pipe closed by peer or '
295 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 self._conn_lost += 1
297 return
298
299 if not self._buffer:
300 # Attempt to send it right away first.
301 try:
302 n = os.write(self._fileno, data)
303 except (BlockingIOError, InterruptedError):
304 n = 0
305 except Exception as exc:
306 self._conn_lost += 1
307 self._fatal_error(exc)
308 return
309 if n == len(data):
310 return
311 elif n > 0:
312 data = data[n:]
313 self._loop.add_writer(self._fileno, self._write_ready)
314
315 self._buffer.append(data)
316
317 def _write_ready(self):
318 data = b''.join(self._buffer)
319 assert data, 'Data should not be empty'
320
321 self._buffer.clear()
322 try:
323 n = os.write(self._fileno, data)
324 except (BlockingIOError, InterruptedError):
325 self._buffer.append(data)
326 except Exception as exc:
327 self._conn_lost += 1
328 # Remove writer here, _fatal_error() doesn't it
329 # because _buffer is empty.
330 self._loop.remove_writer(self._fileno)
331 self._fatal_error(exc)
332 else:
333 if n == len(data):
334 self._loop.remove_writer(self._fileno)
335 if self._closing:
336 self._loop.remove_reader(self._fileno)
337 self._call_connection_lost(None)
338 return
339 elif n > 0:
340 data = data[n:]
341
342 self._buffer.append(data) # Try again later.
343
344 def can_write_eof(self):
345 return True
346
347 # TODO: Make the relationships between write_eof(), close(),
348 # abort(), _fatal_error() and _close() more straightforward.
349
350 def write_eof(self):
351 if self._closing:
352 return
353 assert self._pipe
354 self._closing = True
355 if not self._buffer:
356 self._loop.remove_reader(self._fileno)
357 self._loop.call_soon(self._call_connection_lost, None)
358
359 def close(self):
360 if not self._closing:
361 # write_eof is all what we needed to close the write pipe
362 self.write_eof()
363
364 def abort(self):
365 self._close(None)
366
367 def _fatal_error(self, exc):
368 # should be called by exception handler only
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700369 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 self._close(exc)
371
372 def _close(self, exc=None):
373 self._closing = True
374 if self._buffer:
375 self._loop.remove_writer(self._fileno)
376 self._buffer.clear()
377 self._loop.remove_reader(self._fileno)
378 self._loop.call_soon(self._call_connection_lost, exc)
379
380 def _call_connection_lost(self, exc):
381 try:
382 self._protocol.connection_lost(exc)
383 finally:
384 self._pipe.close()
385 self._pipe = None
386 self._protocol = None
387 self._loop = None
388
389
Guido van Rossum59691282013-10-30 14:52:03 -0700390class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391
Guido van Rossum59691282013-10-30 14:52:03 -0700392 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700393 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700395 # Use a socket pair for stdin, since not all platforms
396 # support selecting read events on the write end of a
397 # socket (which we use in order to detect closing of the
398 # other end). Notably this is needed on AIX, and works
399 # just fine on other platforms.
400 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700401 self._proc = subprocess.Popen(
402 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
403 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700404 if stdin_w is not None:
405 stdin.close()
406 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800407
408
409class AbstractChildWatcher:
410 """Abstract base class for monitoring child processes.
411
412 Objects derived from this class monitor a collection of subprocesses and
413 report their termination or interruption by a signal.
414
415 New callbacks are registered with .add_child_handler(). Starting a new
416 process must be done within a 'with' block to allow the watcher to suspend
417 its activity until the new process if fully registered (this is needed to
418 prevent a race condition in some implementations).
419
420 Example:
421 with watcher:
422 proc = subprocess.Popen("sleep 1")
423 watcher.add_child_handler(proc.pid, callback)
424
425 Notes:
426 Implementations of this class must be thread-safe.
427
428 Since child watcher objects may catch the SIGCHLD signal and call
429 waitpid(-1), there should be only one active object per process.
430 """
431
432 def add_child_handler(self, pid, callback, *args):
433 """Register a new child handler.
434
435 Arrange for callback(pid, returncode, *args) to be called when
436 process 'pid' terminates. Specifying another callback for the same
437 process replaces the previous handler.
438
439 Note: callback() must be thread-safe
440 """
441 raise NotImplementedError()
442
443 def remove_child_handler(self, pid):
444 """Removes the handler for process 'pid'.
445
446 The function returns True if the handler was successfully removed,
447 False if there was nothing to remove."""
448
449 raise NotImplementedError()
450
Guido van Rossum2bcae702013-11-13 15:50:08 -0800451 def attach_loop(self, loop):
452 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800453
Guido van Rossum2bcae702013-11-13 15:50:08 -0800454 If the watcher was previously attached to an event loop, then it is
455 first detached before attaching to the new loop.
456
457 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800458 """
459 raise NotImplementedError()
460
461 def close(self):
462 """Close the watcher.
463
464 This must be called to make sure that any underlying resource is freed.
465 """
466 raise NotImplementedError()
467
468 def __enter__(self):
469 """Enter the watcher's context and allow starting new processes
470
471 This function must return self"""
472 raise NotImplementedError()
473
474 def __exit__(self, a, b, c):
475 """Exit the watcher's context"""
476 raise NotImplementedError()
477
478
479class BaseChildWatcher(AbstractChildWatcher):
480
Guido van Rossum2bcae702013-11-13 15:50:08 -0800481 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800482 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800483
484 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800485 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800486
487 def _do_waitpid(self, expected_pid):
488 raise NotImplementedError()
489
490 def _do_waitpid_all(self):
491 raise NotImplementedError()
492
Guido van Rossum2bcae702013-11-13 15:50:08 -0800493 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800494 assert loop is None or isinstance(loop, events.AbstractEventLoop)
495
496 if self._loop is not None:
497 self._loop.remove_signal_handler(signal.SIGCHLD)
498
499 self._loop = loop
500 if loop is not None:
501 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
502
503 # Prevent a race condition in case a child terminated
504 # during the switch.
505 self._do_waitpid_all()
506
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800507 def _sig_chld(self):
508 try:
509 self._do_waitpid_all()
510 except Exception:
511 logger.exception('Unknown exception in SIGCHLD handler')
512
513 def _compute_returncode(self, status):
514 if os.WIFSIGNALED(status):
515 # The child process died because of a signal.
516 return -os.WTERMSIG(status)
517 elif os.WIFEXITED(status):
518 # The child process exited (e.g sys.exit()).
519 return os.WEXITSTATUS(status)
520 else:
521 # The child exited, but we don't understand its status.
522 # This shouldn't happen, but if it does, let's just
523 # return that status; perhaps that helps debug it.
524 return status
525
526
527class SafeChildWatcher(BaseChildWatcher):
528 """'Safe' child watcher implementation.
529
530 This implementation avoids disrupting other code spawning processes by
531 polling explicitly each process in the SIGCHLD handler instead of calling
532 os.waitpid(-1).
533
534 This is a safe solution but it has a significant overhead when handling a
535 big number of children (O(n) each time SIGCHLD is raised)
536 """
537
Guido van Rossum2bcae702013-11-13 15:50:08 -0800538 def __init__(self):
539 super().__init__()
540 self._callbacks = {}
541
542 def close(self):
543 self._callbacks.clear()
544 super().close()
545
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800546 def __enter__(self):
547 return self
548
549 def __exit__(self, a, b, c):
550 pass
551
552 def add_child_handler(self, pid, callback, *args):
553 self._callbacks[pid] = callback, args
554
555 # Prevent a race condition in case the child is already terminated.
556 self._do_waitpid(pid)
557
Guido van Rossum2bcae702013-11-13 15:50:08 -0800558 def remove_child_handler(self, pid):
559 try:
560 del self._callbacks[pid]
561 return True
562 except KeyError:
563 return False
564
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800565 def _do_waitpid_all(self):
566
567 for pid in list(self._callbacks):
568 self._do_waitpid(pid)
569
570 def _do_waitpid(self, expected_pid):
571 assert expected_pid > 0
572
573 try:
574 pid, status = os.waitpid(expected_pid, os.WNOHANG)
575 except ChildProcessError:
576 # The child process is already reaped
577 # (may happen if waitpid() is called elsewhere).
578 pid = expected_pid
579 returncode = 255
580 logger.warning(
581 "Unknown child process pid %d, will report returncode 255",
582 pid)
583 else:
584 if pid == 0:
585 # The child process is still alive.
586 return
587
588 returncode = self._compute_returncode(status)
589
590 try:
591 callback, args = self._callbacks.pop(pid)
592 except KeyError: # pragma: no cover
593 # May happen if .remove_child_handler() is called
594 # after os.waitpid() returns.
595 pass
596 else:
597 callback(pid, returncode, *args)
598
599
600class FastChildWatcher(BaseChildWatcher):
601 """'Fast' child watcher implementation.
602
603 This implementation reaps every terminated processes by calling
604 os.waitpid(-1) directly, possibly breaking other code spawning processes
605 and waiting for their termination.
606
607 There is no noticeable overhead when handling a big number of children
608 (O(1) each time a child terminates).
609 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800610 def __init__(self):
611 super().__init__()
612 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800613 self._lock = threading.Lock()
614 self._zombies = {}
615 self._forks = 0
616
617 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800618 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800619 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800620 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800621
622 def __enter__(self):
623 with self._lock:
624 self._forks += 1
625
626 return self
627
628 def __exit__(self, a, b, c):
629 with self._lock:
630 self._forks -= 1
631
632 if self._forks or not self._zombies:
633 return
634
635 collateral_victims = str(self._zombies)
636 self._zombies.clear()
637
638 logger.warning(
639 "Caught subprocesses termination from unknown pids: %s",
640 collateral_victims)
641
642 def add_child_handler(self, pid, callback, *args):
643 assert self._forks, "Must use the context manager"
644
645 self._callbacks[pid] = callback, args
646
647 try:
648 # Ensure that the child is not already terminated.
649 # (raise KeyError if still alive)
650 returncode = self._zombies.pop(pid)
651
652 # Child is dead, therefore we can fire the callback immediately.
653 # First we remove it from the dict.
654 # (raise KeyError if .remove_child_handler() was called in-between)
655 del self._callbacks[pid]
656 except KeyError:
657 pass
658 else:
659 callback(pid, returncode, *args)
660
Guido van Rossum2bcae702013-11-13 15:50:08 -0800661 def remove_child_handler(self, pid):
662 try:
663 del self._callbacks[pid]
664 return True
665 except KeyError:
666 return False
667
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800668 def _do_waitpid_all(self):
669 # Because of signal coalescing, we must keep calling waitpid() as
670 # long as we're able to reap a child.
671 while True:
672 try:
673 pid, status = os.waitpid(-1, os.WNOHANG)
674 except ChildProcessError:
675 # No more child processes exist.
676 return
677 else:
678 if pid == 0:
679 # A child process is still alive.
680 return
681
682 returncode = self._compute_returncode(status)
683
684 try:
685 callback, args = self._callbacks.pop(pid)
686 except KeyError:
687 # unknown child
688 with self._lock:
689 if self._forks:
690 # It may not be registered yet.
691 self._zombies[pid] = returncode
692 continue
693
694 logger.warning(
695 "Caught subprocess termination from unknown pid: "
696 "%d -> %d", pid, returncode)
697 else:
698 callback(pid, returncode, *args)
699
700
701class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
702 """XXX"""
703 _loop_factory = _UnixSelectorEventLoop
704
705 def __init__(self):
706 super().__init__()
707 self._watcher = None
708
709 def _init_watcher(self):
710 with events._lock:
711 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800712 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800713 if isinstance(threading.current_thread(),
714 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800715 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800716
717 def set_event_loop(self, loop):
718 """Set the event loop.
719
720 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800721 .set_event_loop() from the main thread will call .attach_loop(loop) on
722 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800723 """
724
725 super().set_event_loop(loop)
726
727 if self._watcher is not None and \
728 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800729 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800730
731 def get_child_watcher(self):
732 """Get the child watcher
733
734 If not yet set, a SafeChildWatcher object is automatically created.
735 """
736 if self._watcher is None:
737 self._init_watcher()
738
739 return self._watcher
740
741 def set_child_watcher(self, watcher):
742 """Set the child watcher"""
743
744 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
745
746 if self._watcher is not None:
747 self._watcher.close()
748
749 self._watcher = watcher
750
751SelectorEventLoop = _UnixSelectorEventLoop
752DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy