blob: 1fbdd313a8fba6283186f599c51c520801478a63 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Public names exported by ``from <module> import *``.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# This module is built around UNIX signal handling, so importing it on
# Windows fails immediately with ImportError.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
31
32
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Signal number -> events.Handle wrapping the registered callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return the pair of connected sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Remove every registered signal handler, then close the loop."""
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        The callback and its args are wrapped in an events.Handle which is
        handed to the loop each time the signal is received.  Registering
        a handler for a signal that already has one replaces it.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler
        (wakeup fd cannot be installed, or the signal cannot be caught).
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.  An unusable descriptor may surface as OSError
            # instead; both are reported as RuntimeError, as documented.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: uninstall the wakeup fd (best effort).
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the default disposition cannot be restored
        because the signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to Python's KeyboardInterrupt handler; every
        # other signal goes back to the OS default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: uninstall the wakeup fd (best effort).
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is not in
        range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport reading from pipe."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport writing to pipe."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport.

        The process is created inside the child watcher's context so
        that the watcher can register the new pid before reporting
        terminations (see AbstractChildWatcher).
        """
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport on the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @tasks.coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket and return (transport, protocol).

        Exactly one of path and sock must be given.  With path, a new
        AF_UNIX stream socket is created and connected; with sock, the
        given socket is used and switched to non-blocking mode.

        Raise ValueError if ssl is set without server_hostname, if
        server_hostname is given without ssl, or if path and sock are
        both given or both missing.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Deliberately bare: the socket we created must be closed
                # whatever interrupted the connect; the error is re-raised.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @tasks.coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX Domain Socket.

        Exactly one of path and sock must be given.  With path, a new
        AF_UNIX stream socket is created and bound to it; with sock, the
        given socket must already be an AF_UNIX socket.

        Return the base_events.Server object.

        Raise TypeError if ssl is a bool.
        Raise ValueError if path and sock are both given or both
        missing, or if sock is not an AF_UNIX socket.
        Raise OSError if binding fails; EADDRINUSE is re-raised with a
        message naming the path.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                # Any other failure (e.g. TypeError for a bad path) must
                # not leak the socket created above.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
240
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241
242def _set_nonblocking(fd):
243 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
244 flags = flags | os.O_NONBLOCK
245 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
246
247
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Register pipe for reading on loop and notify protocol.

        pipe must expose fileno() and close().  If waiter is given, its
        result is set to None once the transport has been set up.

        Raise ValueError if pipe is not a FIFO, socket or character
        device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Character devices are accepted too, so say so (same wording
            # as _UnixWritePipeTransport).
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Reader callback: deliver available data, or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read means EOF: stop watching the descriptor and
                # schedule eof_received() followed by connection_lost(None).
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report exc to the loop's exception handler and close.

        An OSError whose errno is EIO is not reported, only closed on.
        """
        # should be called by exception handler only
        if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
321
322
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Set up non-blocking writes to pipe and notify protocol.

        Raise ValueError if pipe is not a socket, FIFO or character
        device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()

        st_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(st_mode)
        supported = (is_socket or
                     stat.S_ISFIFO(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []       # Chunks accepted by write() but not sent.
        self._conn_lost = 0     # Failed writes / writes after closing.
        self._closing = False   # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def get_write_buffer_size(self):
        """Return the summed length of all buffered chunks."""
        return sum(map(len, self._buffer))

    def _read_ready(self):
        """Reader callback: the pipe was closed by the peer."""
        # Unsent data at this point means the write side is broken.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write data, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # Data is dropped; only complain once writes keep coming.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing queued yet: try to send straight away.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return

            if written == len(data):
                return
            if written > 0:
                data = data[written:]
            # Let the loop tell us when the rest can be written.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the buffered data."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            # Still not writable: put everything back and wait.
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() will not do it,
            # because _buffer is empty at this point.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if sent == len(pending):
            self._loop.remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer and self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport closing; finish now if nothing is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the shutdown once the buffer drains.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport once buffered data has been written."""
        if self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report exc to the loop's exception handler and close.

        BrokenPipeError and ConnectionResetError are not reported.
        """
        # should be called by exception handler only
        expected = isinstance(exc, (BrokenPipeError, ConnectionResetError))
        if not expected:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, unregister and schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = self._protocol = self._loop = None
476
477
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair is used instead of
        a pipe: the child receives one end and self._proc.stdin is
        replaced by a binary, writable file object on the other end.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # pipe (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child has its own copy of its end; close ours.
                stdin.close()
                # The parent writes to the child's stdin, so the file
                # object must be opened for writing.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to the file object: nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800495
496
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A concrete watcher keeps track of a set of subprocesses and reports
    when one of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called once the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is released.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context; new processes may then be started.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
565
566
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Loop on which the SIGCHLD handler is installed, or None.
        self._loop = None

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Poll one child process; implemented by subclasses."""
        raise NotImplementedError()

    def _do_waitpid_all(self):
        """Poll the watched child processes; implemented by subclasses."""
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to loop.

        loop may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting any failure."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a returncode.

        Return minus the signal number if the child was killed by a
        signal, its exit status if it exited, else the raw status.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
619
620
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled individually from the SIGCHLD
    handler rather than through os.waitpid(-1), so other code that
    spawns processes is not disturbed.

    The price of that safety is a significant overhead when many
    children are watched (O(n) each time SIGCHLD is raised).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args)
        self._callbacks = {}

    def close(self):
        """Forget every registered handler and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for pid; return True if there was one."""
        # Stored values are tuples, so None reliably means "no entry".
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() may pop entries.
        for watched_pid in list(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll expected_pid and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        # The entry may be missing if .remove_child_handler() is called
        # after os.waitpid() returns.
        entry = self._callbacks.pop(pid, None)
        if entry is not None:
            callback, args = entry
            callback(pid, returncode, *args)
692
693
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and
    waits for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._callbacks = {}            # pid -> (callback, args)
        self._lock = threading.Lock()
        self._zombies = {}              # pid -> returncode, not yet claimed
        self._forks = 0                 # number of active 'with' blocks

    def close(self):
        """Forget handlers and unclaimed children, detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Note that a process is about to be started; return self."""
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        """Leave the context; warn about reaped children nobody claimed."""
        with self._lock:
            self._forks -= 1

            still_forking = bool(self._forks)
            if still_forking or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid.

        If the process was already reaped, the callback is fired at once.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for pid; return True if there was one."""
        # Stored values are tuples, so None reliably means "no entry".
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None and self._forks:
                    # Unknown child, but a process is being started:
                    # it may not be registered yet.
                    self._zombies[pid] = returncode
                    continue

            if entry is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback, args = entry
                callback(pid, returncode, *args)
789
790
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for UNIX, owning a child watcher."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none exists yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has already been set, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on that watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        A SafeChildWatcher object is created automatically if none has
        been set yet.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the previous one if any."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
840
# Public aliases for the UNIX implementations; both names are listed in
# __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy