"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
18from . import protocols
19from . import selector_events
20from . import tasks
21from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
# Public names exported by a star-import of this module.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Import-time guard: this module refuses to load on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
32
33
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets (socket.socketpair())."""
        return socket.socketpair()

    def close(self):
        """Remove every installed signal handler, then close the loop."""
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if set_wakeup_fd() refuses the call or if the
        signal cannot be caught (EINVAL).  Any other OSError raised while
        installing the handler is propagated unchanged.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError/ValueError for an invalid signal (see
        _check_signal()) and RuntimeError if restoring the default
        handler fails with EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to the KeyboardInterrupt-raising handler,
        # everything else to the default disposition.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is outside
        range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Spawn a subprocess and return its transport.

        The process is started inside the child watcher's context so the
        watcher can hold off until the pid has been registered.
        """
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status over to the loop via call_soon_threadsafe().
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @tasks.coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain socket; return (transport, protocol).

        Exactly one of path and sock must be given.  Raise ValueError if
        both or neither are given, or if ssl and server_hostname are
        inconsistent.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except BaseException:
                # Cleanup only: we created the socket, so we must not
                # leak it whatever interrupted the connect.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @tasks.coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Start serving on a UNIX domain socket; return the Server.

        Exactly one of path and sock must be given.  Raise TypeError if
        ssl is a bool, ValueError if both or neither of path and sock are
        given or if sock is not an AF_UNIX socket, and OSError if binding
        to path fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                # Same rule as create_unix_connection(): do not silently
                # discard a socket the caller handed in.
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                # e.g. TypeError for a path of the wrong type: the socket
                # was created here, so close it before propagating.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
237
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for file descriptor *fd*, keeping other flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
243
244
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Register *pipe* for reading on *loop* and notify *protocol*.

        Raise ValueError if the descriptor is not a FIFO, a socket or a
        character device.  If *waiter* is given, its result is set to
        None via loop.call_soon().
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Character devices are accepted too, so say so.
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Reader callback: deliver available data or handle EOF/errors."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read means EOF: stop watching and schedule
                # eof_received() followed by connection_lost(None).
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        # An OSError with errno EIO is not reported to the loop's
        # exception handler; every other exception is.
        if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release the pipe and drop references even if the
            # protocol callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
318
319
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write-only transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Prepare *pipe* for non-blocking writes and notify *protocol*.

        Raise ValueError when the descriptor is neither a socket, a FIFO
        nor a character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()

        st_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(st_mode)
        supported = (is_socket or
                     stat.S_ISFIFO(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []  # chunks waiting to be written
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def get_write_buffer_size(self):
        """Return the total number of buffered, not yet written bytes."""
        return sum(len(chunk) for chunk in self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.  Report BrokenPipeError only when
        # unsent data is being thrown away.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write *data* now if possible, buffering whatever is left."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if sent == len(data):
                return
            if sent > 0:
                data = data[sent:]
            # Something is left over: finish the job from _write_ready().
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'
        self._buffer.clear()

        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            # Not writable after all; keep everything for the next try.
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() won't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if sent == len(pending):
            self._loop.remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer and self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport closing; finish at once if nothing is queued."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the shutdown once the buffer drains.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        if self._closing:
            return
        # write_eof() is all that is needed to close the write pipe.
        self.write_eof()

    def abort(self):
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        expected = isinstance(exc, (BrokenPipeError, ConnectionResetError))
        if not expected:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        self._closing = True
        if self._buffer:
            # A writer is only registered while data is buffered.
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release the pipe and drop references.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
473
474
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        the child gets one end, and the other end becomes
        self._proc.stdin, opened for binary writing.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                # The parent writes to the child's stdin, so the file
                # object must be opened for writing, not reading.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800492
493
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one
    of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so the watcher
    can suspend its activity until the new process is fully registered
    (some implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Because a child watcher may catch SIGCHLD and call waitpid(-1),
        there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the child process 'pid'.

        callback(pid, returncode, *args) is to be called once the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher already attached to another loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called to make sure any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
562
563
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Loop on which the SIGCHLD handler is installed, or None.
        self._loop = None

    def close(self):
        # Detaching removes the SIGCHLD handler from the current loop.
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*."""
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a returncode."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
616
617
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    The SIGCHLD handler polls each registered process explicitly instead
    of calling os.waitpid(-1), so it does not disrupt other code that
    spawns processes.

    This is safe, but the overhead is significant with a large number of
    children (O(n) each time SIGCHLD is raised).
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args).
        self._callbacks = {}

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        # Entries are always (callback, args) tuples, so None can only
        # mean "no handler registered".
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        # Snapshot the pids: _do_waitpid() may remove entries.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child; fire and drop its callback if it has exited."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        # The entry may be missing if .remove_child_handler() is called
        # after os.waitpid() returns.
        entry = self._callbacks.pop(pid, None)
        if entry is not None:
            callback, args = entry
            callback(pid, returncode, *args)
689
690
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination.

    There is no noticeable overhead when handling a large number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Maps pid -> (callback, args).
        self._callbacks = {}
        self._lock = threading.Lock()
        # Maps pid -> returncode for children reaped before registration.
        self._zombies = {}
        # Number of 'with' blocks currently entered.
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            # Zombies are only flushed when the last context exits.
            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        # Log outside the lock.
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = callback, args
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        # Entries are always (callback, args) tuples, so None can only
        # mean "no handler registered".
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None and self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    continue

            if entry is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback, args = entry
                callback(pid, returncode, *args)
786
787
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy for Unix that also owns a child watcher."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by
        # set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none exists yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return
            self._watcher = SafeChildWatcher()
            on_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if on_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the previous one if any."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
837
# Public aliases: expose the Unix implementations under the
# platform-neutral names listed in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy