blob: 6714542e4e33615ecb594bc598037640d5c7854a [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Andrew Svetlov13ed0792019-06-02 13:56:38 +03005import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01007import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import signal
9import socket
10import stat
11import subprocess
12import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080013import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
16
Yury Selivanovb057c522014-02-18 12:15:06 -050017from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070018from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080020from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070022from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010023from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040025from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070027from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29
# Names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
# Refuse to import on Windows: this module is built around Unix signal
# handling, so the failure is reported at import time.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
41
42
Victor Stinnerfe5649c2014-07-17 22:43:40 +020043def _sighandler_noop(signum, frame):
44 """Dummy signal handler."""
45 pass
46
47
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080048class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050049 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Yury Selivanovb057c522014-02-18 12:15:06 -050051 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052 """
53
def __init__(self, selector=None):
    """Initialise the base selector loop and an empty signal registry."""
    super().__init__(selector)
    # Maps a signal number to the Handle registered by
    # add_signal_handler().
    self._signal_handlers = dict()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070057
def close(self):
    """Close the loop and forget every registered signal handler.

    While the interpreter is finalizing, the handlers are not removed
    through remove_signal_handler(); if any are still registered a
    ResourceWarning is emitted and the registry is simply cleared.
    """
    super().close()
    if sys.is_finalizing():
        if not self._signal_handlers:
            return
        message = (f"Closing the loop {self!r} "
                   f"on interpreter shutdown "
                   f"stage, skipping signal handlers removal")
        warnings.warn(message, ResourceWarning, source=self)
        self._signal_handlers.clear()
        return

    # Iterate over a snapshot: remove_signal_handler() mutates the dict.
    for signum in tuple(self._signal_handlers):
        self.remove_signal_handler(signum)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080071
Victor Stinnerfe5649c2014-07-17 22:43:40 +020072 def _process_self_data(self, data):
73 for signum in data:
74 if not signum:
75 # ignore null bytes written by _write_to_self()
76 continue
77 self._handle_signal(signum)
78
def add_signal_handler(self, sig, callback, *args):
    """Register *callback* (called with *args*) as the handler for *sig*.

    UNIX only.

    Raise TypeError if *callback* is a coroutine or a coroutine function.
    Signal validation is delegated to _check_signal().
    Raise RuntimeError if the wakeup file descriptor cannot be installed
    or if the signal cannot be caught (EINVAL); any other OSError raised
    while installing the handler is propagated unchanged.
    """
    callback_is_coro = (coroutines.iscoroutine(callback) or
                        coroutines.iscoroutinefunction(callback))
    if callback_is_coro:
        raise TypeError("coroutines cannot be used "
                        "with add_signal_handler()")
    self._check_signal(sig)
    self._check_closed()
    try:
        # set_wakeup_fd() raises ValueError if this is not the
        # main thread.  By calling it early we ensure that an
        # event loop running in another thread cannot add a signal
        # handler.
        signal.set_wakeup_fd(self._csock.fileno())
    except (ValueError, OSError) as exc:
        raise RuntimeError(str(exc))

    self._signal_handlers[sig] = events.Handle(callback, args, self, None)

    try:
        # Register a dummy signal handler to ask Python to write the signal
        # number in the wakeup file descriptor.  _process_self_data() will
        # read signal numbers from this file descriptor to handle signals.
        signal.signal(sig, _sighandler_noop)

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(sig, False)
    except OSError as exc:
        # Roll back the registration made above.
        del self._signal_handlers[sig]
        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as nexc:
                logger.info('set_wakeup_fd(-1) failed: %s', nexc)

        if exc.errno != errno.EINVAL:
            raise
        raise RuntimeError(f'sig {sig} cannot be caught')
123
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200124 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125 """Internal helper that is the actual signal handler."""
126 handle = self._signal_handlers.get(sig)
127 if handle is None:
128 return # Assume it's some race condition.
129 if handle._cancelled:
130 self.remove_signal_handler(sig) # Remove it properly.
131 else:
132 self._add_callback_signalsafe(handle)
133
def remove_signal_handler(self, sig):
    """Unregister the handler for *sig*.  UNIX only.

    Return True if a handler was removed and False if none was
    registered.  SIGINT is reset to signal.default_int_handler, every
    other signal to SIG_DFL.  Raise RuntimeError if resetting the signal
    fails with EINVAL.
    """
    self._check_signal(sig)
    if sig not in self._signal_handlers:
        return False
    del self._signal_handlers[sig]

    restored = (signal.default_int_handler if sig == signal.SIGINT
                else signal.SIG_DFL)
    try:
        signal.signal(sig, restored)
    except OSError as exc:
        if exc.errno != errno.EINVAL:
            raise
        raise RuntimeError(f'sig {sig} cannot be caught')

    # The last handler is gone: stop routing signals to the wakeup fd.
    if not self._signal_handlers:
        try:
            signal.set_wakeup_fd(-1)
        except (ValueError, OSError) as exc:
            logger.info('set_wakeup_fd(-1) failed: %s', exc)

    return True
165
166 def _check_signal(self, sig):
167 """Internal helper to validate a signal.
168
169 Raise ValueError if the signal number is invalid or uncatchable.
170 Raise RuntimeError if there is a problem setting up the handler.
171 """
172 if not isinstance(sig, int):
Yury Selivanov6370f342017-12-10 18:36:12 -0500173 raise TypeError(f'sig must be an int, not {sig!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174
Antoine Pitrou9d3627e2018-05-04 13:00:50 +0200175 if sig not in signal.valid_signals():
176 raise ValueError(f'invalid signal number {sig}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700177
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create a _UnixReadPipeTransport for *pipe* bound to this loop."""
    transport = _UnixReadPipeTransport(
        self, pipe, protocol, waiter=waiter, extra=extra)
    return transport
181
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create a _UnixWritePipeTransport for *pipe* bound to this loop."""
    transport = _UnixWritePipeTransport(
        self, pipe, protocol, waiter=waiter, extra=extra)
    return transport
185
async def _make_subprocess_transport(self, protocol, args, shell,
                                     stdin, stdout, stderr, bufsize,
                                     extra=None, **kwargs):
    """Create a subprocess transport and wait until it is ready.

    The transport is built and registered with the child watcher while
    the watcher is held as a context manager.  Raise RuntimeError before
    anything is created if the watcher is not active.  If waiting on the
    transport's waiter fails with anything other than SystemExit or
    KeyboardInterrupt, the transport is closed and awaited before the
    exception is re-raised.
    """
    with events.get_child_watcher() as watcher:
        if not watcher.is_active():
            # Check early.
            # Raising exception before process creation
            # prevents subprocess execution if the watcher
            # is not ready to handle it.
            raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                               "subproccess support is not installed.")
        waiter = self.create_future()
        # NOTE(review): _UnixSubprocessTransport is defined outside the
        # visible part of this file; presumably it spawns the process.
        transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                          stdin, stdout, stderr, bufsize,
                                          waiter=waiter, extra=extra,
                                          **kwargs)

        # Have the watcher call _child_watcher_callback(pid, returncode,
        # transp) for this child.
        watcher.add_child_handler(transp.get_pid(),
                                  self._child_watcher_callback, transp)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            # Propagate immediately, without the cleanup below.
            raise
        except BaseException:
            transp.close()
            await transp._wait()
            raise

    return transp
215
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800216 def _child_watcher_callback(self, pid, returncode, transp):
217 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218
async def create_unix_connection(
        self, protocol_factory, path=None, *,
        ssl=None, sock=None,
        server_hostname=None,
        ssl_handshake_timeout=None):
    """Open a UNIX domain stream connection; return (transport, protocol).

    Exactly one of *path* and *sock* must be given.  With *path* a new
    non-blocking AF_UNIX stream socket is created and connected (and
    closed again if connecting fails).  With *sock* the socket must
    already be an AF_UNIX stream socket; it is switched to non-blocking.

    Raise ValueError for inconsistent arguments: ssl without
    server_hostname, server_hostname or ssl_handshake_timeout without
    ssl, both or neither of path/sock, or a socket of the wrong kind.
    """
    assert server_hostname is None or isinstance(server_hostname, str)
    if ssl:
        if server_hostname is None:
            raise ValueError(
                'you have to pass server_hostname when using ssl')
    elif server_hostname is not None:
        raise ValueError('server_hostname is only meaningful with ssl')
    elif ssl_handshake_timeout is not None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if path is None:
        if sock is None:
            raise ValueError('no path and sock were specified')
        wrong_kind = (sock.family != socket.AF_UNIX or
                      sock.type != socket.SOCK_STREAM)
        if wrong_kind:
            raise ValueError(
                f'A UNIX Domain Stream Socket was expected, got {sock!r}')
        sock.setblocking(False)
    else:
        if sock is not None:
            raise ValueError(
                'path and sock can not be specified at the same time')

        path = os.fspath(path)
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
        try:
            sock.setblocking(False)
            await self.sock_connect(sock, path)
        except BaseException:
            # We own this socket, so never leak it on failure.
            sock.close()
            raise

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    return transport, protocol
263
async def create_unix_server(
        self, protocol_factory, path=None, *,
        sock=None, backlog=100, ssl=None,
        ssl_handshake_timeout=None,
        start_serving=True):
    """Create a server listening on a UNIX domain stream socket.

    Exactly one of *path* and *sock* must be given.  With *path* a new
    AF_UNIX stream socket is created and bound; a stale socket file at a
    filesystem path is removed first.  Abstract addresses (leading NUL)
    and an empty path are bound as-is, without touching the filesystem.
    With *sock* the socket must already be an AF_UNIX stream socket.

    Raise TypeError if *ssl* is a bool.
    Raise ValueError for inconsistent arguments (ssl_handshake_timeout
    without ssl, both or neither of path/sock, wrong socket kind).
    Raise OSError if binding fails; EADDRINUSE is re-raised with a
    message naming the address.

    Return the base_events.Server instance.  When *start_serving* is
    true the server is started before returning.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if path is not None:
        if sock is not None:
            raise ValueError(
                'path and sock can not be specified at the same time')

        path = os.fspath(path)
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        # Everything below runs with the new socket open, so any failure
        # (including one raised by the stale-socket check) must close it.
        try:
            # Check for abstract socket. `str` and `bytes` paths are
            # supported.  An empty path has no first character to inspect
            # and no file to clean up, so it goes straight to bind().
            if path and path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            sock.bind(path)
        except OSError as exc:
            sock.close()
            if exc.errno == errno.EADDRINUSE:
                # Let's improve the error message by adding
                # with what exact address it occurs.
                msg = f'Address {path!r} is already in use'
                raise OSError(errno.EADDRINUSE, msg) from None
            else:
                raise
        except:
            sock.close()
            raise
    else:
        if sock is None:
            raise ValueError(
                'path was not specified, and no sock specified')

        if (sock.family != socket.AF_UNIX or
                sock.type != socket.SOCK_STREAM):
            raise ValueError(
                f'A UNIX Domain Stream Socket was expected, got {sock!r}')

    sock.setblocking(False)
    server = base_events.Server(self, [sock], protocol_factory,
                                ssl, backlog, ssl_handshake_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self)

    return server
330
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200331 async def _sock_sendfile_native(self, sock, file, offset, count):
332 try:
333 os.sendfile
334 except AttributeError as exc:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700335 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200336 "os.sendfile() is not available")
337 try:
338 fileno = file.fileno()
339 except (AttributeError, io.UnsupportedOperation) as err:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700340 raise exceptions.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200341 try:
342 fsize = os.fstat(fileno).st_size
343 except OSError as err:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700344 raise exceptions.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200345 blocksize = count if count else fsize
346 if not blocksize:
347 return 0 # empty file
348
349 fut = self.create_future()
350 self._sock_sendfile_native_impl(fut, None, sock, fileno,
351 offset, count, blocksize, 0)
352 return await fut
353
354 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
355 offset, count, blocksize, total_sent):
356 fd = sock.fileno()
357 if registered_fd is not None:
358 # Remove the callback early. It should be rare that the
359 # selector says the fd is ready but the call still returns
360 # EAGAIN, and I am willing to take a hit in that case in
361 # order to simplify the common case.
362 self.remove_writer(registered_fd)
363 if fut.cancelled():
364 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
365 return
366 if count:
367 blocksize = count - total_sent
368 if blocksize <= 0:
369 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
370 fut.set_result(total_sent)
371 return
372
373 try:
374 sent = os.sendfile(fd, fileno, offset, blocksize)
375 except (BlockingIOError, InterruptedError):
376 if registered_fd is None:
377 self._sock_add_cancellation_callback(fut, sock)
378 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
379 fd, sock, fileno,
380 offset, count, blocksize, total_sent)
381 except OSError as exc:
Yury Selivanov2a2247c2018-01-27 17:22:01 -0500382 if (registered_fd is not None and
383 exc.errno == errno.ENOTCONN and
384 type(exc) is not ConnectionError):
385 # If we have an ENOTCONN and this isn't a first call to
386 # sendfile(), i.e. the connection was closed in the middle
387 # of the operation, normalize the error to ConnectionError
388 # to make it consistent across all Posix systems.
389 new_exc = ConnectionError(
390 "socket is not connected", errno.ENOTCONN)
391 new_exc.__cause__ = exc
392 exc = new_exc
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200393 if total_sent == 0:
394 # We can get here for different reasons, the main
395 # one being 'file' is not a regular mmap(2)-like
396 # file, in which case we'll fall back on using
397 # plain send().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700398 err = exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200399 "os.sendfile call failed")
400 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
401 fut.set_exception(err)
402 else:
403 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
404 fut.set_exception(exc)
Yury Selivanov431b5402019-05-27 14:45:12 +0200405 except (SystemExit, KeyboardInterrupt):
406 raise
407 except BaseException as exc:
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200408 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
409 fut.set_exception(exc)
410 else:
411 if sent == 0:
412 # EOF
413 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
414 fut.set_result(total_sent)
415 else:
416 offset += sent
417 total_sent += sent
418 if registered_fd is None:
419 self._sock_add_cancellation_callback(fut, sock)
420 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
421 fd, sock, fileno,
422 offset, count, blocksize, total_sent)
423
424 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
425 if total_sent > 0:
426 os.lseek(fileno, offset, os.SEEK_SET)
427
428 def _sock_add_cancellation_callback(self, fut, sock):
429 def cb(fut):
430 if fut.cancelled():
431 fd = sock.fileno()
432 if fd != -1:
433 self.remove_writer(fd)
434 fut.add_done_callback(cb)
435
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437class _UnixReadPipeTransport(transports.ReadTransport):
438
Yury Selivanovdec1a452014-02-18 22:27:48 -0500439 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Wrap the readable *pipe* in a transport driven by *loop*.

    Raise ValueError if the descriptor is not a FIFO, a socket or a
    character device.  Otherwise the descriptor is made non-blocking and
    three steps are scheduled on the loop, in order: the protocol's
    connection_made(), registration of the reader, and (if *waiter* is
    given) completion of the waiter.
    """
    super().__init__(extra)
    self._extra['pipe'] = pipe
    self._loop = loop
    self._pipe = pipe
    self._fileno = pipe.fileno()
    self._protocol = protocol
    self._closing = False

    st_mode = os.fstat(self._fileno).st_mode
    supported = (stat.S_ISFIFO(st_mode) or
                 stat.S_ISSOCK(st_mode) or
                 stat.S_ISCHR(st_mode))
    if not supported:
        # Drop the references before failing.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        raise ValueError("Pipe transport is for pipes/sockets only.")

    os.set_blocking(self._fileno, False)

    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._loop._add_reader, self._fileno, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
Victor Stinnere912e652014-07-12 03:11:53 +0200470 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100471 info = [self.__class__.__name__]
472 if self._pipe is None:
473 info.append('closed')
474 elif self._closing:
475 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500476 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400477 selector = getattr(self._loop, '_selector', None)
478 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200479 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500480 selector, self._fileno, selectors.EVENT_READ)
Victor Stinnere912e652014-07-12 03:11:53 +0200481 if polling:
482 info.append('polling')
483 else:
484 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400485 elif self._pipe is not None:
486 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200487 else:
488 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500489 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200490
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 def _read_ready(self):
492 try:
493 data = os.read(self._fileno, self.max_size)
494 except (BlockingIOError, InterruptedError):
495 pass
496 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100497 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 else:
499 if data:
500 self._protocol.data_received(data)
501 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200502 if self._loop.get_debug():
503 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400505 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506 self._loop.call_soon(self._protocol.eof_received)
507 self._loop.call_soon(self._call_connection_lost, None)
508
def pause_reading(self):
    """Stop watching the pipe for incoming data.

    Does nothing once the transport is closing or closed: the reader has
    already been removed by then, and after the connection is lost the
    loop reference is cleared, so touching it would fail.
    """
    if self._closing:
        return
    self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511
def resume_reading(self):
    """Start watching the pipe for incoming data again.

    Does nothing once the transport is closing or closed: registering
    the reader again would re-arm a descriptor that is being torn down,
    and after the connection is lost the loop reference is cleared.
    """
    if self._closing:
        return
    self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514
def set_protocol(self, protocol):
    """Replace the protocol that receives this transport's events."""
    setattr(self, '_protocol', protocol)
517
def get_protocol(self):
    """Return the protocol currently attached to this transport."""
    current = self._protocol
    return current
520
def is_closing(self):
    """Return the transport's closing flag."""
    closing = self._closing
    return closing
523
def close(self):
    """Close the transport; a no-op if it is already closing."""
    if self._closing:
        return
    self._close(None)
527
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100528 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900529 if self._pipe is not None:
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100530 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900531 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100532
Victor Stinner0ee29c22014-02-19 01:40:41 +0100533 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200535 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
536 if self._loop.get_debug():
537 logger.debug("%r: %s", self, message, exc_info=True)
538 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500539 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100540 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500541 'exception': exc,
542 'transport': self,
543 'protocol': self._protocol,
544 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700545 self._close(exc)
546
547 def _close(self, exc):
548 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400549 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550 self._loop.call_soon(self._call_connection_lost, exc)
551
def _call_connection_lost(self, exc):
    """Notify the protocol that the connection is gone, then tear down.

    *exc* is passed through to the protocol's connection_lost().  The
    cleanup runs even if that callback raises.
    """
    try:
        self._protocol.connection_lost(exc)
    finally:
        # Close the pipe first, then drop every reference so that the
        # transport reports itself as closed (self._pipe is None).
        self._pipe.close()
        self._pipe = None
        self._protocol = None
        self._loop = None
560
561
Yury Selivanov3cb99142014-02-18 18:41:13 -0500562class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800563 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Wrap the writable *pipe* in a transport driven by *loop*.

    Raise ValueError if the descriptor is not a character device, a FIFO
    or a socket.  Otherwise the descriptor is made non-blocking and the
    protocol's connection_made() is scheduled, followed (where supported)
    by a reader used to detect the peer closing, and finally by the
    completion of *waiter* if one was given.
    """
    # NOTE(review): self._loop is used below but never assigned here;
    # presumably the base-class __init__ stores *loop* -- confirm.
    super().__init__(extra, loop)
    self._extra['pipe'] = pipe
    self._pipe = pipe
    self._fileno = pipe.fileno()
    self._protocol = protocol
    self._buffer = bytearray()
    self._conn_lost = 0
    self._closing = False  # Set when close() or write_eof() called.

    st_mode = os.fstat(self._fileno).st_mode
    is_char = stat.S_ISCHR(st_mode)
    is_fifo = stat.S_ISFIFO(st_mode)
    is_socket = stat.S_ISSOCK(st_mode)
    if not any((is_char, is_fifo, is_socket)):
        # Drop the references before failing.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        raise ValueError("Pipe transport is only for "
                         "pipes, sockets and character devices")

    os.set_blocking(self._fileno, False)
    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)

    # On AIX, the reader trick (to be notified when the read end of the
    # socket is closed) only works for sockets.  On other platforms it
    # works for pipes and sockets.  (Exception: OS X 10.4?  Issue #19294.)
    watch_peer_close = is_socket or (
        is_fifo and not sys.platform.startswith("aix"))
    if watch_peer_close:
        # only start reading when connection_made() has been called
        schedule(self._loop._add_reader, self._fileno, self._read_ready)

    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700601
Victor Stinnere912e652014-07-12 03:11:53 +0200602 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100603 info = [self.__class__.__name__]
604 if self._pipe is None:
605 info.append('closed')
606 elif self._closing:
607 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500608 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400609 selector = getattr(self._loop, '_selector', None)
610 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200611 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500612 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200613 if polling:
614 info.append('polling')
615 else:
616 info.append('idle')
617
618 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500619 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400620 elif self._pipe is not None:
621 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200622 else:
623 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500624 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200625
def get_write_buffer_size(self):
    """Return the number of bytes waiting in the write buffer."""
    pending = self._buffer
    return len(pending)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800628
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700629 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700630 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200631 if self._loop.get_debug():
632 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100633 if self._buffer:
634 self._close(BrokenPipeError())
635 else:
636 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot go out now.

    Empty data is ignored.  After the connection was lost or close was
    requested the data is dropped (with a warning once the drop counter
    reaches the configured threshold).  With an empty buffer an
    immediate os.write() is attempted; any remainder is appended to the
    buffer and a writer is registered with the loop.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            written = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            written = 0
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if written == len(data):
            return
        if written > 0:
            # Keep only the part that still has to be sent.
            data = memoryview(data)[written:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700672
def _write_ready(self):
    """Flush buffered data once the pipe is reported writable.

    On a full flush the writer is removed and the protocol may be resumed;
    if the transport is closing, the reader is removed and the connection
    is finalized.  On a partial write the sent prefix is dropped from the
    buffer.  Unexpected errors are reported through _fatal_error().
    """
    assert self._buffer, 'Data should not be empty'

    try:
        sent = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        # Not writable after all; wait for the next event.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        self._buffer.clear()
        self._conn_lost += 1
        # Remove the writer here: _fatal_error() doesn't do it
        # because _buffer is empty.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(exc, 'Fatal write error on pipe transport')
        return

    if sent == len(self._buffer):
        self._buffer.clear()
        self._loop._remove_writer(self._fileno)
        self._maybe_resume_protocol()  # May append to buffer.
        if self._closing:
            self._loop._remove_reader(self._fileno)
            self._call_connection_lost(None)
        return
    if sent > 0:
        del self._buffer[:sent]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700
def can_write_eof(self):
    """Report whether write_eof() is supported; always True here."""
    supported = True
    return supported
703
def write_eof(self):
    """Mark the transport as closing once buffered data has been written.

    Does nothing if the transport is already closing.  When no data is
    buffered, the reader is removed and connection-lost handling is
    scheduled right away; otherwise only the closing flag is set.
    """
    if not self._closing:
        assert self._pipe
        self._closing = True
        if self._buffer:
            # Data is still queued; nothing more to do here.
            return
        event_loop = self._loop
        event_loop._remove_reader(self._fileno)
        event_loop.call_soon(self._call_connection_lost, None)
712
def set_protocol(self, protocol):
    """Replace the protocol object attached to this transport."""
    setattr(self, '_protocol', protocol)
715
def get_protocol(self):
    """Return the protocol object currently attached to this transport."""
    current = self._protocol
    return current
718
def is_closing(self):
    """Return the transport's closing flag."""
    closing = self._closing
    return closing
721
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is None or self._closing:
        return
    # write_eof() is all that is needed to close the write pipe.
    self.write_eof()
726
def __del__(self, _warn=warnings.warn):
    """Emit a ResourceWarning and close the pipe if it was left open.

    *_warn* is bound as a default argument so the warning function is
    available through the parameter rather than a module-global lookup.
    """
    leaked_pipe = self._pipe
    if leaked_pipe is None:
        return
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
    self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100731
def abort(self):
    """Close the transport immediately, discarding any buffered data."""
    no_error = None
    self._close(no_error)
734
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report *exc* and close the transport.

    OSError instances are only logged, and only in debug mode; any other
    exception is passed to the loop's exception handler.  In both cases
    the transport is then closed with *exc*.
    """
    # should be called by exception handler only
    if not isinstance(exc, OSError):
        context = {
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        }
        self._loop.call_exception_handler(context)
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._close(exc)
748
def _close(self, exc=None):
    """Begin closing: drop buffered data and schedule connection-lost.

    The writer is removed only if data was still buffered; the reader is
    always removed.  *exc* is forwarded to _call_connection_lost().
    """
    self._closing = True
    event_loop = self._loop
    if self._buffer:
        event_loop._remove_writer(self._fileno)
    self._buffer.clear()
    event_loop._remove_reader(self._fileno)
    event_loop.call_soon(self._call_connection_lost, exc)
756
def _call_connection_lost(self, exc):
    """Notify the protocol that the connection is lost, then release state.

    The pipe is closed and the pipe, protocol and loop references are
    cleared even if the protocol callback raises.
    """
    try:
        self._protocol.connection_lost(exc)
    finally:
        self._pipe.close()
        self._pipe = self._protocol = self._loop = None
765
766
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When *stdin* is subprocess.PIPE, a socket pair replaces the pipe:
        one end is handed to the child and the other becomes
        self._proc.stdin, opened as a binary writer.
        """
        parent_end = None
        child_stdin = stdin
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            child_stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=child_stdin, stdout=stdout,
                stderr=stderr, universal_newlines=False, bufsize=bufsize,
                **kwargs)
            if parent_end is not None:
                # The child has its copy; release ours and wrap the
                # parent end as the process's stdin stream.
                child_stdin.close()
                self._proc.stdin = open(
                    parent_end.detach(), 'wb', buffering=bufsize)
                parent_end = None
        finally:
            # Still set only if something above failed: close both ends.
            if parent_end is not None:
                child_stdin.close()
                parent_end.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800790
791
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A concrete watcher tracks a collection of subprocesses and reports
    when each one terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher, so that the watcher
    can suspend its activity until the new process is fully registered
    (some implementations need this to prevent a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering another callback for the same process
        replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called to make sure any underlying resource is freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return True if the watcher is installed and ready to handle
        process exit notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
869
870
def _compute_returncode(status):
    """Convert a raw waitpid() status into a returncode.

    Returns the negated signal number if the child was killed by a
    signal, the exit status if it exited, and the raw status otherwise.
    """
    if os.WIFSIGNALED(status):
        # The child process died because of a signal.
        return -os.WTERMSIG(status)
    if os.WIFEXITED(status):
        # The child process exited (e.g. sys.exit()).
        return os.WEXITSTATUS(status)
    # The child exited, but we don't understand its status.
    # This shouldn't happen, but if it does, return the status
    # unchanged; perhaps that helps debug it.
    return status
883
884
class BaseChildWatcher(AbstractChildWatcher):
    """Shared base for watchers driven by a loop's SIGCHLD handler.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop the watcher is attached to, or None.
        self._loop = None
        # Registered handlers, keyed by pid.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True if a loop is attached and that loop is running."""
        attached = self._loop
        if attached is None:
            return False
        return attached.is_running()

    def _do_waitpid(self, expected_pid):
        """Poll one child; must be implemented by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Poll for terminated children; must be implemented by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None to detach; a RuntimeWarning is issued when
        detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800936
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800937
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Rather than calling os.waitpid(-1), the SIGCHLD handler explicitly
    polls each registered process, so other code that spawns processes is
    not disrupted.

    This is safe, but it has a significant overhead when handling a large
    number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget all registered handlers and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, then poll the process once."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() may remove entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; run its callback if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *handler_args)
1010
1011
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    There is no noticeable overhead when handling a large number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Guards _callbacks, _zombies and _forks.
        self._lock = threading.Lock()
        # Return codes of reaped pids that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget handlers and recorded zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        unclaimed = None
        with self._lock:
            self._forks -= 1
            # Only once the outermost block exits are leftover zombies
            # known to belong to nobody.
            if not self._forks and self._zombies:
                unclaimed = str(self._zombies)
                self._zombies.clear()

        if unclaimed is None:
            return

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, or call it now if already reaped."""
        assert self._forks, "Must use the context manager"

        missing = object()
        with self._lock:
            returncode = self._zombies.pop(pid, missing)
            if returncode is missing:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return

            returncode = _compute_returncode(status)

            callback = None
            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is not None:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1115
1116
class MultiLoopChildWatcher(AbstractChildWatcher):
    # The class keeps compatibility with the AbstractChildWatcher ABC.
    # To achieve this, attach_loop() does not store the loop, and
    # add_child_handler()/remove_child_handler() take no explicit loop
    # argument; add_child_handler() obtains the current loop through
    # get_running_loop() instead.

    def __init__(self):
        # pid -> (loop, callback, args)
        self._callbacks = {}
        # SIGCHLD handler that was installed before ours, or None while
        # our handler is not installed.
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Forget all handlers and restore the previous SIGCHLD handler."""
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current != self._sig_chld:
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, bound to the running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is not None:
            return

        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        if self._saved_sighandler is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            self._saved_sighandler = signal.SIG_DFL

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() may remove entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and notify its loop if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            debug_log = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        if debug_log and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """SIGCHLD handler: poll all children and log unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1224
1225
class ThreadedChildWatcher(AbstractChildWatcher):
    # The watcher starts one thread per process and waits in it
    # for the process to finish.
    # It doesn't require a subscription to a POSIX signal.

    def __init__(self):
        # Supplies the numeric suffix of each "waitpid-N" thread name.
        self._pid_counter = itertools.count(0)

    def is_active(self):
        """Always True: no installation step is required."""
        return True

    def close(self):
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return None

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid* and then notifies
        the running loop.
        """
        running_loop = events.get_running_loop()
        waiter = threading.Thread(
            target=self._do_waitpid,
            name=f"waitpid-{next(self._pid_counter)}",
            args=(running_loop, pid, callback, args),
            daemon=True)
        waiter.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op but is implemented because
        # the abstract base class requires it
        return True

    def attach_loop(self, loop):
        return None

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* exits, then schedule
        *callback* on *loop*.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = _compute_returncode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return
        loop.call_soon_threadsafe(callback, pid, returncode, *args)
1286
1287
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created lazily by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = ThreadedChildWatcher()
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                if in_main_thread:
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on the child watcher.
        """

        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1337
Yury Selivanov6370f342017-12-10 18:36:12 -05001338
# Public names listed in __all__, bound to the Unix-specific
# implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy