blob: 4f6beb43650965b9634f9b4a412793329d3eb39f [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01005import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
Yury Selivanovb057c522014-02-18 12:15:06 -050015from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070016from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070023from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
25
Yury Selivanov6370f342017-12-10 18:36:12 -050026__all__ = (
27 'SelectorEventLoop',
28 'AbstractChildWatcher', 'SafeChildWatcher',
29 'FastChildWatcher', 'DefaultEventLoopPolicy',
30)
31
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033if sys.platform == 'win32': # pragma: no cover
34 raise ImportError('Signals are not really supported on Windows')
35
36
Victor Stinnerfe5649c2014-07-17 22:43:40 +020037def _sighandler_noop(signum, frame):
38 """Dummy signal handler."""
39 pass
40
41
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080042class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050043 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
Yury Selivanovb057c522014-02-18 12:15:06 -050045 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 """
47
48 def __init__(self, selector=None):
49 super().__init__(selector)
50 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Andrew Svetlov4a025432017-12-21 17:06:46 +020054 if not sys.is_finalizing():
55 for sig in list(self._signal_handlers):
56 self.remove_signal_handler(sig)
57 else:
Andrew Svetlov4f146f92017-12-24 13:50:03 +020058 if self._signal_handlers:
Andrew Svetlova8f4e152017-12-26 11:53:38 +020059 warnings.warn(f"Closing the loop {self!r} "
Andrew Svetlov4f146f92017-12-24 13:50:03 +020060 f"on interpreter shutdown "
Andrew Svetlova8f4e152017-12-26 11:53:38 +020061 f"stage, skipping signal handlers removal",
Andrew Svetlov4f146f92017-12-24 13:50:03 +020062 ResourceWarning,
63 source=self)
64 self._signal_handlers.clear()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080065
Victor Stinnerfe5649c2014-07-17 22:43:40 +020066 def _process_self_data(self, data):
67 for signum in data:
68 if not signum:
69 # ignore null bytes written by _write_to_self()
70 continue
71 self._handle_signal(signum)
72
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 def add_signal_handler(self, sig, callback, *args):
74 """Add a handler for a signal. UNIX only.
75
76 Raise ValueError if the signal number is invalid or uncatchable.
77 Raise RuntimeError if there is a problem setting up the handler.
78 """
Yury Selivanov6370f342017-12-10 18:36:12 -050079 if (coroutines.iscoroutine(callback) or
80 coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010081 raise TypeError("coroutines cannot be used "
82 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010084 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 try:
86 # set_wakeup_fd() raises ValueError if this is not the
87 # main thread. By calling it early we ensure that an
88 # event loop running in another thread cannot add a signal
89 # handler.
90 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020091 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092 raise RuntimeError(str(exc))
93
Yury Selivanov569efa22014-02-18 18:02:19 -050094 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 self._signal_handlers[sig] = handle
96
97 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020098 # Register a dummy signal handler to ask Python to write the signal
99 # number in the wakup file descriptor. _process_self_data() will
100 # read signal numbers from this file descriptor to handle signals.
101 signal.signal(sig, _sighandler_noop)
102
Charles-François Natali74e7cf32013-12-05 22:47:19 +0100103 # Set SA_RESTART to limit EINTR occurrences.
104 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700105 except OSError as exc:
106 del self._signal_handlers[sig]
107 if not self._signal_handlers:
108 try:
109 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200110 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700111 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700112
113 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500114 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115 else:
116 raise
117
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200118 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 """Internal helper that is the actual signal handler."""
120 handle = self._signal_handlers.get(sig)
121 if handle is None:
122 return # Assume it's some race condition.
123 if handle._cancelled:
124 self.remove_signal_handler(sig) # Remove it properly.
125 else:
126 self._add_callback_signalsafe(handle)
127
128 def remove_signal_handler(self, sig):
129 """Remove a handler for a signal. UNIX only.
130
131 Return True if a signal handler was removed, False if not.
132 """
133 self._check_signal(sig)
134 try:
135 del self._signal_handlers[sig]
136 except KeyError:
137 return False
138
139 if sig == signal.SIGINT:
140 handler = signal.default_int_handler
141 else:
142 handler = signal.SIG_DFL
143
144 try:
145 signal.signal(sig, handler)
146 except OSError as exc:
147 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500148 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 else:
150 raise
151
152 if not self._signal_handlers:
153 try:
154 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200155 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700156 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157
158 return True
159
160 def _check_signal(self, sig):
161 """Internal helper to validate a signal.
162
163 Raise ValueError if the signal number is invalid or uncatchable.
164 Raise RuntimeError if there is a problem setting up the handler.
165 """
166 if not isinstance(sig, int):
Yury Selivanov6370f342017-12-10 18:36:12 -0500167 raise TypeError(f'sig must be an int, not {sig!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168
169 if not (1 <= sig < signal.NSIG):
Yury Selivanov6370f342017-12-10 18:36:12 -0500170 raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
172 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
173 extra=None):
174 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
175
176 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
177 extra=None):
178 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
179
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200180 async def _make_subprocess_transport(self, protocol, args, shell,
181 stdin, stdout, stderr, bufsize,
182 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800183 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400184 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800185 transp = _UnixSubprocessTransport(self, protocol, args, shell,
186 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100187 waiter=waiter, extra=extra,
188 **kwargs)
189
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800190 watcher.add_child_handler(transp.get_pid(),
191 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100192 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200193 await waiter
194 except Exception:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100195 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200196 await transp._wait()
197 raise
Guido van Rossum4835f172014-01-10 13:28:59 -0800198
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700199 return transp
200
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800201 def _child_watcher_callback(self, pid, returncode, transp):
202 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203
Neil Aspinallf7686c12017-12-19 19:45:42 +0000204 async def create_unix_connection(
205 self, protocol_factory, path=None, *,
206 ssl=None, sock=None,
207 server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200208 ssl_handshake_timeout=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500209 assert server_hostname is None or isinstance(server_hostname, str)
210 if ssl:
211 if server_hostname is None:
212 raise ValueError(
213 'you have to pass server_hostname when using ssl')
214 else:
215 if server_hostname is not None:
216 raise ValueError('server_hostname is only meaningful with ssl')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200217 if ssl_handshake_timeout is not None:
218 raise ValueError(
219 'ssl_handshake_timeout is only meaningful with ssl')
Yury Selivanovb057c522014-02-18 12:15:06 -0500220
221 if path is not None:
222 if sock is not None:
223 raise ValueError(
224 'path and sock can not be specified at the same time')
225
Andrew Svetlovcc839202017-11-29 18:23:43 +0200226 path = os.fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100227 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500228 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 sock.setblocking(False)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200230 await self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100231 except:
232 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500233 raise
234
235 else:
236 if sock is None:
237 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400238 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500239 sock.type != socket.SOCK_STREAM):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400240 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500241 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500242 sock.setblocking(False)
243
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200244 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000245 sock, protocol_factory, ssl, server_hostname,
246 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanovb057c522014-02-18 12:15:06 -0500247 return transport, protocol
248
Neil Aspinallf7686c12017-12-19 19:45:42 +0000249 async def create_unix_server(
250 self, protocol_factory, path=None, *,
251 sock=None, backlog=100, ssl=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200252 ssl_handshake_timeout=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500253 if isinstance(ssl, bool):
254 raise TypeError('ssl argument must be an SSLContext or None')
255
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200256 if ssl_handshake_timeout is not None and not ssl:
257 raise ValueError(
258 'ssl_handshake_timeout is only meaningful with ssl')
259
Yury Selivanovb057c522014-02-18 12:15:06 -0500260 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200261 if sock is not None:
262 raise ValueError(
263 'path and sock can not be specified at the same time')
264
Andrew Svetlovcc839202017-11-29 18:23:43 +0200265 path = os.fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500266 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
267
Yury Selivanov908d55d2016-10-09 12:15:08 -0400268 # Check for abstract socket. `str` and `bytes` paths are supported.
269 if path[0] not in (0, '\x00'):
270 try:
271 if stat.S_ISSOCK(os.stat(path).st_mode):
272 os.remove(path)
273 except FileNotFoundError:
274 pass
275 except OSError as err:
276 # Directory may have permissions only to create socket.
Andrew Svetlovcc839202017-11-29 18:23:43 +0200277 logger.error('Unable to check or remove stale UNIX socket '
278 '%r: %r', path, err)
Yury Selivanov908d55d2016-10-09 12:15:08 -0400279
Yury Selivanovb057c522014-02-18 12:15:06 -0500280 try:
281 sock.bind(path)
282 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100283 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500284 if exc.errno == errno.EADDRINUSE:
285 # Let's improve the error message by adding
286 # with what exact address it occurs.
Yury Selivanov6370f342017-12-10 18:36:12 -0500287 msg = f'Address {path!r} is already in use'
Yury Selivanovb057c522014-02-18 12:15:06 -0500288 raise OSError(errno.EADDRINUSE, msg) from None
289 else:
290 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200291 except:
292 sock.close()
293 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500294 else:
295 if sock is None:
296 raise ValueError(
297 'path was not specified, and no sock specified')
298
Yury Selivanov36e7e972016-10-07 12:39:57 -0400299 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500300 sock.type != socket.SOCK_STREAM):
Yury Selivanovb057c522014-02-18 12:15:06 -0500301 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500302 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500303
304 server = base_events.Server(self, [sock])
305 sock.listen(backlog)
306 sock.setblocking(False)
Neil Aspinallf7686c12017-12-19 19:45:42 +0000307 self._start_serving(protocol_factory, sock, ssl, server,
308 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanovb057c522014-02-18 12:15:06 -0500309 return server
310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312class _UnixReadPipeTransport(transports.ReadTransport):
313
Yury Selivanovdec1a452014-02-18 22:27:48 -0500314 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315
316 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
317 super().__init__(extra)
318 self._extra['pipe'] = pipe
319 self._loop = loop
320 self._pipe = pipe
321 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700322 self._protocol = protocol
323 self._closing = False
324
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700325 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800326 if not (stat.S_ISFIFO(mode) or
327 stat.S_ISSOCK(mode) or
328 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700329 self._pipe = None
330 self._fileno = None
331 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700332 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700333
Andrew Svetlovcc839202017-11-29 18:23:43 +0200334 os.set_blocking(self._fileno, False)
Guido van Rossum47867872016-08-31 09:42:38 -0700335
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100337 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400338 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100339 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100341 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500342 self._loop.call_soon(futures._set_result_unless_cancelled,
343 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344
Victor Stinnere912e652014-07-12 03:11:53 +0200345 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100346 info = [self.__class__.__name__]
347 if self._pipe is None:
348 info.append('closed')
349 elif self._closing:
350 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500351 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400352 selector = getattr(self._loop, '_selector', None)
353 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200354 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500355 selector, self._fileno, selectors.EVENT_READ)
Victor Stinnere912e652014-07-12 03:11:53 +0200356 if polling:
357 info.append('polling')
358 else:
359 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400360 elif self._pipe is not None:
361 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200362 else:
363 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500364 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200365
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 def _read_ready(self):
367 try:
368 data = os.read(self._fileno, self.max_size)
369 except (BlockingIOError, InterruptedError):
370 pass
371 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100372 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 else:
374 if data:
375 self._protocol.data_received(data)
376 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200377 if self._loop.get_debug():
378 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400380 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381 self._loop.call_soon(self._protocol.eof_received)
382 self._loop.call_soon(self._call_connection_lost, None)
383
Guido van Rossum57497ad2013-10-18 07:58:20 -0700384 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400385 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386
Guido van Rossum57497ad2013-10-18 07:58:20 -0700387 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400388 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700389
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400390 def set_protocol(self, protocol):
391 self._protocol = protocol
392
393 def get_protocol(self):
394 return self._protocol
395
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500396 def is_closing(self):
397 return self._closing
398
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 def close(self):
400 if not self._closing:
401 self._close(None)
402
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900403 def __del__(self):
404 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500405 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900406 source=self)
407 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100408
Victor Stinner0ee29c22014-02-19 01:40:41 +0100409 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200411 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
412 if self._loop.get_debug():
413 logger.debug("%r: %s", self, message, exc_info=True)
414 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500415 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100416 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500417 'exception': exc,
418 'transport': self,
419 'protocol': self._protocol,
420 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 self._close(exc)
422
423 def _close(self, exc):
424 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400425 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 self._loop.call_soon(self._call_connection_lost, exc)
427
428 def _call_connection_lost(self, exc):
429 try:
430 self._protocol.connection_lost(exc)
431 finally:
432 self._pipe.close()
433 self._pipe = None
434 self._protocol = None
435 self._loop = None
436
437
Yury Selivanov3cb99142014-02-18 18:41:13 -0500438class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800439 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
441 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100442 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 self._pipe = pipe
445 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400447 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 self._conn_lost = 0
449 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700450
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700451 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700452 is_char = stat.S_ISCHR(mode)
453 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700454 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700455 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700456 self._pipe = None
457 self._fileno = None
458 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100459 raise ValueError("Pipe transport is only for "
460 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700461
Andrew Svetlovcc839202017-11-29 18:23:43 +0200462 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100464
465 # On AIX, the reader trick (to be notified when the read end of the
466 # socket is closed) only works for sockets. On other platforms it
467 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700468 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100469 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400470 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100471 self._fileno, self._read_ready)
472
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100474 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500475 self._loop.call_soon(futures._set_result_unless_cancelled,
476 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477
Victor Stinnere912e652014-07-12 03:11:53 +0200478 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100479 info = [self.__class__.__name__]
480 if self._pipe is None:
481 info.append('closed')
482 elif self._closing:
483 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500484 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400485 selector = getattr(self._loop, '_selector', None)
486 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200487 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500488 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200489 if polling:
490 info.append('polling')
491 else:
492 info.append('idle')
493
494 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500495 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400496 elif self._pipe is not None:
497 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200498 else:
499 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500500 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200501
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800502 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400503 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800504
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700506 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200507 if self._loop.get_debug():
508 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100509 if self._buffer:
510 self._close(BrokenPipeError())
511 else:
512 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700513
514 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800515 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
516 if isinstance(data, bytearray):
517 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 if not data:
519 return
520
521 if self._conn_lost or self._closing:
522 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700523 logger.warning('pipe closed by peer or '
524 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 self._conn_lost += 1
526 return
527
528 if not self._buffer:
529 # Attempt to send it right away first.
530 try:
531 n = os.write(self._fileno, data)
532 except (BlockingIOError, InterruptedError):
533 n = 0
534 except Exception as exc:
535 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100536 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 return
538 if n == len(data):
539 return
540 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400541 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400542 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400544 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800545 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
547 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400548 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400551 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400553 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400555 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 self._conn_lost += 1
557 # Remove writer here, _fatal_error() doesn't it
558 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400559 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100560 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400562 if n == len(self._buffer):
563 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400564 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800565 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400566 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400567 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700568 self._call_connection_lost(None)
569 return
570 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400571 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572
573 def can_write_eof(self):
574 return True
575
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 def write_eof(self):
577 if self._closing:
578 return
579 assert self._pipe
580 self._closing = True
581 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400582 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 self._loop.call_soon(self._call_connection_lost, None)
584
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400585 def set_protocol(self, protocol):
586 self._protocol = protocol
587
588 def get_protocol(self):
589 return self._protocol
590
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500591 def is_closing(self):
592 return self._closing
593
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100595 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 # write_eof is all what we needed to close the write pipe
597 self.write_eof()
598
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900599 def __del__(self):
600 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500601 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900602 source=self)
603 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100604
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 def abort(self):
606 self._close(None)
607
Victor Stinner0ee29c22014-02-19 01:40:41 +0100608 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700609 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200610 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200611 if self._loop.get_debug():
612 logger.debug("%r: %s", self, message, exc_info=True)
613 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500614 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100615 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500616 'exception': exc,
617 'transport': self,
618 'protocol': self._protocol,
619 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 self._close(exc)
621
622 def _close(self, exc=None):
623 self._closing = True
624 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400625 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700626 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400627 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628 self._loop.call_soon(self._call_connection_lost, exc)
629
630 def _call_connection_lost(self, exc):
631 try:
632 self._protocol.connection_lost(exc)
633 finally:
634 self._pipe.close()
635 self._pipe = None
636 self._protocol = None
637 self._loop = None
638
639
Guido van Rossum59691282013-10-30 14:52:03 -0700640class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641
Guido van Rossum59691282013-10-30 14:52:03 -0700642 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700643 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700644 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700645 # Use a socket pair for stdin, since not all platforms
646 # support selecting read events on the write end of a
647 # socket (which we use in order to detect closing of the
648 # other end). Notably this is needed on AIX, and works
649 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100650 stdin, stdin_w = socket.socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 self._proc = subprocess.Popen(
652 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
653 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700654 if stdin_w is not None:
655 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200656 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800657
658
659class AbstractChildWatcher:
660 """Abstract base class for monitoring child processes.
661
662 Objects derived from this class monitor a collection of subprocesses and
663 report their termination or interruption by a signal.
664
665 New callbacks are registered with .add_child_handler(). Starting a new
666 process must be done within a 'with' block to allow the watcher to suspend
667 its activity until the new process if fully registered (this is needed to
668 prevent a race condition in some implementations).
669
670 Example:
671 with watcher:
672 proc = subprocess.Popen("sleep 1")
673 watcher.add_child_handler(proc.pid, callback)
674
675 Notes:
676 Implementations of this class must be thread-safe.
677
678 Since child watcher objects may catch the SIGCHLD signal and call
679 waitpid(-1), there should be only one active object per process.
680 """
681
682 def add_child_handler(self, pid, callback, *args):
683 """Register a new child handler.
684
685 Arrange for callback(pid, returncode, *args) to be called when
686 process 'pid' terminates. Specifying another callback for the same
687 process replaces the previous handler.
688
Victor Stinneracdb7822014-07-14 18:33:40 +0200689 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800690 """
691 raise NotImplementedError()
692
693 def remove_child_handler(self, pid):
694 """Removes the handler for process 'pid'.
695
696 The function returns True if the handler was successfully removed,
697 False if there was nothing to remove."""
698
699 raise NotImplementedError()
700
Guido van Rossum2bcae702013-11-13 15:50:08 -0800701 def attach_loop(self, loop):
702 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800703
Guido van Rossum2bcae702013-11-13 15:50:08 -0800704 If the watcher was previously attached to an event loop, then it is
705 first detached before attaching to the new loop.
706
707 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800708 """
709 raise NotImplementedError()
710
711 def close(self):
712 """Close the watcher.
713
714 This must be called to make sure that any underlying resource is freed.
715 """
716 raise NotImplementedError()
717
718 def __enter__(self):
719 """Enter the watcher's context and allow starting new processes
720
721 This function must return self"""
722 raise NotImplementedError()
723
724 def __exit__(self, a, b, c):
725 """Exit the watcher's context"""
726 raise NotImplementedError()
727
728
729class BaseChildWatcher(AbstractChildWatcher):
730
Guido van Rossum2bcae702013-11-13 15:50:08 -0800731 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800732 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400733 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800734
735 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800736 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800737
738 def _do_waitpid(self, expected_pid):
739 raise NotImplementedError()
740
741 def _do_waitpid_all(self):
742 raise NotImplementedError()
743
Guido van Rossum2bcae702013-11-13 15:50:08 -0800744 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800745 assert loop is None or isinstance(loop, events.AbstractEventLoop)
746
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400747 if self._loop is not None and loop is None and self._callbacks:
748 warnings.warn(
749 'A loop is being detached '
750 'from a child watcher with pending handlers',
751 RuntimeWarning)
752
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800753 if self._loop is not None:
754 self._loop.remove_signal_handler(signal.SIGCHLD)
755
756 self._loop = loop
757 if loop is not None:
758 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
759
760 # Prevent a race condition in case a child terminated
761 # during the switch.
762 self._do_waitpid_all()
763
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800764 def _sig_chld(self):
765 try:
766 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500767 except Exception as exc:
768 # self._loop should always be available here
769 # as '_sig_chld' is added as a signal handler
770 # in 'attach_loop'
771 self._loop.call_exception_handler({
772 'message': 'Unknown exception in SIGCHLD handler',
773 'exception': exc,
774 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800775
776 def _compute_returncode(self, status):
777 if os.WIFSIGNALED(status):
778 # The child process died because of a signal.
779 return -os.WTERMSIG(status)
780 elif os.WIFEXITED(status):
781 # The child process exited (e.g sys.exit()).
782 return os.WEXITSTATUS(status)
783 else:
784 # The child exited, but we don't understand its status.
785 # This shouldn't happen, but if it does, let's just
786 # return that status; perhaps that helps debug it.
787 return status
788
789
790class SafeChildWatcher(BaseChildWatcher):
791 """'Safe' child watcher implementation.
792
793 This implementation avoids disrupting other code spawning processes by
794 polling explicitly each process in the SIGCHLD handler instead of calling
795 os.waitpid(-1).
796
797 This is a safe solution but it has a significant overhead when handling a
798 big number of children (O(n) each time SIGCHLD is raised)
799 """
800
Guido van Rossum2bcae702013-11-13 15:50:08 -0800801 def close(self):
802 self._callbacks.clear()
803 super().close()
804
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800805 def __enter__(self):
806 return self
807
808 def __exit__(self, a, b, c):
809 pass
810
811 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400812 if self._loop is None:
813 raise RuntimeError(
814 "Cannot add child handler, "
815 "the child watcher does not have a loop attached")
816
Victor Stinner47cd10d2015-01-30 00:05:19 +0100817 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800818
819 # Prevent a race condition in case the child is already terminated.
820 self._do_waitpid(pid)
821
Guido van Rossum2bcae702013-11-13 15:50:08 -0800822 def remove_child_handler(self, pid):
823 try:
824 del self._callbacks[pid]
825 return True
826 except KeyError:
827 return False
828
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800829 def _do_waitpid_all(self):
830
831 for pid in list(self._callbacks):
832 self._do_waitpid(pid)
833
834 def _do_waitpid(self, expected_pid):
835 assert expected_pid > 0
836
837 try:
838 pid, status = os.waitpid(expected_pid, os.WNOHANG)
839 except ChildProcessError:
840 # The child process is already reaped
841 # (may happen if waitpid() is called elsewhere).
842 pid = expected_pid
843 returncode = 255
844 logger.warning(
845 "Unknown child process pid %d, will report returncode 255",
846 pid)
847 else:
848 if pid == 0:
849 # The child process is still alive.
850 return
851
852 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200853 if self._loop.get_debug():
854 logger.debug('process %s exited with returncode %s',
855 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800856
857 try:
858 callback, args = self._callbacks.pop(pid)
859 except KeyError: # pragma: no cover
860 # May happen if .remove_child_handler() is called
861 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200862 if self._loop.get_debug():
863 logger.warning("Child watcher got an unexpected pid: %r",
864 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800865 else:
866 callback(pid, returncode, *args)
867
868
869class FastChildWatcher(BaseChildWatcher):
870 """'Fast' child watcher implementation.
871
872 This implementation reaps every terminated processes by calling
873 os.waitpid(-1) directly, possibly breaking other code spawning processes
874 and waiting for their termination.
875
876 There is no noticeable overhead when handling a big number of children
877 (O(1) each time a child terminates).
878 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800879 def __init__(self):
880 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800881 self._lock = threading.Lock()
882 self._zombies = {}
883 self._forks = 0
884
885 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800886 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800887 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800888 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800889
890 def __enter__(self):
891 with self._lock:
892 self._forks += 1
893
894 return self
895
896 def __exit__(self, a, b, c):
897 with self._lock:
898 self._forks -= 1
899
900 if self._forks or not self._zombies:
901 return
902
903 collateral_victims = str(self._zombies)
904 self._zombies.clear()
905
906 logger.warning(
907 "Caught subprocesses termination from unknown pids: %s",
908 collateral_victims)
909
910 def add_child_handler(self, pid, callback, *args):
911 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400912
913 if self._loop is None:
914 raise RuntimeError(
915 "Cannot add child handler, "
916 "the child watcher does not have a loop attached")
917
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800918 with self._lock:
919 try:
920 returncode = self._zombies.pop(pid)
921 except KeyError:
922 # The child is running.
923 self._callbacks[pid] = callback, args
924 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800925
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800926 # The child is dead already. We can fire the callback.
927 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800928
Guido van Rossum2bcae702013-11-13 15:50:08 -0800929 def remove_child_handler(self, pid):
930 try:
931 del self._callbacks[pid]
932 return True
933 except KeyError:
934 return False
935
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800936 def _do_waitpid_all(self):
937 # Because of signal coalescing, we must keep calling waitpid() as
938 # long as we're able to reap a child.
939 while True:
940 try:
941 pid, status = os.waitpid(-1, os.WNOHANG)
942 except ChildProcessError:
943 # No more child processes exist.
944 return
945 else:
946 if pid == 0:
947 # A child process is still alive.
948 return
949
950 returncode = self._compute_returncode(status)
951
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800952 with self._lock:
953 try:
954 callback, args = self._callbacks.pop(pid)
955 except KeyError:
956 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800957 if self._forks:
958 # It may not be registered yet.
959 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200960 if self._loop.get_debug():
961 logger.debug('unknown process %s exited '
962 'with returncode %s',
963 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800964 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800965 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200966 else:
967 if self._loop.get_debug():
968 logger.debug('process %s exited with returncode %s',
969 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800970
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800971 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800972 logger.warning(
973 "Caught subprocess termination from unknown pid: "
974 "%d -> %d", pid, returncode)
975 else:
976 callback(pid, returncode, *args)
977
978
979class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +0100980 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800981 _loop_factory = _UnixSelectorEventLoop
982
983 def __init__(self):
984 super().__init__()
985 self._watcher = None
986
987 def _init_watcher(self):
988 with events._lock:
989 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800990 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800991 if isinstance(threading.current_thread(),
992 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800993 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800994
995 def set_event_loop(self, loop):
996 """Set the event loop.
997
998 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800999 .set_event_loop() from the main thread will call .attach_loop(loop) on
1000 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001001 """
1002
1003 super().set_event_loop(loop)
1004
Andrew Svetlovcc839202017-11-29 18:23:43 +02001005 if (self._watcher is not None and
1006 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001007 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001008
1009 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001010 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001011
1012 If not yet set, a SafeChildWatcher object is automatically created.
1013 """
1014 if self._watcher is None:
1015 self._init_watcher()
1016
1017 return self._watcher
1018
1019 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001020 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001021
1022 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1023
1024 if self._watcher is not None:
1025 self._watcher.close()
1026
1027 self._watcher = watcher
1028
Yury Selivanov6370f342017-12-10 18:36:12 -05001029
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001030SelectorEventLoop = _UnixSelectorEventLoop
1031DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy