blob: cbbb1065aedad7da92f8d0f04c173a6948f92643 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Andrew Svetlov0d671c02019-06-30 12:54:59 +03005import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01007import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import signal
9import socket
10import stat
11import subprocess
12import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080013import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'MultiLoopChildWatcher',
    'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)
36
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
if sys.platform == 'win32':  # pragma: no cover
    # Refuse to be imported on Windows: everything below is built around
    # Unix signal handling, so fail early with a clear ImportError.
    raise ImportError('Signals are not really supported on Windows')
40
41
def _sighandler_noop(signum, frame):
    """Signal handler that intentionally does nothing.

    It accepts the standard ``(signum, frame)`` handler arguments and
    ignores both of them.
    """
    return None
45
46
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall the signal handlers it registered.

        During interpreter shutdown the handlers are not uninstalled; a
        ResourceWarning is emitted instead and the registry is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch every signal number found in *data*.

        *data* is iterated item by item; each non-zero item is treated as
        a signal number and passed to _handle_signal().
        """
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the handler used when none is registered with the loop:
        # default_int_handler for SIGINT, SIG_DFL for everything else.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport wrapping *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport wrapping *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        Raise RuntimeError if the current child watcher is not active.
        If waiting for the transport fails, the transport is closed and
        awaited before the error is re-raised.
        """
        with events.get_child_watcher() as watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        ``(transport, protocol)`` pair.

        Raise ValueError for inconsistent arguments: both or neither of
        path/sock, ssl without server_hostname, server_hostname or
        ssl_handshake_timeout without ssl, or a *sock* that is not an
        AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created the socket, so we must not leak it on failure
                # (including cancellation).
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path*
        names an existing socket file, that stale file is removed before
        binding.  Return the base_events.Server instance.

        Raise TypeError if *ssl* is a bool.
        Raise ValueError for inconsistent arguments or a *sock* that is
        not an AF_UNIX stream socket.
        Raise OSError if binding fails; EADDRINUSE is re-raised with a
        message naming the address.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Everything from here to bind() runs under the cleanup
            # handlers below so the freshly created socket is closed on
            # any failure (e.g. indexing an empty path).
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r', path, err)

                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.  No explicit loop is passed: this coroutine is
            # already running on this loop.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the total number of bytes sent (0 for an empty file).
        Raise exceptions.SendfileNotAvailableError if os.sendfile() is
        missing or *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        # A falsy count means "send until EOF": use the file size as the
        # block size in that case.
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket fd on
        calls made from the writer callback.  The outcome is delivered
        through *fut*: the total byte count, or the exception raised.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Socket not ready: retry when it becomes writable.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer if *fut* ends up cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # fileno() is -1 once the socket has been closed.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
434
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a file descriptor owned by a pipe-like object.

    The descriptor must refer to a FIFO, a socket or a character device.
    It is switched to non-blocking mode and read from whenever the loop
    reports it readable, unless reading is paused or the transport is
    closing.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule protocol.connection_made().

        If *waiter* is given, its result is set to None (unless it was
        cancelled) after connection_made() has been scheduled.

        Raise ValueError if the descriptor is not a FIFO, socket or
        character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        # True between pause_reading() and the matching resume_reading().
        self._paused = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Drop the references so __del__ does not close a pipe this
            # transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called;
        # go through self._add_reader so that a pause or close issued in
        # the meantime is honoured.
        self._loop.call_soon(self._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def _add_reader(self, fd, callback):
        """Register *callback* for *fd* unless paused or closing."""
        if not self.is_reading():
            return
        self._loop._add_reader(fd, callback)

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        # self._loop may be None once the connection has been lost.
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Read available data and hand it to the protocol.

        An empty read means EOF: the transport starts closing and
        schedules eof_received() followed by connection_lost(None).
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor; no-op if paused or closing."""
        if not self.is_reading():
            return
        self._paused = True
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor again; no-op if not paused or closing."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Start closing the transport; no-op if already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self, _warn=warnings.warn):
        # _warn is bound as a default argument so it is still reachable
        # when this runs during interpreter teardown.
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        EIO errors are only logged (and only in debug mode); every other
        exception goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
559
560
Yury Selivanov3cb99142014-02-18 18:41:13 -0500561class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800562 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700563
564 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100565 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700567 self._pipe = pipe
568 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400570 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 self._conn_lost = 0
572 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700573
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700574 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700575 is_char = stat.S_ISCHR(mode)
576 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700577 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700578 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700579 self._pipe = None
580 self._fileno = None
581 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100582 raise ValueError("Pipe transport is only for "
583 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700584
Andrew Svetlovcc839202017-11-29 18:23:43 +0200585 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100587
588 # On AIX, the reader trick (to be notified when the read end of the
589 # socket is closed) only works for sockets. On other platforms it
590 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700591 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100592 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400593 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100594 self._fileno, self._read_ready)
595
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100597 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500598 self._loop.call_soon(futures._set_result_unless_cancelled,
599 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600
Victor Stinnere912e652014-07-12 03:11:53 +0200601 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100602 info = [self.__class__.__name__]
603 if self._pipe is None:
604 info.append('closed')
605 elif self._closing:
606 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500607 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400608 selector = getattr(self._loop, '_selector', None)
609 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200610 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500611 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200612 if polling:
613 info.append('polling')
614 else:
615 info.append('idle')
616
617 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500618 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400619 elif self._pipe is not None:
620 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200621 else:
622 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500623 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200624
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800625 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400626 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800627
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700629 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200630 if self._loop.get_debug():
631 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100632 if self._buffer:
633 self._close(BrokenPipeError())
634 else:
635 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700636
637 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800638 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
639 if isinstance(data, bytearray):
640 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641 if not data:
642 return
643
644 if self._conn_lost or self._closing:
645 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700646 logger.warning('pipe closed by peer or '
647 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 self._conn_lost += 1
649 return
650
651 if not self._buffer:
652 # Attempt to send it right away first.
653 try:
654 n = os.write(self._fileno, data)
655 except (BlockingIOError, InterruptedError):
656 n = 0
Yury Selivanov431b5402019-05-27 14:45:12 +0200657 except (SystemExit, KeyboardInterrupt):
658 raise
659 except BaseException as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100661 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662 return
663 if n == len(data):
664 return
665 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400666 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400667 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400669 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800670 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671
672 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400673 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700675 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400676 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400678 pass
Yury Selivanov431b5402019-05-27 14:45:12 +0200679 except (SystemExit, KeyboardInterrupt):
680 raise
681 except BaseException as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400682 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700683 self._conn_lost += 1
684 # Remove writer here, _fatal_error() doesn't it
685 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400686 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100687 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400689 if n == len(self._buffer):
690 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400691 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800692 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400693 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400694 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 self._call_connection_lost(None)
696 return
697 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400698 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700699
700 def can_write_eof(self):
701 return True
702
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703 def write_eof(self):
704 if self._closing:
705 return
706 assert self._pipe
707 self._closing = True
708 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400709 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 self._loop.call_soon(self._call_connection_lost, None)
711
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400712 def set_protocol(self, protocol):
713 self._protocol = protocol
714
715 def get_protocol(self):
716 return self._protocol
717
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500718 def is_closing(self):
719 return self._closing
720
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100722 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700723 # write_eof is all what we needed to close the write pipe
724 self.write_eof()
725
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100726 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900727 if self._pipe is not None:
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100728 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900729 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100730
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731 def abort(self):
732 self._close(None)
733
Victor Stinner0ee29c22014-02-19 01:40:41 +0100734 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700735 # should be called by exception handler only
Andrew Svetlov1f39c282019-05-27 16:28:34 +0300736 if isinstance(exc, OSError):
Victor Stinnerb2614752014-08-25 23:20:52 +0200737 if self._loop.get_debug():
738 logger.debug("%r: %s", self, message, exc_info=True)
739 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500740 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100741 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500742 'exception': exc,
743 'transport': self,
744 'protocol': self._protocol,
745 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700746 self._close(exc)
747
748 def _close(self, exc=None):
749 self._closing = True
750 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400751 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700752 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400753 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700754 self._loop.call_soon(self._call_connection_lost, exc)
755
756 def _call_connection_lost(self, exc):
757 try:
758 self._protocol.connection_lost(exc)
759 finally:
760 self._pipe.close()
761 self._pipe = None
762 self._protocol = None
763 self._loop = None
764
765
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        When *stdin* is ``subprocess.PIPE`` the child's stdin is backed by
        one end of a socket pair; the other end is detached and exposed as
        ``self._proc.stdin``, opened for binary writing.
        """
        parent_end = None
        if stdin == subprocess.PIPE:
            # A socket pair is used for stdin because the transport selects
            # read events on its write end to detect the other end closing,
            # and not all platforms support that.  Notably this is needed
            # on AIX, and works just fine on other platforms.
            stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_end is not None:
                # The parent no longer needs the child's end.
                stdin.close()
                self._proc.stdin = open(
                    parent_end.detach(), 'wb', buffering=bufsize)
                # Ownership moved to the file object; skip the cleanup below.
                parent_end = None
        finally:
            if parent_end is not None:
                stdin.close()
                parent_end.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800789
790
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and used by the event loop.

        An active watcher is installed and ready to handle process exit
        notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
868
869
def _compute_returncode(status):
    """Translate a wait status into a return code.

    A child killed by a signal yields the negated signal number and a
    child that exited yields its exit status.  Any other status is
    returned unchanged.
    """
    if os.WIFSIGNALED(status):
        # The child process died because of a signal.
        return -os.WTERMSIG(status)
    if os.WIFEXITED(status):
        # The child process exited (e.g. sys.exit()).
        return os.WEXITSTATUS(status)
    # The child exited, but we don't understand its status.  This
    # shouldn't happen, but if it does, return the raw status; perhaps
    # that helps debug it.
    return status
882
883
class BaseChildWatcher(AbstractChildWatcher):
    """Common machinery for watchers driven by a loop's SIGCHLD handler.

    The class keeps track of the attached loop and of the registered
    callbacks, and installs _sig_chld() as the loop's SIGCHLD handler.
    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and currently running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def _do_waitpid(self, expected_pid):
        """Check one child; must be implemented by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Check all children; must be implemented by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None to detach only; a RuntimeWarning is emitted if
        handlers are still pending in that case.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: check children and report unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800935
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800936
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget every registered handler and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid* and check the child immediately."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its callback.

        Returns without doing anything if the child is still alive.  If
        the child cannot be waited for, return code 255 is reported.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback(pid, returncode, *args)
1009
1010
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> return code of reaped children nobody registered yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget handlers and unclaimed children, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        unclaimed = None
        with self._lock:
            self._forks -= 1
            if not self._forks and self._zombies:
                # Last context left: whatever is still unclaimed stays so.
                unclaimed = str(self._zombies)
                self._zombies.clear()

        if unclaimed is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, or call it if *pid* already died.

        Must be called inside the watcher's 'with' block.
        """
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch the matching callbacks."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return

            returncode = _compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is not None:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody is about to register it.
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1114
1115
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require running loop in the main thread.

    This implementation registers a SIGCHLD signal handler when
    attach_loop() is first called (which may conflict with other code
    that installs its own handler for this signal).

    The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    SIGCHLD is received).
    """

    # Implementation note:
    # The class keeps compatibility with the AbstractChildWatcher ABC.
    # To achieve this attach_loop() does not store the loop, and
    # add_child_handler()/remove_child_handler() take no explicit loop
    # argument; the current loop is retrieved with get_running_loop().

    def __init__(self):
        self._callbacks = {}
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Forget all handlers and restore the previous SIGCHLD handler."""
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current != self._sig_chld:
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, bound to the running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on the first call; *loop* is unused."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is not None:
            return

        previous = signal.signal(signal.SIGCHLD, self._sig_chld)
        if previous is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            previous = signal.SIG_DFL
        self._saved_sighandler = previous

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* and schedule its callback on the owning loop.

        Returns without doing anything if the child is still alive.  If
        the child cannot be waited for, return code 255 is reported.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            debug_log = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        if debug_log and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal handler: poll the children and log unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1235
1236
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    The watcher uses a thread per process
    for waiting for the process finish.

    It doesn't require subscription on POSIX signal
    but a thread creation is not free.

    The watcher has O(1) complexity, its performance doesn't depend
    on amount of spawn processes.
    """

    def __init__(self):
        # Source of the numeric suffix used in the waiter thread names.
        self._pid_counter = itertools.count(0)
        # pid -> thread waiting for that process.
        self._threads = {}

    def is_active(self):
        return True

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __del__(self, _warn=warnings.warn):
        """Warn if some waiter threads are still alive."""
        # Copy the values first: waiter threads remove their own entries.
        waiters = list(self._threads.values())
        if any(waiter.is_alive() for waiter in waiters):
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid* and reports back."""
        loop = events.get_running_loop()
        thread_name = f"waitpid-{next(self._pid_counter)}"
        waiter = threading.Thread(target=self._do_waitpid,
                                  name=thread_name,
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = waiter
        waiter.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op but is implemented because
        # the abstract base class requires it.
        return True

    def attach_loop(self, loop):
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* ends, then notify *loop*.

        If the child cannot be waited for, return code 255 is reported.
        The callback is scheduled with call_soon_threadsafe() unless the
        loop is already closed.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = _compute_returncode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if not loop.is_closed():
            loop.call_soon_threadsafe(callback, pid, returncode, *args)
        else:
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)

        self._threads.pop(expected_pid)
1317
1318
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        the loop stored for the current thread.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = ThreadedChildWatcher()
                # Use the public main_thread() API instead of testing for
                # the private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.  *watcher* may be None.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1368
Yury Selivanov6370f342017-12-10 18:36:12 -05001369
# Public names (listed in __all__) bound to the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy