blob: ab818da1dfabfa50fb85e3732310deb462c77151 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01005import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
Yury Selivanovb057c522014-02-18 12:15:06 -050015from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070016from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Victor Stinner915bcb02014-02-01 22:49:59 +010027__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080028 'AbstractChildWatcher', 'SafeChildWatcher',
29 'FastChildWatcher', 'DefaultEventLoopPolicy',
30 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032if sys.platform == 'win32': # pragma: no cover
33 raise ImportError('Signals are not really supported on Windows')
34
35
Victor Stinnerfe5649c2014-07-17 22:43:40 +020036def _sighandler_noop(signum, frame):
37 """Dummy signal handler."""
38 pass
39
40
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080041class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050042 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043
Yury Selivanovb057c522014-02-18 12:15:06 -050044 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045 """
46
47 def __init__(self, selector=None):
48 super().__init__(selector)
49 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020052 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080053 for sig in list(self._signal_handlers):
54 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055
Victor Stinnerfe5649c2014-07-17 22:43:40 +020056 def _process_self_data(self, data):
57 for signum in data:
58 if not signum:
59 # ignore null bytes written by _write_to_self()
60 continue
61 self._handle_signal(signum)
62
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070063 def add_signal_handler(self, sig, callback, *args):
64 """Add a handler for a signal. UNIX only.
65
66 Raise ValueError if the signal number is invalid or uncatchable.
67 Raise RuntimeError if there is a problem setting up the handler.
68 """
Victor Stinner2d99d932014-11-20 15:03:52 +010069 if (coroutines.iscoroutine(callback)
Andrew Svetlovcc839202017-11-29 18:23:43 +020070 or coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010071 raise TypeError("coroutines cannot be used "
72 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010074 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 try:
76 # set_wakeup_fd() raises ValueError if this is not the
77 # main thread. By calling it early we ensure that an
78 # event loop running in another thread cannot add a signal
79 # handler.
80 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020081 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082 raise RuntimeError(str(exc))
83
Yury Selivanov569efa22014-02-18 18:02:19 -050084 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 self._signal_handlers[sig] = handle
86
87 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020088 # Register a dummy signal handler to ask Python to write the signal
89 # number in the wakup file descriptor. _process_self_data() will
90 # read signal numbers from this file descriptor to handle signals.
91 signal.signal(sig, _sighandler_noop)
92
Charles-François Natali74e7cf32013-12-05 22:47:19 +010093 # Set SA_RESTART to limit EINTR occurrences.
94 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 except OSError as exc:
96 del self._signal_handlers[sig]
97 if not self._signal_handlers:
98 try:
99 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200100 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700101 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102
103 if exc.errno == errno.EINVAL:
104 raise RuntimeError('sig {} cannot be caught'.format(sig))
105 else:
106 raise
107
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200108 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109 """Internal helper that is the actual signal handler."""
110 handle = self._signal_handlers.get(sig)
111 if handle is None:
112 return # Assume it's some race condition.
113 if handle._cancelled:
114 self.remove_signal_handler(sig) # Remove it properly.
115 else:
116 self._add_callback_signalsafe(handle)
117
118 def remove_signal_handler(self, sig):
119 """Remove a handler for a signal. UNIX only.
120
121 Return True if a signal handler was removed, False if not.
122 """
123 self._check_signal(sig)
124 try:
125 del self._signal_handlers[sig]
126 except KeyError:
127 return False
128
129 if sig == signal.SIGINT:
130 handler = signal.default_int_handler
131 else:
132 handler = signal.SIG_DFL
133
134 try:
135 signal.signal(sig, handler)
136 except OSError as exc:
137 if exc.errno == errno.EINVAL:
138 raise RuntimeError('sig {} cannot be caught'.format(sig))
139 else:
140 raise
141
142 if not self._signal_handlers:
143 try:
144 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200145 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700146 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147
148 return True
149
150 def _check_signal(self, sig):
151 """Internal helper to validate a signal.
152
153 Raise ValueError if the signal number is invalid or uncatchable.
154 Raise RuntimeError if there is a problem setting up the handler.
155 """
156 if not isinstance(sig, int):
157 raise TypeError('sig must be an int, not {!r}'.format(sig))
158
159 if not (1 <= sig < signal.NSIG):
160 raise ValueError(
161 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
162
163 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
164 extra=None):
165 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
166
167 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
170
Victor Stinnerf951d282014-06-29 00:46:45 +0200171 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700172 def _make_subprocess_transport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
174 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800175 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400176 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800177 transp = _UnixSubprocessTransport(self, protocol, args, shell,
178 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100179 waiter=waiter, extra=extra,
180 **kwargs)
181
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800182 watcher.add_child_handler(transp.get_pid(),
183 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100184 try:
185 yield from waiter
Victor Stinner5d44c082015-02-02 18:36:31 +0100186 except Exception as exc:
187 # Workaround CPython bug #23353: using yield/yield-from in an
188 # except block of a generator doesn't clear properly
189 # sys.exc_info()
190 err = exc
191 else:
192 err = None
193
194 if err is not None:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100195 transp.close()
Victor Stinner1241ecc2015-01-30 00:16:14 +0100196 yield from transp._wait()
Victor Stinner5d44c082015-02-02 18:36:31 +0100197 raise err
Guido van Rossum4835f172014-01-10 13:28:59 -0800198
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199 return transp
200
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800201 def _child_watcher_callback(self, pid, returncode, transp):
202 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203
Victor Stinnerf951d282014-06-29 00:46:45 +0200204 @coroutine
Yury Selivanov423fd362017-11-20 17:26:28 -0500205 def create_unix_connection(self, protocol_factory, path=None, *,
Yury Selivanovb057c522014-02-18 12:15:06 -0500206 ssl=None, sock=None,
207 server_hostname=None):
208 assert server_hostname is None or isinstance(server_hostname, str)
209 if ssl:
210 if server_hostname is None:
211 raise ValueError(
212 'you have to pass server_hostname when using ssl')
213 else:
214 if server_hostname is not None:
215 raise ValueError('server_hostname is only meaningful with ssl')
216
217 if path is not None:
218 if sock is not None:
219 raise ValueError(
220 'path and sock can not be specified at the same time')
221
Andrew Svetlovcc839202017-11-29 18:23:43 +0200222 path = os.fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100223 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500224 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500225 sock.setblocking(False)
226 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100227 except:
228 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 raise
230
231 else:
232 if sock is None:
233 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400234 if (sock.family != socket.AF_UNIX or
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500235 not base_events._is_stream_socket(sock)):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400236 raise ValueError(
237 'A UNIX Domain Stream Socket was expected, got {!r}'
238 .format(sock))
Yury Selivanovb057c522014-02-18 12:15:06 -0500239 sock.setblocking(False)
240
241 transport, protocol = yield from self._create_connection_transport(
242 sock, protocol_factory, ssl, server_hostname)
243 return transport, protocol
244
Victor Stinnerf951d282014-06-29 00:46:45 +0200245 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500246 def create_unix_server(self, protocol_factory, path=None, *,
247 sock=None, backlog=100, ssl=None):
248 if isinstance(ssl, bool):
249 raise TypeError('ssl argument must be an SSLContext or None')
250
251 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200252 if sock is not None:
253 raise ValueError(
254 'path and sock can not be specified at the same time')
255
Andrew Svetlovcc839202017-11-29 18:23:43 +0200256 path = os.fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500257 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
258
Yury Selivanov908d55d2016-10-09 12:15:08 -0400259 # Check for abstract socket. `str` and `bytes` paths are supported.
260 if path[0] not in (0, '\x00'):
261 try:
262 if stat.S_ISSOCK(os.stat(path).st_mode):
263 os.remove(path)
264 except FileNotFoundError:
265 pass
266 except OSError as err:
267 # Directory may have permissions only to create socket.
Andrew Svetlovcc839202017-11-29 18:23:43 +0200268 logger.error('Unable to check or remove stale UNIX socket '
269 '%r: %r', path, err)
Yury Selivanov908d55d2016-10-09 12:15:08 -0400270
Yury Selivanovb057c522014-02-18 12:15:06 -0500271 try:
272 sock.bind(path)
273 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100274 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500275 if exc.errno == errno.EADDRINUSE:
276 # Let's improve the error message by adding
277 # with what exact address it occurs.
278 msg = 'Address {!r} is already in use'.format(path)
279 raise OSError(errno.EADDRINUSE, msg) from None
280 else:
281 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200282 except:
283 sock.close()
284 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500285 else:
286 if sock is None:
287 raise ValueError(
288 'path was not specified, and no sock specified')
289
Yury Selivanov36e7e972016-10-07 12:39:57 -0400290 if (sock.family != socket.AF_UNIX or
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500291 not base_events._is_stream_socket(sock)):
Yury Selivanovb057c522014-02-18 12:15:06 -0500292 raise ValueError(
Yury Selivanov36e7e972016-10-07 12:39:57 -0400293 'A UNIX Domain Stream Socket was expected, got {!r}'
294 .format(sock))
Yury Selivanovb057c522014-02-18 12:15:06 -0500295
296 server = base_events.Server(self, [sock])
297 sock.listen(backlog)
298 sock.setblocking(False)
299 self._start_serving(protocol_factory, sock, ssl, server)
300 return server
301
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303class _UnixReadPipeTransport(transports.ReadTransport):
304
Yury Selivanovdec1a452014-02-18 22:27:48 -0500305 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306
307 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
308 super().__init__(extra)
309 self._extra['pipe'] = pipe
310 self._loop = loop
311 self._pipe = pipe
312 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700313 self._protocol = protocol
314 self._closing = False
315
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700316 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800317 if not (stat.S_ISFIFO(mode) or
318 stat.S_ISSOCK(mode) or
319 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700320 self._pipe = None
321 self._fileno = None
322 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700323 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700324
Andrew Svetlovcc839202017-11-29 18:23:43 +0200325 os.set_blocking(self._fileno, False)
Guido van Rossum47867872016-08-31 09:42:38 -0700326
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100328 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400329 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100330 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100332 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500333 self._loop.call_soon(futures._set_result_unless_cancelled,
334 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335
Victor Stinnere912e652014-07-12 03:11:53 +0200336 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100337 info = [self.__class__.__name__]
338 if self._pipe is None:
339 info.append('closed')
340 elif self._closing:
341 info.append('closing')
342 info.append('fd=%s' % self._fileno)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400343 selector = getattr(self._loop, '_selector', None)
344 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200345 polling = selector_events._test_selector_event(
Yury Selivanov5dc09332016-05-13 16:04:43 -0400346 selector,
Victor Stinnere912e652014-07-12 03:11:53 +0200347 self._fileno, selectors.EVENT_READ)
348 if polling:
349 info.append('polling')
350 else:
351 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400352 elif self._pipe is not None:
353 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200354 else:
355 info.append('closed')
356 return '<%s>' % ' '.join(info)
357
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 def _read_ready(self):
359 try:
360 data = os.read(self._fileno, self.max_size)
361 except (BlockingIOError, InterruptedError):
362 pass
363 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100364 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 else:
366 if data:
367 self._protocol.data_received(data)
368 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200369 if self._loop.get_debug():
370 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400372 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 self._loop.call_soon(self._protocol.eof_received)
374 self._loop.call_soon(self._call_connection_lost, None)
375
Guido van Rossum57497ad2013-10-18 07:58:20 -0700376 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400377 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378
Guido van Rossum57497ad2013-10-18 07:58:20 -0700379 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400380 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400382 def set_protocol(self, protocol):
383 self._protocol = protocol
384
385 def get_protocol(self):
386 return self._protocol
387
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500388 def is_closing(self):
389 return self._closing
390
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 def close(self):
392 if not self._closing:
393 self._close(None)
394
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900395 def __del__(self):
396 if self._pipe is not None:
397 warnings.warn("unclosed transport %r" % self, ResourceWarning,
398 source=self)
399 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100400
Victor Stinner0ee29c22014-02-19 01:40:41 +0100401 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200403 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
404 if self._loop.get_debug():
405 logger.debug("%r: %s", self, message, exc_info=True)
406 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500407 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100408 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500409 'exception': exc,
410 'transport': self,
411 'protocol': self._protocol,
412 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 self._close(exc)
414
415 def _close(self, exc):
416 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400417 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 self._loop.call_soon(self._call_connection_lost, exc)
419
420 def _call_connection_lost(self, exc):
421 try:
422 self._protocol.connection_lost(exc)
423 finally:
424 self._pipe.close()
425 self._pipe = None
426 self._protocol = None
427 self._loop = None
428
429
Yury Selivanov3cb99142014-02-18 18:41:13 -0500430class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800431 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432
433 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100434 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 self._pipe = pipe
437 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400439 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440 self._conn_lost = 0
441 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700442
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700443 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700444 is_char = stat.S_ISCHR(mode)
445 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700446 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700447 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700448 self._pipe = None
449 self._fileno = None
450 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100451 raise ValueError("Pipe transport is only for "
452 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700453
Andrew Svetlovcc839202017-11-29 18:23:43 +0200454 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100456
457 # On AIX, the reader trick (to be notified when the read end of the
458 # socket is closed) only works for sockets. On other platforms it
459 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700460 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100461 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400462 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100463 self._fileno, self._read_ready)
464
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100466 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500467 self._loop.call_soon(futures._set_result_unless_cancelled,
468 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
Victor Stinnere912e652014-07-12 03:11:53 +0200470 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100471 info = [self.__class__.__name__]
472 if self._pipe is None:
473 info.append('closed')
474 elif self._closing:
475 info.append('closing')
476 info.append('fd=%s' % self._fileno)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400477 selector = getattr(self._loop, '_selector', None)
478 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200479 polling = selector_events._test_selector_event(
Yury Selivanov5dc09332016-05-13 16:04:43 -0400480 selector,
Victor Stinnere912e652014-07-12 03:11:53 +0200481 self._fileno, selectors.EVENT_WRITE)
482 if polling:
483 info.append('polling')
484 else:
485 info.append('idle')
486
487 bufsize = self.get_write_buffer_size()
488 info.append('bufsize=%s' % bufsize)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400489 elif self._pipe is not None:
490 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200491 else:
492 info.append('closed')
493 return '<%s>' % ' '.join(info)
494
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800495 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400496 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800497
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700499 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200500 if self._loop.get_debug():
501 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100502 if self._buffer:
503 self._close(BrokenPipeError())
504 else:
505 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506
507 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800508 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
509 if isinstance(data, bytearray):
510 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511 if not data:
512 return
513
514 if self._conn_lost or self._closing:
515 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700516 logger.warning('pipe closed by peer or '
517 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 self._conn_lost += 1
519 return
520
521 if not self._buffer:
522 # Attempt to send it right away first.
523 try:
524 n = os.write(self._fileno, data)
525 except (BlockingIOError, InterruptedError):
526 n = 0
527 except Exception as exc:
528 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100529 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 return
531 if n == len(data):
532 return
533 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400534 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400535 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400537 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800538 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539
540 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400541 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400544 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700545 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400546 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700547 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400548 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 self._conn_lost += 1
550 # Remove writer here, _fatal_error() doesn't it
551 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400552 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100553 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400555 if n == len(self._buffer):
556 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400557 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800558 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400559 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400560 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561 self._call_connection_lost(None)
562 return
563 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400564 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565
566 def can_write_eof(self):
567 return True
568
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 def write_eof(self):
570 if self._closing:
571 return
572 assert self._pipe
573 self._closing = True
574 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400575 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 self._loop.call_soon(self._call_connection_lost, None)
577
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400578 def set_protocol(self, protocol):
579 self._protocol = protocol
580
581 def get_protocol(self):
582 return self._protocol
583
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500584 def is_closing(self):
585 return self._closing
586
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100588 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700589 # write_eof is all what we needed to close the write pipe
590 self.write_eof()
591
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900592 def __del__(self):
593 if self._pipe is not None:
594 warnings.warn("unclosed transport %r" % self, ResourceWarning,
595 source=self)
596 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100597
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598 def abort(self):
599 self._close(None)
600
Victor Stinner0ee29c22014-02-19 01:40:41 +0100601 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200603 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200604 if self._loop.get_debug():
605 logger.debug("%r: %s", self, message, exc_info=True)
606 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500607 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100608 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500609 'exception': exc,
610 'transport': self,
611 'protocol': self._protocol,
612 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 self._close(exc)
614
615 def _close(self, exc=None):
616 self._closing = True
617 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400618 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400620 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621 self._loop.call_soon(self._call_connection_lost, exc)
622
623 def _call_connection_lost(self, exc):
624 try:
625 self._protocol.connection_lost(exc)
626 finally:
627 self._pipe.close()
628 self._pipe = None
629 self._protocol = None
630 self._loop = None
631
632
Guido van Rossum59691282013-10-30 14:52:03 -0700633class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700634
Guido van Rossum59691282013-10-30 14:52:03 -0700635 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700636 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700638 # Use a socket pair for stdin, since not all platforms
639 # support selecting read events on the write end of a
640 # socket (which we use in order to detect closing of the
641 # other end). Notably this is needed on AIX, and works
642 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100643 stdin, stdin_w = socket.socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700644 self._proc = subprocess.Popen(
645 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
646 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700647 if stdin_w is not None:
648 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200649 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800650
651
652class AbstractChildWatcher:
653 """Abstract base class for monitoring child processes.
654
655 Objects derived from this class monitor a collection of subprocesses and
656 report their termination or interruption by a signal.
657
658 New callbacks are registered with .add_child_handler(). Starting a new
659 process must be done within a 'with' block to allow the watcher to suspend
660 its activity until the new process if fully registered (this is needed to
661 prevent a race condition in some implementations).
662
663 Example:
664 with watcher:
665 proc = subprocess.Popen("sleep 1")
666 watcher.add_child_handler(proc.pid, callback)
667
668 Notes:
669 Implementations of this class must be thread-safe.
670
671 Since child watcher objects may catch the SIGCHLD signal and call
672 waitpid(-1), there should be only one active object per process.
673 """
674
675 def add_child_handler(self, pid, callback, *args):
676 """Register a new child handler.
677
678 Arrange for callback(pid, returncode, *args) to be called when
679 process 'pid' terminates. Specifying another callback for the same
680 process replaces the previous handler.
681
Victor Stinneracdb7822014-07-14 18:33:40 +0200682 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800683 """
684 raise NotImplementedError()
685
686 def remove_child_handler(self, pid):
687 """Removes the handler for process 'pid'.
688
689 The function returns True if the handler was successfully removed,
690 False if there was nothing to remove."""
691
692 raise NotImplementedError()
693
Guido van Rossum2bcae702013-11-13 15:50:08 -0800694 def attach_loop(self, loop):
695 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800696
Guido van Rossum2bcae702013-11-13 15:50:08 -0800697 If the watcher was previously attached to an event loop, then it is
698 first detached before attaching to the new loop.
699
700 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800701 """
702 raise NotImplementedError()
703
704 def close(self):
705 """Close the watcher.
706
707 This must be called to make sure that any underlying resource is freed.
708 """
709 raise NotImplementedError()
710
711 def __enter__(self):
712 """Enter the watcher's context and allow starting new processes
713
714 This function must return self"""
715 raise NotImplementedError()
716
717 def __exit__(self, a, b, c):
718 """Exit the watcher's context"""
719 raise NotImplementedError()
720
721
722class BaseChildWatcher(AbstractChildWatcher):
723
Guido van Rossum2bcae702013-11-13 15:50:08 -0800724 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800725 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400726 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800727
728 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800729 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800730
731 def _do_waitpid(self, expected_pid):
732 raise NotImplementedError()
733
734 def _do_waitpid_all(self):
735 raise NotImplementedError()
736
Guido van Rossum2bcae702013-11-13 15:50:08 -0800737 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800738 assert loop is None or isinstance(loop, events.AbstractEventLoop)
739
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400740 if self._loop is not None and loop is None and self._callbacks:
741 warnings.warn(
742 'A loop is being detached '
743 'from a child watcher with pending handlers',
744 RuntimeWarning)
745
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800746 if self._loop is not None:
747 self._loop.remove_signal_handler(signal.SIGCHLD)
748
749 self._loop = loop
750 if loop is not None:
751 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
752
753 # Prevent a race condition in case a child terminated
754 # during the switch.
755 self._do_waitpid_all()
756
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800757 def _sig_chld(self):
758 try:
759 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500760 except Exception as exc:
761 # self._loop should always be available here
762 # as '_sig_chld' is added as a signal handler
763 # in 'attach_loop'
764 self._loop.call_exception_handler({
765 'message': 'Unknown exception in SIGCHLD handler',
766 'exception': exc,
767 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800768
769 def _compute_returncode(self, status):
770 if os.WIFSIGNALED(status):
771 # The child process died because of a signal.
772 return -os.WTERMSIG(status)
773 elif os.WIFEXITED(status):
774 # The child process exited (e.g sys.exit()).
775 return os.WEXITSTATUS(status)
776 else:
777 # The child exited, but we don't understand its status.
778 # This shouldn't happen, but if it does, let's just
779 # return that status; perhaps that helps debug it.
780 return status
781
782
783class SafeChildWatcher(BaseChildWatcher):
784 """'Safe' child watcher implementation.
785
786 This implementation avoids disrupting other code spawning processes by
787 polling explicitly each process in the SIGCHLD handler instead of calling
788 os.waitpid(-1).
789
790 This is a safe solution but it has a significant overhead when handling a
791 big number of children (O(n) each time SIGCHLD is raised)
792 """
793
Guido van Rossum2bcae702013-11-13 15:50:08 -0800794 def close(self):
795 self._callbacks.clear()
796 super().close()
797
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800798 def __enter__(self):
799 return self
800
801 def __exit__(self, a, b, c):
802 pass
803
804 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400805 if self._loop is None:
806 raise RuntimeError(
807 "Cannot add child handler, "
808 "the child watcher does not have a loop attached")
809
Victor Stinner47cd10d2015-01-30 00:05:19 +0100810 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800811
812 # Prevent a race condition in case the child is already terminated.
813 self._do_waitpid(pid)
814
Guido van Rossum2bcae702013-11-13 15:50:08 -0800815 def remove_child_handler(self, pid):
816 try:
817 del self._callbacks[pid]
818 return True
819 except KeyError:
820 return False
821
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800822 def _do_waitpid_all(self):
823
824 for pid in list(self._callbacks):
825 self._do_waitpid(pid)
826
827 def _do_waitpid(self, expected_pid):
828 assert expected_pid > 0
829
830 try:
831 pid, status = os.waitpid(expected_pid, os.WNOHANG)
832 except ChildProcessError:
833 # The child process is already reaped
834 # (may happen if waitpid() is called elsewhere).
835 pid = expected_pid
836 returncode = 255
837 logger.warning(
838 "Unknown child process pid %d, will report returncode 255",
839 pid)
840 else:
841 if pid == 0:
842 # The child process is still alive.
843 return
844
845 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200846 if self._loop.get_debug():
847 logger.debug('process %s exited with returncode %s',
848 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800849
850 try:
851 callback, args = self._callbacks.pop(pid)
852 except KeyError: # pragma: no cover
853 # May happen if .remove_child_handler() is called
854 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200855 if self._loop.get_debug():
856 logger.warning("Child watcher got an unexpected pid: %r",
857 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800858 else:
859 callback(pid, returncode, *args)
860
861
862class FastChildWatcher(BaseChildWatcher):
863 """'Fast' child watcher implementation.
864
865 This implementation reaps every terminated processes by calling
866 os.waitpid(-1) directly, possibly breaking other code spawning processes
867 and waiting for their termination.
868
869 There is no noticeable overhead when handling a big number of children
870 (O(1) each time a child terminates).
871 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800872 def __init__(self):
873 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800874 self._lock = threading.Lock()
875 self._zombies = {}
876 self._forks = 0
877
878 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800879 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800880 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800881 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800882
883 def __enter__(self):
884 with self._lock:
885 self._forks += 1
886
887 return self
888
889 def __exit__(self, a, b, c):
890 with self._lock:
891 self._forks -= 1
892
893 if self._forks or not self._zombies:
894 return
895
896 collateral_victims = str(self._zombies)
897 self._zombies.clear()
898
899 logger.warning(
900 "Caught subprocesses termination from unknown pids: %s",
901 collateral_victims)
902
903 def add_child_handler(self, pid, callback, *args):
904 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400905
906 if self._loop is None:
907 raise RuntimeError(
908 "Cannot add child handler, "
909 "the child watcher does not have a loop attached")
910
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800911 with self._lock:
912 try:
913 returncode = self._zombies.pop(pid)
914 except KeyError:
915 # The child is running.
916 self._callbacks[pid] = callback, args
917 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800918
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800919 # The child is dead already. We can fire the callback.
920 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800921
Guido van Rossum2bcae702013-11-13 15:50:08 -0800922 def remove_child_handler(self, pid):
923 try:
924 del self._callbacks[pid]
925 return True
926 except KeyError:
927 return False
928
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800929 def _do_waitpid_all(self):
930 # Because of signal coalescing, we must keep calling waitpid() as
931 # long as we're able to reap a child.
932 while True:
933 try:
934 pid, status = os.waitpid(-1, os.WNOHANG)
935 except ChildProcessError:
936 # No more child processes exist.
937 return
938 else:
939 if pid == 0:
940 # A child process is still alive.
941 return
942
943 returncode = self._compute_returncode(status)
944
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800945 with self._lock:
946 try:
947 callback, args = self._callbacks.pop(pid)
948 except KeyError:
949 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800950 if self._forks:
951 # It may not be registered yet.
952 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200953 if self._loop.get_debug():
954 logger.debug('unknown process %s exited '
955 'with returncode %s',
956 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800957 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800958 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200959 else:
960 if self._loop.get_debug():
961 logger.debug('process %s exited with returncode %s',
962 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800963
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800964 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800965 logger.warning(
966 "Caught subprocess termination from unknown pid: "
967 "%d -> %d", pid, returncode)
968 else:
969 callback(pid, returncode, *args)
970
971
972class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +0100973 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800974 _loop_factory = _UnixSelectorEventLoop
975
976 def __init__(self):
977 super().__init__()
978 self._watcher = None
979
980 def _init_watcher(self):
981 with events._lock:
982 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800983 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800984 if isinstance(threading.current_thread(),
985 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800986 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800987
988 def set_event_loop(self, loop):
989 """Set the event loop.
990
991 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800992 .set_event_loop() from the main thread will call .attach_loop(loop) on
993 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800994 """
995
996 super().set_event_loop(loop)
997
Andrew Svetlovcc839202017-11-29 18:23:43 +0200998 if (self._watcher is not None and
999 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001000 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001001
1002 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001003 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001004
1005 If not yet set, a SafeChildWatcher object is automatically created.
1006 """
1007 if self._watcher is None:
1008 self._init_watcher()
1009
1010 return self._watcher
1011
1012 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001013 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001014
1015 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1016
1017 if self._watcher is not None:
1018 self._watcher.close()
1019
1020 self._watcher = watcher
1021
1022SelectorEventLoop = _UnixSelectorEventLoop
1023DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy