blob: 2ab6b154b15517af4e7e78af3ba2dddc1f355f89 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01005import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
Yury Selivanovb057c522014-02-18 12:15:06 -050015from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070016from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070023from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
25
# Names re-exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: this module is built around Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
35
36
Victor Stinnerfe5649c2014-07-17 22:43:40 +020037def _sighandler_noop(signum, frame):
38 """Dummy signal handler."""
39 pass
40
41
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080042class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050043 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
Yury Selivanovb057c522014-02-18 12:15:06 -050045 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 """
47
48 def __init__(self, selector=None):
49 super().__init__(selector)
50 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
Yury Selivanov6370f342017-12-10 18:36:12 -050070 if (coroutines.iscoroutine(callback) or
71 coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010072 raise TypeError("coroutines cannot be used "
73 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010075 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 try:
77 # set_wakeup_fd() raises ValueError if this is not the
78 # main thread. By calling it early we ensure that an
79 # event loop running in another thread cannot add a signal
80 # handler.
81 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020082 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083 raise RuntimeError(str(exc))
84
Yury Selivanov569efa22014-02-18 18:02:19 -050085 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 self._signal_handlers[sig] = handle
87
88 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020089 # Register a dummy signal handler to ask Python to write the signal
90 # number in the wakup file descriptor. _process_self_data() will
91 # read signal numbers from this file descriptor to handle signals.
92 signal.signal(sig, _sighandler_noop)
93
Charles-François Natali74e7cf32013-12-05 22:47:19 +010094 # Set SA_RESTART to limit EINTR occurrences.
95 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 except OSError as exc:
97 del self._signal_handlers[sig]
98 if not self._signal_handlers:
99 try:
100 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200101 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700102 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103
104 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500105 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106 else:
107 raise
108
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200109 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110 """Internal helper that is the actual signal handler."""
111 handle = self._signal_handlers.get(sig)
112 if handle is None:
113 return # Assume it's some race condition.
114 if handle._cancelled:
115 self.remove_signal_handler(sig) # Remove it properly.
116 else:
117 self._add_callback_signalsafe(handle)
118
119 def remove_signal_handler(self, sig):
120 """Remove a handler for a signal. UNIX only.
121
122 Return True if a signal handler was removed, False if not.
123 """
124 self._check_signal(sig)
125 try:
126 del self._signal_handlers[sig]
127 except KeyError:
128 return False
129
130 if sig == signal.SIGINT:
131 handler = signal.default_int_handler
132 else:
133 handler = signal.SIG_DFL
134
135 try:
136 signal.signal(sig, handler)
137 except OSError as exc:
138 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500139 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700140 else:
141 raise
142
143 if not self._signal_handlers:
144 try:
145 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200146 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700147 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
149 return True
150
151 def _check_signal(self, sig):
152 """Internal helper to validate a signal.
153
154 Raise ValueError if the signal number is invalid or uncatchable.
155 Raise RuntimeError if there is a problem setting up the handler.
156 """
157 if not isinstance(sig, int):
Yury Selivanov6370f342017-12-10 18:36:12 -0500158 raise TypeError(f'sig must be an int, not {sig!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700159
160 if not (1 <= sig < signal.NSIG):
Yury Selivanov6370f342017-12-10 18:36:12 -0500161 raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162
163 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
164 extra=None):
165 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
166
167 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
170
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200171 async def _make_subprocess_transport(self, protocol, args, shell,
172 stdin, stdout, stderr, bufsize,
173 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800174 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400175 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800176 transp = _UnixSubprocessTransport(self, protocol, args, shell,
177 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100178 waiter=waiter, extra=extra,
179 **kwargs)
180
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800181 watcher.add_child_handler(transp.get_pid(),
182 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100183 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200184 await waiter
185 except Exception:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100186 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200187 await transp._wait()
188 raise
Guido van Rossum4835f172014-01-10 13:28:59 -0800189
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700190 return transp
191
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800192 def _child_watcher_callback(self, pid, returncode, transp):
193 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700194
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200195 async def create_unix_connection(self, protocol_factory, path=None, *,
196 ssl=None, sock=None,
197 server_hostname=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500198 assert server_hostname is None or isinstance(server_hostname, str)
199 if ssl:
200 if server_hostname is None:
201 raise ValueError(
202 'you have to pass server_hostname when using ssl')
203 else:
204 if server_hostname is not None:
205 raise ValueError('server_hostname is only meaningful with ssl')
206
207 if path is not None:
208 if sock is not None:
209 raise ValueError(
210 'path and sock can not be specified at the same time')
211
Andrew Svetlovcc839202017-11-29 18:23:43 +0200212 path = os.fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100213 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500214 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500215 sock.setblocking(False)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200216 await self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100217 except:
218 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500219 raise
220
221 else:
222 if sock is None:
223 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400224 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500225 sock.type != socket.SOCK_STREAM):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400226 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500227 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500228 sock.setblocking(False)
229
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200230 transport, protocol = await self._create_connection_transport(
Yury Selivanovb057c522014-02-18 12:15:06 -0500231 sock, protocol_factory, ssl, server_hostname)
232 return transport, protocol
233
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200234 async def create_unix_server(self, protocol_factory, path=None, *,
235 sock=None, backlog=100, ssl=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500236 if isinstance(ssl, bool):
237 raise TypeError('ssl argument must be an SSLContext or None')
238
239 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200240 if sock is not None:
241 raise ValueError(
242 'path and sock can not be specified at the same time')
243
Andrew Svetlovcc839202017-11-29 18:23:43 +0200244 path = os.fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500245 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
246
Yury Selivanov908d55d2016-10-09 12:15:08 -0400247 # Check for abstract socket. `str` and `bytes` paths are supported.
248 if path[0] not in (0, '\x00'):
249 try:
250 if stat.S_ISSOCK(os.stat(path).st_mode):
251 os.remove(path)
252 except FileNotFoundError:
253 pass
254 except OSError as err:
255 # Directory may have permissions only to create socket.
Andrew Svetlovcc839202017-11-29 18:23:43 +0200256 logger.error('Unable to check or remove stale UNIX socket '
257 '%r: %r', path, err)
Yury Selivanov908d55d2016-10-09 12:15:08 -0400258
Yury Selivanovb057c522014-02-18 12:15:06 -0500259 try:
260 sock.bind(path)
261 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100262 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500263 if exc.errno == errno.EADDRINUSE:
264 # Let's improve the error message by adding
265 # with what exact address it occurs.
Yury Selivanov6370f342017-12-10 18:36:12 -0500266 msg = f'Address {path!r} is already in use'
Yury Selivanovb057c522014-02-18 12:15:06 -0500267 raise OSError(errno.EADDRINUSE, msg) from None
268 else:
269 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200270 except:
271 sock.close()
272 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500273 else:
274 if sock is None:
275 raise ValueError(
276 'path was not specified, and no sock specified')
277
Yury Selivanov36e7e972016-10-07 12:39:57 -0400278 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500279 sock.type != socket.SOCK_STREAM):
Yury Selivanovb057c522014-02-18 12:15:06 -0500280 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500281 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500282
283 server = base_events.Server(self, [sock])
284 sock.listen(backlog)
285 sock.setblocking(False)
286 self._start_serving(protocol_factory, sock, ssl, server)
287 return server
288
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700289
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290class _UnixReadPipeTransport(transports.ReadTransport):
291
Yury Selivanovdec1a452014-02-18 22:27:48 -0500292 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700293
294 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
295 super().__init__(extra)
296 self._extra['pipe'] = pipe
297 self._loop = loop
298 self._pipe = pipe
299 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700300 self._protocol = protocol
301 self._closing = False
302
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700303 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800304 if not (stat.S_ISFIFO(mode) or
305 stat.S_ISSOCK(mode) or
306 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700307 self._pipe = None
308 self._fileno = None
309 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700310 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700311
Andrew Svetlovcc839202017-11-29 18:23:43 +0200312 os.set_blocking(self._fileno, False)
Guido van Rossum47867872016-08-31 09:42:38 -0700313
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100315 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400316 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100317 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100319 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500320 self._loop.call_soon(futures._set_result_unless_cancelled,
321 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322
Victor Stinnere912e652014-07-12 03:11:53 +0200323 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100324 info = [self.__class__.__name__]
325 if self._pipe is None:
326 info.append('closed')
327 elif self._closing:
328 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500329 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400330 selector = getattr(self._loop, '_selector', None)
331 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200332 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500333 selector, self._fileno, selectors.EVENT_READ)
Victor Stinnere912e652014-07-12 03:11:53 +0200334 if polling:
335 info.append('polling')
336 else:
337 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400338 elif self._pipe is not None:
339 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200340 else:
341 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500342 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200343
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 def _read_ready(self):
345 try:
346 data = os.read(self._fileno, self.max_size)
347 except (BlockingIOError, InterruptedError):
348 pass
349 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100350 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 else:
352 if data:
353 self._protocol.data_received(data)
354 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200355 if self._loop.get_debug():
356 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400358 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700359 self._loop.call_soon(self._protocol.eof_received)
360 self._loop.call_soon(self._call_connection_lost, None)
361
Guido van Rossum57497ad2013-10-18 07:58:20 -0700362 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400363 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364
Guido van Rossum57497ad2013-10-18 07:58:20 -0700365 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400366 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400368 def set_protocol(self, protocol):
369 self._protocol = protocol
370
371 def get_protocol(self):
372 return self._protocol
373
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500374 def is_closing(self):
375 return self._closing
376
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 def close(self):
378 if not self._closing:
379 self._close(None)
380
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900381 def __del__(self):
382 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500383 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900384 source=self)
385 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100386
Victor Stinner0ee29c22014-02-19 01:40:41 +0100387 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200389 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
390 if self._loop.get_debug():
391 logger.debug("%r: %s", self, message, exc_info=True)
392 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500393 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100394 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500395 'exception': exc,
396 'transport': self,
397 'protocol': self._protocol,
398 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 self._close(exc)
400
401 def _close(self, exc):
402 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400403 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 self._loop.call_soon(self._call_connection_lost, exc)
405
406 def _call_connection_lost(self, exc):
407 try:
408 self._protocol.connection_lost(exc)
409 finally:
410 self._pipe.close()
411 self._pipe = None
412 self._protocol = None
413 self._loop = None
414
415
Yury Selivanov3cb99142014-02-18 18:41:13 -0500416class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800417 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418
419 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100420 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422 self._pipe = pipe
423 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400425 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 self._conn_lost = 0
427 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700428
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700429 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700430 is_char = stat.S_ISCHR(mode)
431 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700432 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700433 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700434 self._pipe = None
435 self._fileno = None
436 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100437 raise ValueError("Pipe transport is only for "
438 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700439
Andrew Svetlovcc839202017-11-29 18:23:43 +0200440 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100442
443 # On AIX, the reader trick (to be notified when the read end of the
444 # socket is closed) only works for sockets. On other platforms it
445 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700446 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100447 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400448 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100449 self._fileno, self._read_ready)
450
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100452 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500453 self._loop.call_soon(futures._set_result_unless_cancelled,
454 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455
Victor Stinnere912e652014-07-12 03:11:53 +0200456 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100457 info = [self.__class__.__name__]
458 if self._pipe is None:
459 info.append('closed')
460 elif self._closing:
461 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500462 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400463 selector = getattr(self._loop, '_selector', None)
464 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200465 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500466 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200467 if polling:
468 info.append('polling')
469 else:
470 info.append('idle')
471
472 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500473 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400474 elif self._pipe is not None:
475 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200476 else:
477 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500478 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200479
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800480 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400481 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800482
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700484 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200485 if self._loop.get_debug():
486 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100487 if self._buffer:
488 self._close(BrokenPipeError())
489 else:
490 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491
492 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800493 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
494 if isinstance(data, bytearray):
495 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496 if not data:
497 return
498
499 if self._conn_lost or self._closing:
500 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700501 logger.warning('pipe closed by peer or '
502 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700503 self._conn_lost += 1
504 return
505
506 if not self._buffer:
507 # Attempt to send it right away first.
508 try:
509 n = os.write(self._fileno, data)
510 except (BlockingIOError, InterruptedError):
511 n = 0
512 except Exception as exc:
513 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100514 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515 return
516 if n == len(data):
517 return
518 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400519 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400520 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700521
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400522 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800523 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700524
525 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400526 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400529 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400531 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700532 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400533 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 self._conn_lost += 1
535 # Remove writer here, _fatal_error() doesn't it
536 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400537 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100538 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400540 if n == len(self._buffer):
541 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400542 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800543 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400544 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400545 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546 self._call_connection_lost(None)
547 return
548 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400549 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
551 def can_write_eof(self):
552 return True
553
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 def write_eof(self):
555 if self._closing:
556 return
557 assert self._pipe
558 self._closing = True
559 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400560 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561 self._loop.call_soon(self._call_connection_lost, None)
562
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400563 def set_protocol(self, protocol):
564 self._protocol = protocol
565
566 def get_protocol(self):
567 return self._protocol
568
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500569 def is_closing(self):
570 return self._closing
571
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100573 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574 # write_eof is all what we needed to close the write pipe
575 self.write_eof()
576
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900577 def __del__(self):
578 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500579 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900580 source=self)
581 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100582
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 def abort(self):
584 self._close(None)
585
Victor Stinner0ee29c22014-02-19 01:40:41 +0100586 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200588 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200589 if self._loop.get_debug():
590 logger.debug("%r: %s", self, message, exc_info=True)
591 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500592 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100593 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500594 'exception': exc,
595 'transport': self,
596 'protocol': self._protocol,
597 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700598 self._close(exc)
599
600 def _close(self, exc=None):
601 self._closing = True
602 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400603 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700604 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400605 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700606 self._loop.call_soon(self._call_connection_lost, exc)
607
608 def _call_connection_lost(self, exc):
609 try:
610 self._protocol.connection_lost(exc)
611 finally:
612 self._pipe.close()
613 self._pipe = None
614 self._protocol = None
615 self._loop = None
616
617
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        args, shell, stdin, stdout, stderr, bufsize and any extra keyword
        arguments are forwarded to ``subprocess.Popen``.  When stdin is
        ``subprocess.PIPE`` a socket pair is substituted for the pipe and
        ``self._proc.stdin`` is replaced by a binary writer wrapping the
        parent's end of that pair.

        Any exception raised while spawning propagates to the caller; the
        socket pair, if one was created, is closed first so that no file
        descriptors are leaked.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child owns its copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership of the write end moved to self._proc.stdin.
                stdin_w = None
        finally:
            # Only reached with stdin_w set when spawning or wrapping
            # failed: release both ends of the socket pair.
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800635
636
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one of
    them terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher, so that the watcher
    can suspend its activity until the new process is fully registered
    (some implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Because a child watcher may catch the SIGCHLD signal and call
        waitpid(-1), only one watcher should be active per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called once the process
        terminates.  Registering a second callback for the same pid
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler for process 'pid'.

        Returns True when a handler was removed, False when there was
        nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Bind the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource gets released.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context; new processes may be started in it.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
705
706
class BaseChildWatcher(AbstractChildWatcher):
    """Shared machinery for watchers driven by the SIGCHLD signal.

    Subclasses supply _do_waitpid() and _do_waitpid_all(); this class
    manages the attached loop, the SIGCHLD signal handler and the
    translation of wait statuses into return codes.
    """

    def __init__(self):
        # pid -> handler data; populated by subclasses.
        self._callbacks = {}
        # Loop the SIGCHLD handler is installed on, or None when detached.
        self._loop = None

    def close(self):
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated while no handler was installed;
        # poll once so it is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        # Killed by a signal: report the negated signal number.
        if os.WIFSIGNALED(status):
            return -os.WTERMSIG(status)
        # Regular exit (e.g. sys.exit()): report the exit status.
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        # The status is not understood.  This shouldn't happen, but if it
        # does, hand the raw value back; perhaps that helps debugging.
        return status
766
767
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Rather than calling os.waitpid(-1), the SIGCHLD handler polls every
    registered process explicitly, so other code that spawns processes is
    not disturbed.

    The price is a significant overhead when many children are watched
    (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # The child may already be gone; poll it right away so that its
        # termination is not missed.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
845
846
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    There is no noticeable overhead when handling a large number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Guards _forks, _zombies and the handler lookup in
        # _do_waitpid_all().
        self._lock = threading.Lock()
        # pid -> returncode of reaped children that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            # Nothing to report while another block is still open or when
            # no unclaimed child was collected.
            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        # Logged after the lock has been released.
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already.  We can fire the callback
        # (outside of the lock).
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    callback = None
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
955
956
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created on demand by get_child_watcher() or
        # installed through set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        The check and the creation happen under ``events._lock``.  When
        called from the main thread, the new watcher is attached to the
        loop stored in ``self._local._loop``.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Use the public main_thread() API rather than the
                # private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        watcher must be None or an AbstractChildWatcher instance.  A
        previously set watcher is closed before being replaced.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1006
Yury Selivanov6370f342017-12-10 18:36:12 -05001007
# Platform-neutral aliases for the Unix implementations defined above.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy