blob: 212554801db1078ec9807321fe7a2ef0ba450bbb [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Victor Stinner915bcb02014-02-01 22:49:59 +010024__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080025 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029if sys.platform == 'win32': # pragma: no cover
30 raise ImportError('Signals are not really supported on Windows')
31
32
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080033class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050034 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
Yury Selivanovb057c522014-02-18 12:15:06 -050036 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037 """
38
39 def __init__(self, selector=None):
40 super().__init__(selector)
41 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042
43 def _socketpair(self):
44 return socket.socketpair()
45
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080046 def close(self):
47 for sig in list(self._signal_handlers):
48 self.remove_signal_handler(sig)
49 super().close()
50
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051 def add_signal_handler(self, sig, callback, *args):
52 """Add a handler for a signal. UNIX only.
53
54 Raise ValueError if the signal number is invalid or uncatchable.
55 Raise RuntimeError if there is a problem setting up the handler.
56 """
57 self._check_signal(sig)
58 try:
59 # set_wakeup_fd() raises ValueError if this is not the
60 # main thread. By calling it early we ensure that an
61 # event loop running in another thread cannot add a signal
62 # handler.
63 signal.set_wakeup_fd(self._csock.fileno())
64 except ValueError as exc:
65 raise RuntimeError(str(exc))
66
Yury Selivanov569efa22014-02-18 18:02:19 -050067 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068 self._signal_handlers[sig] = handle
69
70 try:
71 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010072 # Set SA_RESTART to limit EINTR occurrences.
73 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 except OSError as exc:
75 del self._signal_handlers[sig]
76 if not self._signal_handlers:
77 try:
78 signal.set_wakeup_fd(-1)
79 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070080 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081
82 if exc.errno == errno.EINVAL:
83 raise RuntimeError('sig {} cannot be caught'.format(sig))
84 else:
85 raise
86
87 def _handle_signal(self, sig, arg):
88 """Internal helper that is the actual signal handler."""
89 handle = self._signal_handlers.get(sig)
90 if handle is None:
91 return # Assume it's some race condition.
92 if handle._cancelled:
93 self.remove_signal_handler(sig) # Remove it properly.
94 else:
95 self._add_callback_signalsafe(handle)
96
97 def remove_signal_handler(self, sig):
98 """Remove a handler for a signal. UNIX only.
99
100 Return True if a signal handler was removed, False if not.
101 """
102 self._check_signal(sig)
103 try:
104 del self._signal_handlers[sig]
105 except KeyError:
106 return False
107
108 if sig == signal.SIGINT:
109 handler = signal.default_int_handler
110 else:
111 handler = signal.SIG_DFL
112
113 try:
114 signal.signal(sig, handler)
115 except OSError as exc:
116 if exc.errno == errno.EINVAL:
117 raise RuntimeError('sig {} cannot be caught'.format(sig))
118 else:
119 raise
120
121 if not self._signal_handlers:
122 try:
123 signal.set_wakeup_fd(-1)
124 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700125 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700126
127 return True
128
129 def _check_signal(self, sig):
130 """Internal helper to validate a signal.
131
132 Raise ValueError if the signal number is invalid or uncatchable.
133 Raise RuntimeError if there is a problem setting up the handler.
134 """
135 if not isinstance(sig, int):
136 raise TypeError('sig must be an int, not {!r}'.format(sig))
137
138 if not (1 <= sig < signal.NSIG):
139 raise ValueError(
140 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
141
142 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
143 extra=None):
144 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
145
146 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
147 extra=None):
148 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
149
150 @tasks.coroutine
151 def _make_subprocess_transport(self, protocol, args, shell,
152 stdin, stdout, stderr, bufsize,
153 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800154 with events.get_child_watcher() as watcher:
155 transp = _UnixSubprocessTransport(self, protocol, args, shell,
156 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800157 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800158 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800159 watcher.add_child_handler(transp.get_pid(),
160 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800161
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162 return transp
163
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800164 def _child_watcher_callback(self, pid, returncode, transp):
165 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166
Yury Selivanovb057c522014-02-18 12:15:06 -0500167 @tasks.coroutine
168 def create_unix_connection(self, protocol_factory, path, *,
169 ssl=None, sock=None,
170 server_hostname=None):
171 assert server_hostname is None or isinstance(server_hostname, str)
172 if ssl:
173 if server_hostname is None:
174 raise ValueError(
175 'you have to pass server_hostname when using ssl')
176 else:
177 if server_hostname is not None:
178 raise ValueError('server_hostname is only meaningful with ssl')
179
180 if path is not None:
181 if sock is not None:
182 raise ValueError(
183 'path and sock can not be specified at the same time')
184
Victor Stinner79a29522014-02-19 01:45:59 +0100185 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500186 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500187 sock.setblocking(False)
188 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100189 except:
190 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500191 raise
192
193 else:
194 if sock is None:
195 raise ValueError('no path and sock were specified')
196 sock.setblocking(False)
197
198 transport, protocol = yield from self._create_connection_transport(
199 sock, protocol_factory, ssl, server_hostname)
200 return transport, protocol
201
202 @tasks.coroutine
203 def create_unix_server(self, protocol_factory, path=None, *,
204 sock=None, backlog=100, ssl=None):
205 if isinstance(ssl, bool):
206 raise TypeError('ssl argument must be an SSLContext or None')
207
208 if path is not None:
209 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
210
211 try:
212 sock.bind(path)
213 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100214 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500215 if exc.errno == errno.EADDRINUSE:
216 # Let's improve the error message by adding
217 # with what exact address it occurs.
218 msg = 'Address {!r} is already in use'.format(path)
219 raise OSError(errno.EADDRINUSE, msg) from None
220 else:
221 raise
222 else:
223 if sock is None:
224 raise ValueError(
225 'path was not specified, and no sock specified')
226
227 if sock.family != socket.AF_UNIX:
228 raise ValueError(
229 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
230
231 server = base_events.Server(self, [sock])
232 sock.listen(backlog)
233 sock.setblocking(False)
234 self._start_serving(protocol_factory, sock, ssl, server)
235 return server
236
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700237
238def _set_nonblocking(fd):
239 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
240 flags = flags | os.O_NONBLOCK
241 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
242
243
244class _UnixReadPipeTransport(transports.ReadTransport):
245
Yury Selivanovdec1a452014-02-18 22:27:48 -0500246 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700247
248 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
249 super().__init__(extra)
250 self._extra['pipe'] = pipe
251 self._loop = loop
252 self._pipe = pipe
253 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700254 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800255 if not (stat.S_ISFIFO(mode) or
256 stat.S_ISSOCK(mode) or
257 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700258 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259 _set_nonblocking(self._fileno)
260 self._protocol = protocol
261 self._closing = False
262 self._loop.add_reader(self._fileno, self._read_ready)
263 self._loop.call_soon(self._protocol.connection_made, self)
264 if waiter is not None:
265 self._loop.call_soon(waiter.set_result, None)
266
267 def _read_ready(self):
268 try:
269 data = os.read(self._fileno, self.max_size)
270 except (BlockingIOError, InterruptedError):
271 pass
272 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100273 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 else:
275 if data:
276 self._protocol.data_received(data)
277 else:
278 self._closing = True
279 self._loop.remove_reader(self._fileno)
280 self._loop.call_soon(self._protocol.eof_received)
281 self._loop.call_soon(self._call_connection_lost, None)
282
Guido van Rossum57497ad2013-10-18 07:58:20 -0700283 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 self._loop.remove_reader(self._fileno)
285
Guido van Rossum57497ad2013-10-18 07:58:20 -0700286 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 self._loop.add_reader(self._fileno, self._read_ready)
288
289 def close(self):
290 if not self._closing:
291 self._close(None)
292
Victor Stinner0ee29c22014-02-19 01:40:41 +0100293 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800295 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
Yury Selivanov569efa22014-02-18 18:02:19 -0500296 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100297 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500298 'exception': exc,
299 'transport': self,
300 'protocol': self._protocol,
301 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 self._close(exc)
303
304 def _close(self, exc):
305 self._closing = True
306 self._loop.remove_reader(self._fileno)
307 self._loop.call_soon(self._call_connection_lost, exc)
308
309 def _call_connection_lost(self, exc):
310 try:
311 self._protocol.connection_lost(exc)
312 finally:
313 self._pipe.close()
314 self._pipe = None
315 self._protocol = None
316 self._loop = None
317
318
Yury Selivanov3cb99142014-02-18 18:41:13 -0500319class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800320 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321
322 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
323 super().__init__(extra)
324 self._extra['pipe'] = pipe
325 self._loop = loop
326 self._pipe = pipe
327 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700328 mode = os.fstat(self._fileno).st_mode
329 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100330 if not (is_socket or
331 stat.S_ISFIFO(mode) or
332 stat.S_ISCHR(mode)):
333 raise ValueError("Pipe transport is only for "
334 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 _set_nonblocking(self._fileno)
336 self._protocol = protocol
337 self._buffer = []
338 self._conn_lost = 0
339 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700340
341 # On AIX, the reader trick only works for sockets.
342 # On other platforms it works for pipes and sockets.
343 # (Exception: OS X 10.4? Issue #19294.)
344 if is_socket or not sys.platform.startswith("aix"):
345 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346
347 self._loop.call_soon(self._protocol.connection_made, self)
348 if waiter is not None:
349 self._loop.call_soon(waiter.set_result, None)
350
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800351 def get_write_buffer_size(self):
352 return sum(len(data) for data in self._buffer)
353
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700355 # Pipe was closed by peer.
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100356 if self._buffer:
357 self._close(BrokenPipeError())
358 else:
359 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360
361 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800362 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
363 if isinstance(data, bytearray):
364 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 if not data:
366 return
367
368 if self._conn_lost or self._closing:
369 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700370 logger.warning('pipe closed by peer or '
371 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 self._conn_lost += 1
373 return
374
375 if not self._buffer:
376 # Attempt to send it right away first.
377 try:
378 n = os.write(self._fileno, data)
379 except (BlockingIOError, InterruptedError):
380 n = 0
381 except Exception as exc:
382 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100383 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 return
385 if n == len(data):
386 return
387 elif n > 0:
388 data = data[n:]
389 self._loop.add_writer(self._fileno, self._write_ready)
390
391 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800392 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393
394 def _write_ready(self):
395 data = b''.join(self._buffer)
396 assert data, 'Data should not be empty'
397
398 self._buffer.clear()
399 try:
400 n = os.write(self._fileno, data)
401 except (BlockingIOError, InterruptedError):
402 self._buffer.append(data)
403 except Exception as exc:
404 self._conn_lost += 1
405 # Remove writer here, _fatal_error() doesn't it
406 # because _buffer is empty.
407 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100408 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 else:
410 if n == len(data):
411 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800412 self._maybe_resume_protocol() # May append to buffer.
413 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414 self._loop.remove_reader(self._fileno)
415 self._call_connection_lost(None)
416 return
417 elif n > 0:
418 data = data[n:]
419
420 self._buffer.append(data) # Try again later.
421
422 def can_write_eof(self):
423 return True
424
425 # TODO: Make the relationships between write_eof(), close(),
426 # abort(), _fatal_error() and _close() more straightforward.
427
428 def write_eof(self):
429 if self._closing:
430 return
431 assert self._pipe
432 self._closing = True
433 if not self._buffer:
434 self._loop.remove_reader(self._fileno)
435 self._loop.call_soon(self._call_connection_lost, None)
436
437 def close(self):
438 if not self._closing:
439 # write_eof is all what we needed to close the write pipe
440 self.write_eof()
441
442 def abort(self):
443 self._close(None)
444
Victor Stinner0ee29c22014-02-19 01:40:41 +0100445 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800447 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
Yury Selivanov569efa22014-02-18 18:02:19 -0500448 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100449 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500450 'exception': exc,
451 'transport': self,
452 'protocol': self._protocol,
453 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 self._close(exc)
455
456 def _close(self, exc=None):
457 self._closing = True
458 if self._buffer:
459 self._loop.remove_writer(self._fileno)
460 self._buffer.clear()
461 self._loop.remove_reader(self._fileno)
462 self._loop.call_soon(self._call_connection_lost, exc)
463
464 def _call_connection_lost(self, exc):
465 try:
466 self._protocol.connection_lost(exc)
467 finally:
468 self._pipe.close()
469 self._pipe = None
470 self._protocol = None
471 self._loop = None
472
473
Guido van Rossum59691282013-10-30 14:52:03 -0700474class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475
Guido van Rossum59691282013-10-30 14:52:03 -0700476 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700477 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700478 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700479 # Use a socket pair for stdin, since not all platforms
480 # support selecting read events on the write end of a
481 # socket (which we use in order to detect closing of the
482 # other end). Notably this is needed on AIX, and works
483 # just fine on other platforms.
484 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 self._proc = subprocess.Popen(
486 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
487 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700488 if stdin_w is not None:
489 stdin.close()
490 self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800491
492
493class AbstractChildWatcher:
494 """Abstract base class for monitoring child processes.
495
496 Objects derived from this class monitor a collection of subprocesses and
497 report their termination or interruption by a signal.
498
499 New callbacks are registered with .add_child_handler(). Starting a new
500 process must be done within a 'with' block to allow the watcher to suspend
501 its activity until the new process if fully registered (this is needed to
502 prevent a race condition in some implementations).
503
504 Example:
505 with watcher:
506 proc = subprocess.Popen("sleep 1")
507 watcher.add_child_handler(proc.pid, callback)
508
509 Notes:
510 Implementations of this class must be thread-safe.
511
512 Since child watcher objects may catch the SIGCHLD signal and call
513 waitpid(-1), there should be only one active object per process.
514 """
515
516 def add_child_handler(self, pid, callback, *args):
517 """Register a new child handler.
518
519 Arrange for callback(pid, returncode, *args) to be called when
520 process 'pid' terminates. Specifying another callback for the same
521 process replaces the previous handler.
522
523 Note: callback() must be thread-safe
524 """
525 raise NotImplementedError()
526
527 def remove_child_handler(self, pid):
528 """Removes the handler for process 'pid'.
529
530 The function returns True if the handler was successfully removed,
531 False if there was nothing to remove."""
532
533 raise NotImplementedError()
534
Guido van Rossum2bcae702013-11-13 15:50:08 -0800535 def attach_loop(self, loop):
536 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800537
Guido van Rossum2bcae702013-11-13 15:50:08 -0800538 If the watcher was previously attached to an event loop, then it is
539 first detached before attaching to the new loop.
540
541 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800542 """
543 raise NotImplementedError()
544
545 def close(self):
546 """Close the watcher.
547
548 This must be called to make sure that any underlying resource is freed.
549 """
550 raise NotImplementedError()
551
552 def __enter__(self):
553 """Enter the watcher's context and allow starting new processes
554
555 This function must return self"""
556 raise NotImplementedError()
557
558 def __exit__(self, a, b, c):
559 """Exit the watcher's context"""
560 raise NotImplementedError()
561
562
563class BaseChildWatcher(AbstractChildWatcher):
564
Guido van Rossum2bcae702013-11-13 15:50:08 -0800565 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800566 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800567
568 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800569 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800570
571 def _do_waitpid(self, expected_pid):
572 raise NotImplementedError()
573
574 def _do_waitpid_all(self):
575 raise NotImplementedError()
576
Guido van Rossum2bcae702013-11-13 15:50:08 -0800577 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800578 assert loop is None or isinstance(loop, events.AbstractEventLoop)
579
580 if self._loop is not None:
581 self._loop.remove_signal_handler(signal.SIGCHLD)
582
583 self._loop = loop
584 if loop is not None:
585 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
586
587 # Prevent a race condition in case a child terminated
588 # during the switch.
589 self._do_waitpid_all()
590
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800591 def _sig_chld(self):
592 try:
593 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500594 except Exception as exc:
595 # self._loop should always be available here
596 # as '_sig_chld' is added as a signal handler
597 # in 'attach_loop'
598 self._loop.call_exception_handler({
599 'message': 'Unknown exception in SIGCHLD handler',
600 'exception': exc,
601 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800602
603 def _compute_returncode(self, status):
604 if os.WIFSIGNALED(status):
605 # The child process died because of a signal.
606 return -os.WTERMSIG(status)
607 elif os.WIFEXITED(status):
608 # The child process exited (e.g sys.exit()).
609 return os.WEXITSTATUS(status)
610 else:
611 # The child exited, but we don't understand its status.
612 # This shouldn't happen, but if it does, let's just
613 # return that status; perhaps that helps debug it.
614 return status
615
616
617class SafeChildWatcher(BaseChildWatcher):
618 """'Safe' child watcher implementation.
619
620 This implementation avoids disrupting other code spawning processes by
621 polling explicitly each process in the SIGCHLD handler instead of calling
622 os.waitpid(-1).
623
624 This is a safe solution but it has a significant overhead when handling a
625 big number of children (O(n) each time SIGCHLD is raised)
626 """
627
Guido van Rossum2bcae702013-11-13 15:50:08 -0800628 def __init__(self):
629 super().__init__()
630 self._callbacks = {}
631
632 def close(self):
633 self._callbacks.clear()
634 super().close()
635
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800636 def __enter__(self):
637 return self
638
639 def __exit__(self, a, b, c):
640 pass
641
642 def add_child_handler(self, pid, callback, *args):
643 self._callbacks[pid] = callback, args
644
645 # Prevent a race condition in case the child is already terminated.
646 self._do_waitpid(pid)
647
Guido van Rossum2bcae702013-11-13 15:50:08 -0800648 def remove_child_handler(self, pid):
649 try:
650 del self._callbacks[pid]
651 return True
652 except KeyError:
653 return False
654
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800655 def _do_waitpid_all(self):
656
657 for pid in list(self._callbacks):
658 self._do_waitpid(pid)
659
660 def _do_waitpid(self, expected_pid):
661 assert expected_pid > 0
662
663 try:
664 pid, status = os.waitpid(expected_pid, os.WNOHANG)
665 except ChildProcessError:
666 # The child process is already reaped
667 # (may happen if waitpid() is called elsewhere).
668 pid = expected_pid
669 returncode = 255
670 logger.warning(
671 "Unknown child process pid %d, will report returncode 255",
672 pid)
673 else:
674 if pid == 0:
675 # The child process is still alive.
676 return
677
678 returncode = self._compute_returncode(status)
679
680 try:
681 callback, args = self._callbacks.pop(pid)
682 except KeyError: # pragma: no cover
683 # May happen if .remove_child_handler() is called
684 # after os.waitpid() returns.
685 pass
686 else:
687 callback(pid, returncode, *args)
688
689
690class FastChildWatcher(BaseChildWatcher):
691 """'Fast' child watcher implementation.
692
693 This implementation reaps every terminated processes by calling
694 os.waitpid(-1) directly, possibly breaking other code spawning processes
695 and waiting for their termination.
696
697 There is no noticeable overhead when handling a big number of children
698 (O(1) each time a child terminates).
699 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800700 def __init__(self):
701 super().__init__()
702 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800703 self._lock = threading.Lock()
704 self._zombies = {}
705 self._forks = 0
706
707 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800708 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800709 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800710 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800711
712 def __enter__(self):
713 with self._lock:
714 self._forks += 1
715
716 return self
717
718 def __exit__(self, a, b, c):
719 with self._lock:
720 self._forks -= 1
721
722 if self._forks or not self._zombies:
723 return
724
725 collateral_victims = str(self._zombies)
726 self._zombies.clear()
727
728 logger.warning(
729 "Caught subprocesses termination from unknown pids: %s",
730 collateral_victims)
731
732 def add_child_handler(self, pid, callback, *args):
733 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800734 with self._lock:
735 try:
736 returncode = self._zombies.pop(pid)
737 except KeyError:
738 # The child is running.
739 self._callbacks[pid] = callback, args
740 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800741
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800742 # The child is dead already. We can fire the callback.
743 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800744
Guido van Rossum2bcae702013-11-13 15:50:08 -0800745 def remove_child_handler(self, pid):
746 try:
747 del self._callbacks[pid]
748 return True
749 except KeyError:
750 return False
751
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800752 def _do_waitpid_all(self):
753 # Because of signal coalescing, we must keep calling waitpid() as
754 # long as we're able to reap a child.
755 while True:
756 try:
757 pid, status = os.waitpid(-1, os.WNOHANG)
758 except ChildProcessError:
759 # No more child processes exist.
760 return
761 else:
762 if pid == 0:
763 # A child process is still alive.
764 return
765
766 returncode = self._compute_returncode(status)
767
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800768 with self._lock:
769 try:
770 callback, args = self._callbacks.pop(pid)
771 except KeyError:
772 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800773 if self._forks:
774 # It may not be registered yet.
775 self._zombies[pid] = returncode
776 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800777 callback = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800778
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800779 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800780 logger.warning(
781 "Caught subprocess termination from unknown pid: "
782 "%d -> %d", pid, returncode)
783 else:
784 callback(pid, returncode, *args)
785
786
787class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
788 """XXX"""
789 _loop_factory = _UnixSelectorEventLoop
790
791 def __init__(self):
792 super().__init__()
793 self._watcher = None
794
795 def _init_watcher(self):
796 with events._lock:
797 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800798 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800799 if isinstance(threading.current_thread(),
800 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800801 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800802
803 def set_event_loop(self, loop):
804 """Set the event loop.
805
806 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800807 .set_event_loop() from the main thread will call .attach_loop(loop) on
808 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800809 """
810
811 super().set_event_loop(loop)
812
813 if self._watcher is not None and \
814 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800815 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800816
817 def get_child_watcher(self):
818 """Get the child watcher
819
820 If not yet set, a SafeChildWatcher object is automatically created.
821 """
822 if self._watcher is None:
823 self._init_watcher()
824
825 return self._watcher
826
827 def set_child_watcher(self, watcher):
828 """Set the child watcher"""
829
830 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
831
832 if self._watcher is not None:
833 self._watcher.close()
834
835 self._watcher = watcher
836
837SelectorEventLoop = _UnixSelectorEventLoop
838DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy