blob: e0d75077518278e7261d3a1c7f48799564c1295e [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanov88a5bf02014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
18from . import protocols
19from . import selector_events
20from . import tasks
21from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080034class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanov88a5bf02014-02-18 12:15:06 -050035 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036
Yury Selivanov88a5bf02014-02-18 12:15:06 -050037 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038 """
39
40 def __init__(self, selector=None):
41 super().__init__(selector)
42 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043
44 def _socketpair(self):
45 return socket.socketpair()
46
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080047 def close(self):
48 for sig in list(self._signal_handlers):
49 self.remove_signal_handler(sig)
50 super().close()
51
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052 def add_signal_handler(self, sig, callback, *args):
53 """Add a handler for a signal. UNIX only.
54
55 Raise ValueError if the signal number is invalid or uncatchable.
56 Raise RuntimeError if there is a problem setting up the handler.
57 """
58 self._check_signal(sig)
59 try:
60 # set_wakeup_fd() raises ValueError if this is not the
61 # main thread. By calling it early we ensure that an
62 # event loop running in another thread cannot add a signal
63 # handler.
64 signal.set_wakeup_fd(self._csock.fileno())
65 except ValueError as exc:
66 raise RuntimeError(str(exc))
67
Victor Stinnerdc62b7e2014-02-10 00:45:44 +010068 handle = events.Handle(callback, args)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069 self._signal_handlers[sig] = handle
70
71 try:
72 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010073 # Set SA_RESTART to limit EINTR occurrences.
74 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 except OSError as exc:
76 del self._signal_handlers[sig]
77 if not self._signal_handlers:
78 try:
79 signal.set_wakeup_fd(-1)
80 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070081 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082
83 if exc.errno == errno.EINVAL:
84 raise RuntimeError('sig {} cannot be caught'.format(sig))
85 else:
86 raise
87
88 def _handle_signal(self, sig, arg):
89 """Internal helper that is the actual signal handler."""
90 handle = self._signal_handlers.get(sig)
91 if handle is None:
92 return # Assume it's some race condition.
93 if handle._cancelled:
94 self.remove_signal_handler(sig) # Remove it properly.
95 else:
96 self._add_callback_signalsafe(handle)
97
98 def remove_signal_handler(self, sig):
99 """Remove a handler for a signal. UNIX only.
100
101 Return True if a signal handler was removed, False if not.
102 """
103 self._check_signal(sig)
104 try:
105 del self._signal_handlers[sig]
106 except KeyError:
107 return False
108
109 if sig == signal.SIGINT:
110 handler = signal.default_int_handler
111 else:
112 handler = signal.SIG_DFL
113
114 try:
115 signal.signal(sig, handler)
116 except OSError as exc:
117 if exc.errno == errno.EINVAL:
118 raise RuntimeError('sig {} cannot be caught'.format(sig))
119 else:
120 raise
121
122 if not self._signal_handlers:
123 try:
124 signal.set_wakeup_fd(-1)
125 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700126 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 return True
129
130 def _check_signal(self, sig):
131 """Internal helper to validate a signal.
132
133 Raise ValueError if the signal number is invalid or uncatchable.
134 Raise RuntimeError if there is a problem setting up the handler.
135 """
136 if not isinstance(sig, int):
137 raise TypeError('sig must be an int, not {!r}'.format(sig))
138
139 if not (1 <= sig < signal.NSIG):
140 raise ValueError(
141 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
142
143 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
144 extra=None):
145 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
146
147 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
150
151 @tasks.coroutine
152 def _make_subprocess_transport(self, protocol, args, shell,
153 stdin, stdout, stderr, bufsize,
154 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800155 with events.get_child_watcher() as watcher:
156 transp = _UnixSubprocessTransport(self, protocol, args, shell,
157 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800158 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800159 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800160 watcher.add_child_handler(transp.get_pid(),
161 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800162
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 return transp
164
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800165 def _child_watcher_callback(self, pid, returncode, transp):
166 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167
Yury Selivanov88a5bf02014-02-18 12:15:06 -0500168 @tasks.coroutine
169 def create_unix_connection(self, protocol_factory, path, *,
170 ssl=None, sock=None,
171 server_hostname=None):
172 assert server_hostname is None or isinstance(server_hostname, str)
173 if ssl:
174 if server_hostname is None:
175 raise ValueError(
176 'you have to pass server_hostname when using ssl')
177 else:
178 if server_hostname is not None:
179 raise ValueError('server_hostname is only meaningful with ssl')
180
181 if path is not None:
182 if sock is not None:
183 raise ValueError(
184 'path and sock can not be specified at the same time')
185
186 try:
187 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
188 sock.setblocking(False)
189 yield from self.sock_connect(sock, path)
190 except OSError:
191 if sock is not None:
192 sock.close()
193 raise
194
195 else:
196 if sock is None:
197 raise ValueError('no path and sock were specified')
198 sock.setblocking(False)
199
200 transport, protocol = yield from self._create_connection_transport(
201 sock, protocol_factory, ssl, server_hostname)
202 return transport, protocol
203
204 @tasks.coroutine
205 def create_unix_server(self, protocol_factory, path=None, *,
206 sock=None, backlog=100, ssl=None):
207 if isinstance(ssl, bool):
208 raise TypeError('ssl argument must be an SSLContext or None')
209
210 if path is not None:
211 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
212
213 try:
214 sock.bind(path)
215 except OSError as exc:
216 if exc.errno == errno.EADDRINUSE:
217 # Let's improve the error message by adding
218 # with what exact address it occurs.
219 msg = 'Address {!r} is already in use'.format(path)
220 raise OSError(errno.EADDRINUSE, msg) from None
221 else:
222 raise
223 else:
224 if sock is None:
225 raise ValueError(
226 'path was not specified, and no sock specified')
227
228 if sock.family != socket.AF_UNIX:
229 raise ValueError(
230 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
231
232 server = base_events.Server(self, [sock])
233 sock.listen(backlog)
234 sock.setblocking(False)
235 self._start_serving(protocol_factory, sock, ssl, server)
236 return server
237
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238
239def _set_nonblocking(fd):
240 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
241 flags = flags | os.O_NONBLOCK
242 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
243
244
245class _UnixReadPipeTransport(transports.ReadTransport):
246
247 max_size = 256 * 1024 # max bytes we read in one eventloop iteration
248
249 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
250 super().__init__(extra)
251 self._extra['pipe'] = pipe
252 self._loop = loop
253 self._pipe = pipe
254 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700255 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800256 if not (stat.S_ISFIFO(mode) or
257 stat.S_ISSOCK(mode) or
258 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700259 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 _set_nonblocking(self._fileno)
261 self._protocol = protocol
262 self._closing = False
263 self._loop.add_reader(self._fileno, self._read_ready)
264 self._loop.call_soon(self._protocol.connection_made, self)
265 if waiter is not None:
266 self._loop.call_soon(waiter.set_result, None)
267
268 def _read_ready(self):
269 try:
270 data = os.read(self._fileno, self.max_size)
271 except (BlockingIOError, InterruptedError):
272 pass
273 except OSError as exc:
274 self._fatal_error(exc)
275 else:
276 if data:
277 self._protocol.data_received(data)
278 else:
279 self._closing = True
280 self._loop.remove_reader(self._fileno)
281 self._loop.call_soon(self._protocol.eof_received)
282 self._loop.call_soon(self._call_connection_lost, None)
283
Guido van Rossum57497ad2013-10-18 07:58:20 -0700284 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 self._loop.remove_reader(self._fileno)
286
Guido van Rossum57497ad2013-10-18 07:58:20 -0700287 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 self._loop.add_reader(self._fileno, self._read_ready)
289
290 def close(self):
291 if not self._closing:
292 self._close(None)
293
294 def _fatal_error(self, exc):
295 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800296 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
297 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298 self._close(exc)
299
300 def _close(self, exc):
301 self._closing = True
302 self._loop.remove_reader(self._fileno)
303 self._loop.call_soon(self._call_connection_lost, exc)
304
305 def _call_connection_lost(self, exc):
306 try:
307 self._protocol.connection_lost(exc)
308 finally:
309 self._pipe.close()
310 self._pipe = None
311 self._protocol = None
312 self._loop = None
313
314
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800315class _UnixWritePipeTransport(selector_events._FlowControlMixin,
316 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317
318 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
319 super().__init__(extra)
320 self._extra['pipe'] = pipe
321 self._loop = loop
322 self._pipe = pipe
323 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700324 mode = os.fstat(self._fileno).st_mode
325 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100326 if not (is_socket or
327 stat.S_ISFIFO(mode) or
328 stat.S_ISCHR(mode)):
329 raise ValueError("Pipe transport is only for "
330 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 _set_nonblocking(self._fileno)
332 self._protocol = protocol
333 self._buffer = []
334 self._conn_lost = 0
335 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700336
337 # On AIX, the reader trick only works for sockets.
338 # On other platforms it works for pipes and sockets.
339 # (Exception: OS X 10.4? Issue #19294.)
340 if is_socket or not sys.platform.startswith("aix"):
341 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342
343 self._loop.call_soon(self._protocol.connection_made, self)
344 if waiter is not None:
345 self._loop.call_soon(waiter.set_result, None)
346
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800347 def get_write_buffer_size(self):
348 return sum(len(data) for data in self._buffer)
349
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700351 # Pipe was closed by peer.
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100352 if self._buffer:
353 self._close(BrokenPipeError())
354 else:
355 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356
357 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800358 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
359 if isinstance(data, bytearray):
360 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 if not data:
362 return
363
364 if self._conn_lost or self._closing:
365 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700366 logger.warning('pipe closed by peer or '
367 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 self._conn_lost += 1
369 return
370
371 if not self._buffer:
372 # Attempt to send it right away first.
373 try:
374 n = os.write(self._fileno, data)
375 except (BlockingIOError, InterruptedError):
376 n = 0
377 except Exception as exc:
378 self._conn_lost += 1
379 self._fatal_error(exc)
380 return
381 if n == len(data):
382 return
383 elif n > 0:
384 data = data[n:]
385 self._loop.add_writer(self._fileno, self._write_ready)
386
387 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800388 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389
390 def _write_ready(self):
391 data = b''.join(self._buffer)
392 assert data, 'Data should not be empty'
393
394 self._buffer.clear()
395 try:
396 n = os.write(self._fileno, data)
397 except (BlockingIOError, InterruptedError):
398 self._buffer.append(data)
399 except Exception as exc:
400 self._conn_lost += 1
401 # Remove writer here, _fatal_error() doesn't it
402 # because _buffer is empty.
403 self._loop.remove_writer(self._fileno)
404 self._fatal_error(exc)
405 else:
406 if n == len(data):
407 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800408 self._maybe_resume_protocol() # May append to buffer.
409 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 self._loop.remove_reader(self._fileno)
411 self._call_connection_lost(None)
412 return
413 elif n > 0:
414 data = data[n:]
415
416 self._buffer.append(data) # Try again later.
417
418 def can_write_eof(self):
419 return True
420
421 # TODO: Make the relationships between write_eof(), close(),
422 # abort(), _fatal_error() and _close() more straightforward.
423
424 def write_eof(self):
425 if self._closing:
426 return
427 assert self._pipe
428 self._closing = True
429 if not self._buffer:
430 self._loop.remove_reader(self._fileno)
431 self._loop.call_soon(self._call_connection_lost, None)
432
433 def close(self):
434 if not self._closing:
435 # write_eof is all what we needed to close the write pipe
436 self.write_eof()
437
438 def abort(self):
439 self._close(None)
440
441 def _fatal_error(self, exc):
442 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800443 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
444 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 self._close(exc)
446
447 def _close(self, exc=None):
448 self._closing = True
449 if self._buffer:
450 self._loop.remove_writer(self._fileno)
451 self._buffer.clear()
452 self._loop.remove_reader(self._fileno)
453 self._loop.call_soon(self._call_connection_lost, exc)
454
455 def _call_connection_lost(self, exc):
456 try:
457 self._protocol.connection_lost(exc)
458 finally:
459 self._pipe.close()
460 self._pipe = None
461 self._protocol = None
462 self._loop = None
463
464
Guido van Rossum59691282013-10-30 14:52:03 -0700465class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466
Guido van Rossum59691282013-10-30 14:52:03 -0700467 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700468 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700470 # Use a socket pair for stdin, since not all platforms
471 # support selecting read events on the write end of a
472 # socket (which we use in order to detect closing of the
473 # other end). Notably this is needed on AIX, and works
474 # just fine on other platforms.
475 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700476 self._proc = subprocess.Popen(
477 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
478 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700479 if stdin_w is not None:
480 stdin.close()
481 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800482
483
484class AbstractChildWatcher:
485 """Abstract base class for monitoring child processes.
486
487 Objects derived from this class monitor a collection of subprocesses and
488 report their termination or interruption by a signal.
489
490 New callbacks are registered with .add_child_handler(). Starting a new
491 process must be done within a 'with' block to allow the watcher to suspend
492 its activity until the new process if fully registered (this is needed to
493 prevent a race condition in some implementations).
494
495 Example:
496 with watcher:
497 proc = subprocess.Popen("sleep 1")
498 watcher.add_child_handler(proc.pid, callback)
499
500 Notes:
501 Implementations of this class must be thread-safe.
502
503 Since child watcher objects may catch the SIGCHLD signal and call
504 waitpid(-1), there should be only one active object per process.
505 """
506
507 def add_child_handler(self, pid, callback, *args):
508 """Register a new child handler.
509
510 Arrange for callback(pid, returncode, *args) to be called when
511 process 'pid' terminates. Specifying another callback for the same
512 process replaces the previous handler.
513
514 Note: callback() must be thread-safe
515 """
516 raise NotImplementedError()
517
518 def remove_child_handler(self, pid):
519 """Removes the handler for process 'pid'.
520
521 The function returns True if the handler was successfully removed,
522 False if there was nothing to remove."""
523
524 raise NotImplementedError()
525
Guido van Rossum2bcae702013-11-13 15:50:08 -0800526 def attach_loop(self, loop):
527 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800528
Guido van Rossum2bcae702013-11-13 15:50:08 -0800529 If the watcher was previously attached to an event loop, then it is
530 first detached before attaching to the new loop.
531
532 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800533 """
534 raise NotImplementedError()
535
536 def close(self):
537 """Close the watcher.
538
539 This must be called to make sure that any underlying resource is freed.
540 """
541 raise NotImplementedError()
542
543 def __enter__(self):
544 """Enter the watcher's context and allow starting new processes
545
546 This function must return self"""
547 raise NotImplementedError()
548
549 def __exit__(self, a, b, c):
550 """Exit the watcher's context"""
551 raise NotImplementedError()
552
553
554class BaseChildWatcher(AbstractChildWatcher):
555
Guido van Rossum2bcae702013-11-13 15:50:08 -0800556 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800557 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800558
559 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800560 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800561
562 def _do_waitpid(self, expected_pid):
563 raise NotImplementedError()
564
565 def _do_waitpid_all(self):
566 raise NotImplementedError()
567
Guido van Rossum2bcae702013-11-13 15:50:08 -0800568 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800569 assert loop is None or isinstance(loop, events.AbstractEventLoop)
570
571 if self._loop is not None:
572 self._loop.remove_signal_handler(signal.SIGCHLD)
573
574 self._loop = loop
575 if loop is not None:
576 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
577
578 # Prevent a race condition in case a child terminated
579 # during the switch.
580 self._do_waitpid_all()
581
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800582 def _sig_chld(self):
583 try:
584 self._do_waitpid_all()
585 except Exception:
586 logger.exception('Unknown exception in SIGCHLD handler')
587
588 def _compute_returncode(self, status):
589 if os.WIFSIGNALED(status):
590 # The child process died because of a signal.
591 return -os.WTERMSIG(status)
592 elif os.WIFEXITED(status):
593 # The child process exited (e.g sys.exit()).
594 return os.WEXITSTATUS(status)
595 else:
596 # The child exited, but we don't understand its status.
597 # This shouldn't happen, but if it does, let's just
598 # return that status; perhaps that helps debug it.
599 return status
600
601
602class SafeChildWatcher(BaseChildWatcher):
603 """'Safe' child watcher implementation.
604
605 This implementation avoids disrupting other code spawning processes by
606 polling explicitly each process in the SIGCHLD handler instead of calling
607 os.waitpid(-1).
608
609 This is a safe solution but it has a significant overhead when handling a
610 big number of children (O(n) each time SIGCHLD is raised)
611 """
612
Guido van Rossum2bcae702013-11-13 15:50:08 -0800613 def __init__(self):
614 super().__init__()
615 self._callbacks = {}
616
617 def close(self):
618 self._callbacks.clear()
619 super().close()
620
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800621 def __enter__(self):
622 return self
623
624 def __exit__(self, a, b, c):
625 pass
626
627 def add_child_handler(self, pid, callback, *args):
628 self._callbacks[pid] = callback, args
629
630 # Prevent a race condition in case the child is already terminated.
631 self._do_waitpid(pid)
632
Guido van Rossum2bcae702013-11-13 15:50:08 -0800633 def remove_child_handler(self, pid):
634 try:
635 del self._callbacks[pid]
636 return True
637 except KeyError:
638 return False
639
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800640 def _do_waitpid_all(self):
641
642 for pid in list(self._callbacks):
643 self._do_waitpid(pid)
644
645 def _do_waitpid(self, expected_pid):
646 assert expected_pid > 0
647
648 try:
649 pid, status = os.waitpid(expected_pid, os.WNOHANG)
650 except ChildProcessError:
651 # The child process is already reaped
652 # (may happen if waitpid() is called elsewhere).
653 pid = expected_pid
654 returncode = 255
655 logger.warning(
656 "Unknown child process pid %d, will report returncode 255",
657 pid)
658 else:
659 if pid == 0:
660 # The child process is still alive.
661 return
662
663 returncode = self._compute_returncode(status)
664
665 try:
666 callback, args = self._callbacks.pop(pid)
667 except KeyError: # pragma: no cover
668 # May happen if .remove_child_handler() is called
669 # after os.waitpid() returns.
670 pass
671 else:
672 callback(pid, returncode, *args)
673
674
675class FastChildWatcher(BaseChildWatcher):
676 """'Fast' child watcher implementation.
677
678 This implementation reaps every terminated processes by calling
679 os.waitpid(-1) directly, possibly breaking other code spawning processes
680 and waiting for their termination.
681
682 There is no noticeable overhead when handling a big number of children
683 (O(1) each time a child terminates).
684 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800685 def __init__(self):
686 super().__init__()
687 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800688 self._lock = threading.Lock()
689 self._zombies = {}
690 self._forks = 0
691
692 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800693 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800694 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800695 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800696
697 def __enter__(self):
698 with self._lock:
699 self._forks += 1
700
701 return self
702
703 def __exit__(self, a, b, c):
704 with self._lock:
705 self._forks -= 1
706
707 if self._forks or not self._zombies:
708 return
709
710 collateral_victims = str(self._zombies)
711 self._zombies.clear()
712
713 logger.warning(
714 "Caught subprocesses termination from unknown pids: %s",
715 collateral_victims)
716
717 def add_child_handler(self, pid, callback, *args):
718 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800719 with self._lock:
720 try:
721 returncode = self._zombies.pop(pid)
722 except KeyError:
723 # The child is running.
724 self._callbacks[pid] = callback, args
725 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800726
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800727 # The child is dead already. We can fire the callback.
728 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800729
Guido van Rossum2bcae702013-11-13 15:50:08 -0800730 def remove_child_handler(self, pid):
731 try:
732 del self._callbacks[pid]
733 return True
734 except KeyError:
735 return False
736
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800737 def _do_waitpid_all(self):
738 # Because of signal coalescing, we must keep calling waitpid() as
739 # long as we're able to reap a child.
740 while True:
741 try:
742 pid, status = os.waitpid(-1, os.WNOHANG)
743 except ChildProcessError:
744 # No more child processes exist.
745 return
746 else:
747 if pid == 0:
748 # A child process is still alive.
749 return
750
751 returncode = self._compute_returncode(status)
752
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800753 with self._lock:
754 try:
755 callback, args = self._callbacks.pop(pid)
756 except KeyError:
757 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800758 if self._forks:
759 # It may not be registered yet.
760 self._zombies[pid] = returncode
761 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800762 callback = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800763
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800764 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800765 logger.warning(
766 "Caught subprocess termination from unknown pid: "
767 "%d -> %d", pid, returncode)
768 else:
769 callback(pid, returncode, *args)
770
771
772class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
773 """XXX"""
774 _loop_factory = _UnixSelectorEventLoop
775
776 def __init__(self):
777 super().__init__()
778 self._watcher = None
779
780 def _init_watcher(self):
781 with events._lock:
782 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800783 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800784 if isinstance(threading.current_thread(),
785 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800786 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800787
788 def set_event_loop(self, loop):
789 """Set the event loop.
790
791 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800792 .set_event_loop() from the main thread will call .attach_loop(loop) on
793 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800794 """
795
796 super().set_event_loop(loop)
797
798 if self._watcher is not None and \
799 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800800 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800801
802 def get_child_watcher(self):
803 """Get the child watcher
804
805 If not yet set, a SafeChildWatcher object is automatically created.
806 """
807 if self._watcher is None:
808 self._init_watcher()
809
810 return self._watcher
811
812 def set_child_watcher(self, watcher):
813 """Set the child watcher"""
814
815 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
816
817 if self._watcher is not None:
818 self._watcher.close()
819
820 self._watcher = watcher
821
822SelectorEventLoop = _UnixSelectorEventLoop
823DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy