blob: bf682a1a98a39f56be38a8e3e1ab4022fa06a132 [file] [log] [blame]
"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
import errno
import os
import signal
import socket
import stat
import subprocess
import sys
import threading
import warnings


from . import base_events
from . import base_subprocess
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Public names exported by this module.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'DefaultEventLoopPolicy',
]

# This module relies on Unix signal semantics; refuse to import on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
34
35
def _sighandler_noop(signum, frame):
    """Dummy signal handler that intentionally does nothing.

    It takes the standard ``(signum, frame)`` handler arguments and
    ignores both.
    """
    pass
39
40
# Resolve path-like objects via os.fspath() when the running Python has it;
# otherwise fall back to returning the path unchanged.
try:
    _fspath = os.fspath
except AttributeError:
    # Python 3.5 or earlier
    def _fspath(path):
        """Return *path* unchanged (os.fspath() is unavailable)."""
        return path
46
47
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a pair of connected sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets signal.default_int_handler back; everything else
        # is reset to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: detach the wakeup file descriptor.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is ready.

        The transport is created and registered with the child watcher
        inside the watcher's 'with' block.  If waiting for the transport
        fails, the transport is closed and waited on before the error is
        re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of path and sock must be given.  Return a
        (transport, protocol) pair.

        Raise ValueError if the ssl/server_hostname or path/sock
        combination is invalid, or if sock is not an AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            # Normalise the path the same way create_unix_server() does.
            path = _fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Close the socket we created on any failure (including
                # cancellation), then propagate.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of path and sock must be given.  Return the
        base_events.Server object.

        Raise TypeError if ssl is a bool.
        Raise ValueError if the path/sock combination is invalid, or if
        sock is not an AF_UNIX stream socket.
        Raise OSError if binding to path fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = _fspath(path)

            # Do the stale-socket check before creating the new socket so
            # that an exception raised here cannot leak it.
            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r', path, err)

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
309
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700310
# Pick the implementation once at import time: os.set_blocking() when the
# running Python provides it, otherwise an fcntl()-based equivalent.
if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        # Read the current status flags and add O_NONBLOCK to them.
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321
322
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for reading on *loop* and notify *protocol*.

        Raise ValueError if pipe is not a FIFO, socket or character
        device.  If waiter is given, its result is set (unless cancelled)
        after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset state so that __del__ does not treat the rejected
            # pipe as an unclosed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # Report the polling state only while the pipe is still open; a
        # closed transport has already been labelled 'closed' above.
        if self._pipe is not None:
            selector = getattr(self._loop, '_selector', None)
            if selector is not None:
                polling = selector_events._test_selector_event(
                    selector,
                    self._fileno, selectors.EVENT_READ)
                if polling:
                    info.append('polling')
                else:
                    info.append('idle')
            else:
                info.append('open')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read available data and hand it to the protocol.

        An empty read means the peer closed the pipe: reading stops and
        eof_received() followed by connection_lost(None) are scheduled.
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability.

        Does nothing once the transport is closing.
        """
        if self._closing:
            # The reader is already removed and _loop may be None.
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for readability again.

        Does nothing once the transport is closing, so that a closing
        file descriptor is never re-registered with the loop.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn("unclosed transport %r" % self, ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); any
        other exception goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
448
449
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for writing on *loop* and notify *protocol*.

        Raise ValueError if pipe is not a FIFO, socket or character
        device.  If waiter is given, its result is set (unless cancelled)
        after connection_made() has been scheduled.
        """
        # NOTE(review): self._loop is used below but never assigned here;
        # presumably the base-class __init__ stores *loop* -- confirm.
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset state so that __del__ does not treat the rejected
            # pipe as an unclosed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # Report the polling state only while the pipe is still open; a
        # closed transport has already been labelled 'closed' above.
        if self._pipe is not None:
            selector = getattr(self._loop, '_selector', None)
            if selector is not None:
                polling = selector_events._test_selector_event(
                    selector,
                    self._fileno, selectors.EVENT_WRITE)
                if polling:
                    info.append('polling')
                else:
                    info.append('idle')

                bufsize = self.get_write_buffer_size()
                info.append('bufsize=%s' % bufsize)
            else:
                info.append('open')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle the read-ready event used to detect a closed peer."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Unsent data is lost: report it as a broken pipe.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot be sent now.

        Empty data is ignored.  Once the transport is closing or the
        connection is lost, data is dropped and only counted.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the unsent tail, without copying.
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the pipe accepts right now."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove writer here, _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Mark the transport as closing.

        If nothing is buffered the connection is torn down right away;
        otherwise _write_ready() finishes the job once the buffer drains.
        """
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport, letting buffered data be written first."""
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    def __del__(self):
        if self._pipe is not None:
            warnings.warn("unclosed transport %r" % self, ResourceWarning,
                          source=self)
            self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        Exceptions listed in base_events._FATAL_ERROR_IGNORE are only
        logged (in debug mode); any other exception goes to the loop's
        exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule teardown."""
        self._closing = True
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
651
652
# Pick the implementation once at import time: os.set_inheritable() when
# available, otherwise an fcntl()-based equivalent.
if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of file descriptor *fd*.

        A non-inheritable descriptor has the flag set; an inheritable one
        has it cleared.
        """
        # Fall back to 1 when the fcntl module lacks FD_CLOEXEC.
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        old = fcntl.fcntl(fd, fcntl.F_GETFD)
        if not inheritable:
            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
        else:
            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
667
668
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair is used in place of
        a pipe: the child gets one end and the other end becomes
        self._proc.stdin.  Both ends are closed if Popen raises.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()

            # Mark the write end of the stdin pipe as non-inheritable,
            # needed by close_fds=False on Python 3.3 and older
            # (Python 3.4 implements the PEP 446, socketpair returns
            # non-inheritable sockets)
            _set_inheritable(stdin_w.fileno(), False)
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800692
693
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
762
763
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach from the current event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        issued when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a returncode."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)

        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)

        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
823
824
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget all registered handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raises RuntimeError when no event loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child without blocking and fire its callback if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback, args = handler
        callback(pid, returncode, *args)
902
903
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of currently active 'with watcher:' blocks.
        self._forks = 0

    def close(self):
        """Forget handlers and recorded zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        collateral_victims = None
        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                collateral_victims = str(self._zombies)
                self._zombies.clear()

        # The warning is logged after the lock has been released.
        if collateral_victims is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  Raises
        RuntimeError when no event loop is attached.  If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            returncode = self._zombies.pop(pid, None)
            if returncode is None:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                callback = None
                handler = self._callbacks.pop(pid, None)
                if handler is not None:
                    callback, args = handler
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1012
1013
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher under events._lock."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        A previously set watcher is closed before being replaced.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1063
# Public names (see __all__) bound to the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy