blob: 06bcdcc5ed1c8d3f297d5d0c576f09a4ffd65782 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01005import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
Yury Selivanovb057c522014-02-18 12:15:06 -050015from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070016from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Victor Stinner915bcb02014-02-01 22:49:59 +010027__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080028 'AbstractChildWatcher', 'SafeChildWatcher',
29 'FastChildWatcher', 'DefaultEventLoopPolicy',
30 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032if sys.platform == 'win32': # pragma: no cover
33 raise ImportError('Signals are not really supported on Windows')
34
35
Victor Stinnerfe5649c2014-07-17 22:43:40 +020036def _sighandler_noop(signum, frame):
37 """Dummy signal handler."""
38 pass
39
40
Yury Selivanovd7c15182016-11-15 15:26:34 -050041try:
42 _fspath = os.fspath
43except AttributeError:
44 # Python 3.5 or earlier
45 _fspath = lambda path: path
46
47
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080048class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050049 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Yury Selivanovb057c522014-02-18 12:15:06 -050051 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052 """
53
54 def __init__(self, selector=None):
55 super().__init__(selector)
56 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070057
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080058 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020059 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080060 for sig in list(self._signal_handlers):
61 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080062
Victor Stinnerfe5649c2014-07-17 22:43:40 +020063 def _process_self_data(self, data):
64 for signum in data:
65 if not signum:
66 # ignore null bytes written by _write_to_self()
67 continue
68 self._handle_signal(signum)
69
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 def add_signal_handler(self, sig, callback, *args):
71 """Add a handler for a signal. UNIX only.
72
73 Raise ValueError if the signal number is invalid or uncatchable.
74 Raise RuntimeError if there is a problem setting up the handler.
75 """
Victor Stinner2d99d932014-11-20 15:03:52 +010076 if (coroutines.iscoroutine(callback)
77 or coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010078 raise TypeError("coroutines cannot be used "
79 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010081 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082 try:
83 # set_wakeup_fd() raises ValueError if this is not the
84 # main thread. By calling it early we ensure that an
85 # event loop running in another thread cannot add a signal
86 # handler.
87 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020088 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089 raise RuntimeError(str(exc))
90
Yury Selivanov569efa22014-02-18 18:02:19 -050091 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092 self._signal_handlers[sig] = handle
93
94 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020095 # Register a dummy signal handler to ask Python to write the signal
96 # number in the wakup file descriptor. _process_self_data() will
97 # read signal numbers from this file descriptor to handle signals.
98 signal.signal(sig, _sighandler_noop)
99
Charles-François Natali74e7cf32013-12-05 22:47:19 +0100100 # Set SA_RESTART to limit EINTR occurrences.
101 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102 except OSError as exc:
103 del self._signal_handlers[sig]
104 if not self._signal_handlers:
105 try:
106 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200107 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700108 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109
110 if exc.errno == errno.EINVAL:
111 raise RuntimeError('sig {} cannot be caught'.format(sig))
112 else:
113 raise
114
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200115 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116 """Internal helper that is the actual signal handler."""
117 handle = self._signal_handlers.get(sig)
118 if handle is None:
119 return # Assume it's some race condition.
120 if handle._cancelled:
121 self.remove_signal_handler(sig) # Remove it properly.
122 else:
123 self._add_callback_signalsafe(handle)
124
125 def remove_signal_handler(self, sig):
126 """Remove a handler for a signal. UNIX only.
127
128 Return True if a signal handler was removed, False if not.
129 """
130 self._check_signal(sig)
131 try:
132 del self._signal_handlers[sig]
133 except KeyError:
134 return False
135
136 if sig == signal.SIGINT:
137 handler = signal.default_int_handler
138 else:
139 handler = signal.SIG_DFL
140
141 try:
142 signal.signal(sig, handler)
143 except OSError as exc:
144 if exc.errno == errno.EINVAL:
145 raise RuntimeError('sig {} cannot be caught'.format(sig))
146 else:
147 raise
148
149 if not self._signal_handlers:
150 try:
151 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200152 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700153 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154
155 return True
156
157 def _check_signal(self, sig):
158 """Internal helper to validate a signal.
159
160 Raise ValueError if the signal number is invalid or uncatchable.
161 Raise RuntimeError if there is a problem setting up the handler.
162 """
163 if not isinstance(sig, int):
164 raise TypeError('sig must be an int, not {!r}'.format(sig))
165
166 if not (1 <= sig < signal.NSIG):
167 raise ValueError(
168 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
169
170 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
171 extra=None):
172 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
173
174 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
175 extra=None):
176 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
177
Victor Stinnerf951d282014-06-29 00:46:45 +0200178 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 def _make_subprocess_transport(self, protocol, args, shell,
180 stdin, stdout, stderr, bufsize,
181 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800182 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400183 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800184 transp = _UnixSubprocessTransport(self, protocol, args, shell,
185 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100186 waiter=waiter, extra=extra,
187 **kwargs)
188
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800189 watcher.add_child_handler(transp.get_pid(),
190 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100191 try:
192 yield from waiter
Victor Stinner5d44c082015-02-02 18:36:31 +0100193 except Exception as exc:
194 # Workaround CPython bug #23353: using yield/yield-from in an
195 # except block of a generator doesn't clear properly
196 # sys.exc_info()
197 err = exc
198 else:
199 err = None
200
201 if err is not None:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100202 transp.close()
Victor Stinner1241ecc2015-01-30 00:16:14 +0100203 yield from transp._wait()
Victor Stinner5d44c082015-02-02 18:36:31 +0100204 raise err
Guido van Rossum4835f172014-01-10 13:28:59 -0800205
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700206 return transp
207
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800208 def _child_watcher_callback(self, pid, returncode, transp):
209 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210
Victor Stinnerf951d282014-06-29 00:46:45 +0200211 @coroutine
Yury Selivanov423fd362017-11-20 17:26:28 -0500212 def create_unix_connection(self, protocol_factory, path=None, *,
Yury Selivanovb057c522014-02-18 12:15:06 -0500213 ssl=None, sock=None,
214 server_hostname=None):
215 assert server_hostname is None or isinstance(server_hostname, str)
216 if ssl:
217 if server_hostname is None:
218 raise ValueError(
219 'you have to pass server_hostname when using ssl')
220 else:
221 if server_hostname is not None:
222 raise ValueError('server_hostname is only meaningful with ssl')
223
224 if path is not None:
225 if sock is not None:
226 raise ValueError(
227 'path and sock can not be specified at the same time')
228
Yury Selivanov423fd362017-11-20 17:26:28 -0500229 path = _fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100230 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500231 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500232 sock.setblocking(False)
233 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100234 except:
235 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500236 raise
237
238 else:
239 if sock is None:
240 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400241 if (sock.family != socket.AF_UNIX or
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500242 not base_events._is_stream_socket(sock)):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400243 raise ValueError(
244 'A UNIX Domain Stream Socket was expected, got {!r}'
245 .format(sock))
Yury Selivanovb057c522014-02-18 12:15:06 -0500246 sock.setblocking(False)
247
248 transport, protocol = yield from self._create_connection_transport(
249 sock, protocol_factory, ssl, server_hostname)
250 return transport, protocol
251
Victor Stinnerf951d282014-06-29 00:46:45 +0200252 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500253 def create_unix_server(self, protocol_factory, path=None, *,
254 sock=None, backlog=100, ssl=None):
255 if isinstance(ssl, bool):
256 raise TypeError('ssl argument must be an SSLContext or None')
257
258 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200259 if sock is not None:
260 raise ValueError(
261 'path and sock can not be specified at the same time')
262
Yury Selivanovd7c15182016-11-15 15:26:34 -0500263 path = _fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500264 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
265
Yury Selivanov908d55d2016-10-09 12:15:08 -0400266 # Check for abstract socket. `str` and `bytes` paths are supported.
267 if path[0] not in (0, '\x00'):
268 try:
269 if stat.S_ISSOCK(os.stat(path).st_mode):
270 os.remove(path)
271 except FileNotFoundError:
272 pass
273 except OSError as err:
274 # Directory may have permissions only to create socket.
275 logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
276
Yury Selivanovb057c522014-02-18 12:15:06 -0500277 try:
278 sock.bind(path)
279 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100280 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500281 if exc.errno == errno.EADDRINUSE:
282 # Let's improve the error message by adding
283 # with what exact address it occurs.
284 msg = 'Address {!r} is already in use'.format(path)
285 raise OSError(errno.EADDRINUSE, msg) from None
286 else:
287 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200288 except:
289 sock.close()
290 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500291 else:
292 if sock is None:
293 raise ValueError(
294 'path was not specified, and no sock specified')
295
Yury Selivanov36e7e972016-10-07 12:39:57 -0400296 if (sock.family != socket.AF_UNIX or
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500297 not base_events._is_stream_socket(sock)):
Yury Selivanovb057c522014-02-18 12:15:06 -0500298 raise ValueError(
Yury Selivanov36e7e972016-10-07 12:39:57 -0400299 'A UNIX Domain Stream Socket was expected, got {!r}'
300 .format(sock))
Yury Selivanovb057c522014-02-18 12:15:06 -0500301
302 server = base_events.Server(self, [sock])
303 sock.listen(backlog)
304 sock.setblocking(False)
305 self._start_serving(protocol_factory, sock, ssl, server)
306 return server
307
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200309if hasattr(os, 'set_blocking'):
310 def _set_nonblocking(fd):
311 os.set_blocking(fd, False)
312else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400313 import fcntl
314
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200315 def _set_nonblocking(fd):
316 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
317 flags = flags | os.O_NONBLOCK
318 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319
320
321class _UnixReadPipeTransport(transports.ReadTransport):
322
Yury Selivanovdec1a452014-02-18 22:27:48 -0500323 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
325 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
326 super().__init__(extra)
327 self._extra['pipe'] = pipe
328 self._loop = loop
329 self._pipe = pipe
330 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700331 self._protocol = protocol
332 self._closing = False
333
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700334 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800335 if not (stat.S_ISFIFO(mode) or
336 stat.S_ISSOCK(mode) or
337 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700338 self._pipe = None
339 self._fileno = None
340 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700341 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700342
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 _set_nonblocking(self._fileno)
Guido van Rossum47867872016-08-31 09:42:38 -0700344
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100346 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400347 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100348 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100350 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500351 self._loop.call_soon(futures._set_result_unless_cancelled,
352 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353
Victor Stinnere912e652014-07-12 03:11:53 +0200354 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100355 info = [self.__class__.__name__]
356 if self._pipe is None:
357 info.append('closed')
358 elif self._closing:
359 info.append('closing')
360 info.append('fd=%s' % self._fileno)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400361 selector = getattr(self._loop, '_selector', None)
362 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200363 polling = selector_events._test_selector_event(
Yury Selivanov5dc09332016-05-13 16:04:43 -0400364 selector,
Victor Stinnere912e652014-07-12 03:11:53 +0200365 self._fileno, selectors.EVENT_READ)
366 if polling:
367 info.append('polling')
368 else:
369 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400370 elif self._pipe is not None:
371 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200372 else:
373 info.append('closed')
374 return '<%s>' % ' '.join(info)
375
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 def _read_ready(self):
377 try:
378 data = os.read(self._fileno, self.max_size)
379 except (BlockingIOError, InterruptedError):
380 pass
381 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100382 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383 else:
384 if data:
385 self._protocol.data_received(data)
386 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200387 if self._loop.get_debug():
388 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400390 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 self._loop.call_soon(self._protocol.eof_received)
392 self._loop.call_soon(self._call_connection_lost, None)
393
Guido van Rossum57497ad2013-10-18 07:58:20 -0700394 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400395 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396
Guido van Rossum57497ad2013-10-18 07:58:20 -0700397 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400398 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400400 def set_protocol(self, protocol):
401 self._protocol = protocol
402
403 def get_protocol(self):
404 return self._protocol
405
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500406 def is_closing(self):
407 return self._closing
408
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409 def close(self):
410 if not self._closing:
411 self._close(None)
412
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900413 def __del__(self):
414 if self._pipe is not None:
415 warnings.warn("unclosed transport %r" % self, ResourceWarning,
416 source=self)
417 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100418
Victor Stinner0ee29c22014-02-19 01:40:41 +0100419 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200421 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
422 if self._loop.get_debug():
423 logger.debug("%r: %s", self, message, exc_info=True)
424 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500425 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100426 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500427 'exception': exc,
428 'transport': self,
429 'protocol': self._protocol,
430 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 self._close(exc)
432
433 def _close(self, exc):
434 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400435 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 self._loop.call_soon(self._call_connection_lost, exc)
437
438 def _call_connection_lost(self, exc):
439 try:
440 self._protocol.connection_lost(exc)
441 finally:
442 self._pipe.close()
443 self._pipe = None
444 self._protocol = None
445 self._loop = None
446
447
Yury Selivanov3cb99142014-02-18 18:41:13 -0500448class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800449 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450
451 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100452 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 self._pipe = pipe
455 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400457 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 self._conn_lost = 0
459 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700460
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700461 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700462 is_char = stat.S_ISCHR(mode)
463 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700464 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700465 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700466 self._pipe = None
467 self._fileno = None
468 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100469 raise ValueError("Pipe transport is only for "
470 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700471
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 _set_nonblocking(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100474
475 # On AIX, the reader trick (to be notified when the read end of the
476 # socket is closed) only works for sockets. On other platforms it
477 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700478 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100479 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400480 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100481 self._fileno, self._read_ready)
482
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100484 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500485 self._loop.call_soon(futures._set_result_unless_cancelled,
486 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487
Victor Stinnere912e652014-07-12 03:11:53 +0200488 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100489 info = [self.__class__.__name__]
490 if self._pipe is None:
491 info.append('closed')
492 elif self._closing:
493 info.append('closing')
494 info.append('fd=%s' % self._fileno)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400495 selector = getattr(self._loop, '_selector', None)
496 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200497 polling = selector_events._test_selector_event(
Yury Selivanov5dc09332016-05-13 16:04:43 -0400498 selector,
Victor Stinnere912e652014-07-12 03:11:53 +0200499 self._fileno, selectors.EVENT_WRITE)
500 if polling:
501 info.append('polling')
502 else:
503 info.append('idle')
504
505 bufsize = self.get_write_buffer_size()
506 info.append('bufsize=%s' % bufsize)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400507 elif self._pipe is not None:
508 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200509 else:
510 info.append('closed')
511 return '<%s>' % ' '.join(info)
512
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800513 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400514 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800515
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700516 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700517 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200518 if self._loop.get_debug():
519 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100520 if self._buffer:
521 self._close(BrokenPipeError())
522 else:
523 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700524
525 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800526 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
527 if isinstance(data, bytearray):
528 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 if not data:
530 return
531
532 if self._conn_lost or self._closing:
533 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700534 logger.warning('pipe closed by peer or '
535 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 self._conn_lost += 1
537 return
538
539 if not self._buffer:
540 # Attempt to send it right away first.
541 try:
542 n = os.write(self._fileno, data)
543 except (BlockingIOError, InterruptedError):
544 n = 0
545 except Exception as exc:
546 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100547 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700548 return
549 if n == len(data):
550 return
551 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400552 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400553 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400555 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800556 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557
558 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400559 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400562 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400564 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400566 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700567 self._conn_lost += 1
568 # Remove writer here, _fatal_error() doesn't it
569 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400570 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100571 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400573 if n == len(self._buffer):
574 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400575 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800576 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400577 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400578 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 self._call_connection_lost(None)
580 return
581 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400582 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583
584 def can_write_eof(self):
585 return True
586
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587 def write_eof(self):
588 if self._closing:
589 return
590 assert self._pipe
591 self._closing = True
592 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400593 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 self._loop.call_soon(self._call_connection_lost, None)
595
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400596 def set_protocol(self, protocol):
597 self._protocol = protocol
598
599 def get_protocol(self):
600 return self._protocol
601
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500602 def is_closing(self):
603 return self._closing
604
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100606 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607 # write_eof is all what we needed to close the write pipe
608 self.write_eof()
609
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900610 def __del__(self):
611 if self._pipe is not None:
612 warnings.warn("unclosed transport %r" % self, ResourceWarning,
613 source=self)
614 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100615
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616 def abort(self):
617 self._close(None)
618
Victor Stinner0ee29c22014-02-19 01:40:41 +0100619 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200621 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200622 if self._loop.get_debug():
623 logger.debug("%r: %s", self, message, exc_info=True)
624 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500625 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100626 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500627 'exception': exc,
628 'transport': self,
629 'protocol': self._protocol,
630 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 self._close(exc)
632
633 def _close(self, exc=None):
634 self._closing = True
635 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400636 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400638 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639 self._loop.call_soon(self._call_connection_lost, exc)
640
641 def _call_connection_lost(self, exc):
642 try:
643 self._protocol.connection_lost(exc)
644 finally:
645 self._pipe.close()
646 self._pipe = None
647 self._protocol = None
648 self._loop = None
649
650
Victor Stinner1e40f102014-12-11 23:30:17 +0100651if hasattr(os, 'set_inheritable'):
652 # Python 3.4 and newer
653 _set_inheritable = os.set_inheritable
654else:
655 import fcntl
656
657 def _set_inheritable(fd, inheritable):
658 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
659
660 old = fcntl.fcntl(fd, fcntl.F_GETFD)
661 if not inheritable:
662 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
663 else:
664 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
665
666
Guido van Rossum59691282013-10-30 14:52:03 -0700667class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668
Guido van Rossum59691282013-10-30 14:52:03 -0700669 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700670 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700672 # Use a socket pair for stdin, since not all platforms
673 # support selecting read events on the write end of a
674 # socket (which we use in order to detect closing of the
675 # other end). Notably this is needed on AIX, and works
676 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100677 stdin, stdin_w = socket.socketpair()
Victor Stinner1e40f102014-12-11 23:30:17 +0100678
679 # Mark the write end of the stdin pipe as non-inheritable,
680 # needed by close_fds=False on Python 3.3 and older
681 # (Python 3.4 implements the PEP 446, socketpair returns
682 # non-inheritable sockets)
683 _set_inheritable(stdin_w.fileno(), False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700684 self._proc = subprocess.Popen(
685 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
686 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700687 if stdin_w is not None:
688 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200689 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800690
691
692class AbstractChildWatcher:
693 """Abstract base class for monitoring child processes.
694
695 Objects derived from this class monitor a collection of subprocesses and
696 report their termination or interruption by a signal.
697
698 New callbacks are registered with .add_child_handler(). Starting a new
699 process must be done within a 'with' block to allow the watcher to suspend
700 its activity until the new process if fully registered (this is needed to
701 prevent a race condition in some implementations).
702
703 Example:
704 with watcher:
705 proc = subprocess.Popen("sleep 1")
706 watcher.add_child_handler(proc.pid, callback)
707
708 Notes:
709 Implementations of this class must be thread-safe.
710
711 Since child watcher objects may catch the SIGCHLD signal and call
712 waitpid(-1), there should be only one active object per process.
713 """
714
715 def add_child_handler(self, pid, callback, *args):
716 """Register a new child handler.
717
718 Arrange for callback(pid, returncode, *args) to be called when
719 process 'pid' terminates. Specifying another callback for the same
720 process replaces the previous handler.
721
Victor Stinneracdb7822014-07-14 18:33:40 +0200722 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800723 """
724 raise NotImplementedError()
725
726 def remove_child_handler(self, pid):
727 """Removes the handler for process 'pid'.
728
729 The function returns True if the handler was successfully removed,
730 False if there was nothing to remove."""
731
732 raise NotImplementedError()
733
Guido van Rossum2bcae702013-11-13 15:50:08 -0800734 def attach_loop(self, loop):
735 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800736
Guido van Rossum2bcae702013-11-13 15:50:08 -0800737 If the watcher was previously attached to an event loop, then it is
738 first detached before attaching to the new loop.
739
740 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800741 """
742 raise NotImplementedError()
743
744 def close(self):
745 """Close the watcher.
746
747 This must be called to make sure that any underlying resource is freed.
748 """
749 raise NotImplementedError()
750
751 def __enter__(self):
752 """Enter the watcher's context and allow starting new processes
753
754 This function must return self"""
755 raise NotImplementedError()
756
757 def __exit__(self, a, b, c):
758 """Exit the watcher's context"""
759 raise NotImplementedError()
760
761
762class BaseChildWatcher(AbstractChildWatcher):
763
Guido van Rossum2bcae702013-11-13 15:50:08 -0800764 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800765 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400766 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800767
768 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800769 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800770
771 def _do_waitpid(self, expected_pid):
772 raise NotImplementedError()
773
774 def _do_waitpid_all(self):
775 raise NotImplementedError()
776
Guido van Rossum2bcae702013-11-13 15:50:08 -0800777 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800778 assert loop is None or isinstance(loop, events.AbstractEventLoop)
779
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400780 if self._loop is not None and loop is None and self._callbacks:
781 warnings.warn(
782 'A loop is being detached '
783 'from a child watcher with pending handlers',
784 RuntimeWarning)
785
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800786 if self._loop is not None:
787 self._loop.remove_signal_handler(signal.SIGCHLD)
788
789 self._loop = loop
790 if loop is not None:
791 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
792
793 # Prevent a race condition in case a child terminated
794 # during the switch.
795 self._do_waitpid_all()
796
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800797 def _sig_chld(self):
798 try:
799 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500800 except Exception as exc:
801 # self._loop should always be available here
802 # as '_sig_chld' is added as a signal handler
803 # in 'attach_loop'
804 self._loop.call_exception_handler({
805 'message': 'Unknown exception in SIGCHLD handler',
806 'exception': exc,
807 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800808
809 def _compute_returncode(self, status):
810 if os.WIFSIGNALED(status):
811 # The child process died because of a signal.
812 return -os.WTERMSIG(status)
813 elif os.WIFEXITED(status):
814 # The child process exited (e.g sys.exit()).
815 return os.WEXITSTATUS(status)
816 else:
817 # The child exited, but we don't understand its status.
818 # This shouldn't happen, but if it does, let's just
819 # return that status; perhaps that helps debug it.
820 return status
821
822
823class SafeChildWatcher(BaseChildWatcher):
824 """'Safe' child watcher implementation.
825
826 This implementation avoids disrupting other code spawning processes by
827 polling explicitly each process in the SIGCHLD handler instead of calling
828 os.waitpid(-1).
829
830 This is a safe solution but it has a significant overhead when handling a
831 big number of children (O(n) each time SIGCHLD is raised)
832 """
833
Guido van Rossum2bcae702013-11-13 15:50:08 -0800834 def close(self):
835 self._callbacks.clear()
836 super().close()
837
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800838 def __enter__(self):
839 return self
840
841 def __exit__(self, a, b, c):
842 pass
843
844 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400845 if self._loop is None:
846 raise RuntimeError(
847 "Cannot add child handler, "
848 "the child watcher does not have a loop attached")
849
Victor Stinner47cd10d2015-01-30 00:05:19 +0100850 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800851
852 # Prevent a race condition in case the child is already terminated.
853 self._do_waitpid(pid)
854
Guido van Rossum2bcae702013-11-13 15:50:08 -0800855 def remove_child_handler(self, pid):
856 try:
857 del self._callbacks[pid]
858 return True
859 except KeyError:
860 return False
861
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800862 def _do_waitpid_all(self):
863
864 for pid in list(self._callbacks):
865 self._do_waitpid(pid)
866
867 def _do_waitpid(self, expected_pid):
868 assert expected_pid > 0
869
870 try:
871 pid, status = os.waitpid(expected_pid, os.WNOHANG)
872 except ChildProcessError:
873 # The child process is already reaped
874 # (may happen if waitpid() is called elsewhere).
875 pid = expected_pid
876 returncode = 255
877 logger.warning(
878 "Unknown child process pid %d, will report returncode 255",
879 pid)
880 else:
881 if pid == 0:
882 # The child process is still alive.
883 return
884
885 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200886 if self._loop.get_debug():
887 logger.debug('process %s exited with returncode %s',
888 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800889
890 try:
891 callback, args = self._callbacks.pop(pid)
892 except KeyError: # pragma: no cover
893 # May happen if .remove_child_handler() is called
894 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200895 if self._loop.get_debug():
896 logger.warning("Child watcher got an unexpected pid: %r",
897 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800898 else:
899 callback(pid, returncode, *args)
900
901
902class FastChildWatcher(BaseChildWatcher):
903 """'Fast' child watcher implementation.
904
905 This implementation reaps every terminated processes by calling
906 os.waitpid(-1) directly, possibly breaking other code spawning processes
907 and waiting for their termination.
908
909 There is no noticeable overhead when handling a big number of children
910 (O(1) each time a child terminates).
911 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800912 def __init__(self):
913 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800914 self._lock = threading.Lock()
915 self._zombies = {}
916 self._forks = 0
917
918 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800919 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800920 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800921 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800922
923 def __enter__(self):
924 with self._lock:
925 self._forks += 1
926
927 return self
928
929 def __exit__(self, a, b, c):
930 with self._lock:
931 self._forks -= 1
932
933 if self._forks or not self._zombies:
934 return
935
936 collateral_victims = str(self._zombies)
937 self._zombies.clear()
938
939 logger.warning(
940 "Caught subprocesses termination from unknown pids: %s",
941 collateral_victims)
942
943 def add_child_handler(self, pid, callback, *args):
944 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400945
946 if self._loop is None:
947 raise RuntimeError(
948 "Cannot add child handler, "
949 "the child watcher does not have a loop attached")
950
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800951 with self._lock:
952 try:
953 returncode = self._zombies.pop(pid)
954 except KeyError:
955 # The child is running.
956 self._callbacks[pid] = callback, args
957 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800958
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800959 # The child is dead already. We can fire the callback.
960 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800961
Guido van Rossum2bcae702013-11-13 15:50:08 -0800962 def remove_child_handler(self, pid):
963 try:
964 del self._callbacks[pid]
965 return True
966 except KeyError:
967 return False
968
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800969 def _do_waitpid_all(self):
970 # Because of signal coalescing, we must keep calling waitpid() as
971 # long as we're able to reap a child.
972 while True:
973 try:
974 pid, status = os.waitpid(-1, os.WNOHANG)
975 except ChildProcessError:
976 # No more child processes exist.
977 return
978 else:
979 if pid == 0:
980 # A child process is still alive.
981 return
982
983 returncode = self._compute_returncode(status)
984
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800985 with self._lock:
986 try:
987 callback, args = self._callbacks.pop(pid)
988 except KeyError:
989 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800990 if self._forks:
991 # It may not be registered yet.
992 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200993 if self._loop.get_debug():
994 logger.debug('unknown process %s exited '
995 'with returncode %s',
996 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800997 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800998 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200999 else:
1000 if self._loop.get_debug():
1001 logger.debug('process %s exited with returncode %s',
1002 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001003
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001004 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001005 logger.warning(
1006 "Caught subprocess termination from unknown pid: "
1007 "%d -> %d", pid, returncode)
1008 else:
1009 callback(pid, returncode, *args)
1010
1011
1012class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +01001013 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001014 _loop_factory = _UnixSelectorEventLoop
1015
1016 def __init__(self):
1017 super().__init__()
1018 self._watcher = None
1019
1020 def _init_watcher(self):
1021 with events._lock:
1022 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -08001023 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001024 if isinstance(threading.current_thread(),
1025 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001026 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001027
1028 def set_event_loop(self, loop):
1029 """Set the event loop.
1030
1031 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -08001032 .set_event_loop() from the main thread will call .attach_loop(loop) on
1033 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001034 """
1035
1036 super().set_event_loop(loop)
1037
1038 if self._watcher is not None and \
1039 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001040 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001041
1042 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001043 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001044
1045 If not yet set, a SafeChildWatcher object is automatically created.
1046 """
1047 if self._watcher is None:
1048 self._init_watcher()
1049
1050 return self._watcher
1051
1052 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001053 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001054
1055 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1056
1057 if self._watcher is not None:
1058 self._watcher.close()
1059
1060 self._watcher = watcher
1061
1062SelectorEventLoop = _UnixSelectorEventLoop
1063DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy