blob: 181e1885152fabb4059297d174ac33c4a4a385fb [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Andrew Svetlov0d671c02019-06-30 12:54:59 +03005import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01007import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import signal
9import socket
10import stat
11import subprocess
12import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080013import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
# Public names exported by this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'PidfdChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: this module is built around Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
def _sighandler_noop(signum, frame):
    """Dummy signal handler that ignores its arguments and does nothing.

    It is installed with signal.signal() by the event loop's
    add_signal_handler(); the signal itself is picked up through the
    wakeup file descriptor rather than through this function.
    """
45
46
def waitstatus_to_exitcode(status):
    """Convert a wait status to an exit code.

    Delegates to os.waitstatus_to_exitcode().  If that call rejects the
    status with ValueError, the raw *status* is returned unchanged instead
    of propagating the error.
    """
    try:
        return os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
55
56
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        """Initialise the base loop and an empty signal-handler table."""
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and unregister every installed signal handler.

        During interpreter finalization the handlers are not uninstalled;
        a ResourceWarning is emitted instead (if any were registered) and
        the handler table is simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a snapshot: remove_signal_handler() deletes
            # entries from the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch each non-zero value in *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if the wakeup fd cannot be set up or the signal
        cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above; if that was the only
            # handler, also detach the wakeup fd again.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig:d} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default KeyboardInterrupt handler back;
        # every other signal is reset to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig:d} cannot be caught')
            else:
                raise

        # Last handler gone: detach the wakeup fd (best effort).
        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not in signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        Raise RuntimeError before creating the transport if the current
        child watcher is not active.  If waiting for the transport fails
        (other than with SystemExit/KeyboardInterrupt), the transport is
        closed and waited on before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's return code to its transport.

        The notification is scheduled with call_soon_threadsafe(); *pid*
        is accepted but not used here.
        """
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Returns the
        (transport, protocol) pair produced by
        _create_connection_transport().

        Raise ValueError for inconsistent arguments: ssl without
        server_hostname, ssl-only options without ssl, both or neither of
        path/sock, or a sock that is not an AF_UNIX/SOCK_STREAM socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')
            if ssl_shutdown_timeout is not None:
                raise ValueError(
                    'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created this socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* is
        given and does not denote an abstract socket, an existing socket
        file at that path is removed before binding.  Returns the
        base_events.Server; if *start_serving* is true it has already been
        started.

        Raise TypeError if ssl is a bool.
        Raise ValueError for inconsistent arguments.
        Raise OSError if binding fails (EADDRINUSE is re-raised with a
        message naming the address).
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout,
                                    ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Returns the total number of bytes sent (0 for an empty file with
        no *count*).  Raise exceptions.SendfileNotAvailableError if
        os.sendfile is missing, *file* has no usable fileno(), or fstat()
        on it fails.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        # With no explicit count, send up to the file size per call.
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket fd on
        calls made from the writer callback.  The outcome is reported
        through *fut*: the total byte count on completion/EOF, or an
        exception on failure.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Not writable right now: retry when the selector reports the
            # socket as writable.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                # OSError's two-argument form is (errno, strerror); the
                # arguments are passed in that order so that .errno is
                # ENOTCONN.
                new_exc = ConnectionError(
                    errno.ENOTCONN, "socket is not connected")
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Arrange for the socket's writer to be removed if *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # A fileno of -1 is skipped: nothing to unregister then.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
455
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device.

    Data read from the file descriptor is handed to the protocol's
    data_received(); an empty read is treated as EOF.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() and reading on *loop*.

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, it is resolved (unless cancelled)
        after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the references so that __del__ sees the transport as
            # already closed and does not touch the caller's pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        """Return a debug string with class name, state, fd and poll status."""
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Reader callback: read up to max_size bytes and dispatch them."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read: stop polling, then notify the protocol of
                # EOF followed by connection loss.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop polling the fd; no-op if closing or already paused."""
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._fileno)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume polling the fd; no-op if closing or not paused."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._loop._add_reader(self._fileno, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once closing has started."""
        return self._closing

    def close(self):
        """Begin closing the transport; no-op if already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self, _warn=warnings.warn):
        """Warn about, and close, a pipe that was never closed."""
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (and only in debug
        mode); anything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop polling and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
591
592
Yury Selivanov3cb99142014-02-18 18:41:13 -0500593class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800594 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700595
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Wrap *pipe* for writing and schedule connection_made() on the loop.

    Raise ValueError if *pipe* is not a character device, FIFO or socket.
    If *waiter* is given, it is resolved (unless cancelled) after
    connection_made() has been scheduled.
    """
    # NOTE(review): self._loop is never assigned in this method; presumably
    # the base-class __init__(extra, loop) stores it -- confirm.
    super().__init__(extra, loop)
    self._extra['pipe'] = pipe
    self._pipe = pipe
    self._fileno = pipe.fileno()
    self._protocol = protocol
    self._buffer = bytearray()
    self._conn_lost = 0
    self._closing = False  # Set when close() or write_eof() called.

    mode = os.fstat(self._fileno).st_mode
    is_char = stat.S_ISCHR(mode)
    is_fifo = stat.S_ISFIFO(mode)
    is_socket = stat.S_ISSOCK(mode)
    if not (is_char or is_fifo or is_socket):
        # Drop the references taken above before rejecting the pipe.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        raise ValueError("Pipe transport is only for "
                         "pipes, sockets and character devices")

    os.set_blocking(self._fileno, False)
    self._loop.call_soon(self._protocol.connection_made, self)

    # On AIX, the reader trick (to be notified when the read end of the
    # socket is closed) only works for sockets. On other platforms it
    # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
    if is_socket or (is_fifo and not sys.platform.startswith("aix")):
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)

    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        self._loop.call_soon(futures._set_result_unless_cancelled,
                             waiter, None)
def __repr__(self):
    """Return a debug string with class name, state, fd and poll status.

    When the loop exposes a selector and the pipe is still set, the
    current write-buffer size is included as well.
    """
    info = [self.__class__.__name__]
    if self._pipe is None:
        info.append('closed')
    elif self._closing:
        info.append('closing')
    info.append(f'fd={self._fileno}')
    selector = getattr(self._loop, '_selector', None)
    if self._pipe is not None and selector is not None:
        polling = selector_events._test_selector_event(
            selector, self._fileno, selectors.EVENT_WRITE)
        if polling:
            info.append('polling')
        else:
            info.append('idle')

        bufsize = self.get_write_buffer_size()
        info.append(f'bufsize={bufsize}')
    elif self._pipe is not None:
        info.append('open')
    else:
        info.append('closed')
    return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200656
def get_write_buffer_size(self):
    """Return the number of bytes currently held in the write buffer."""
    return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800659
def _read_ready(self):
    """Reader callback: the pipe was closed by the peer.

    Closes the transport, reporting BrokenPipeError when unsent data is
    still buffered and no error otherwise.
    """
    # Pipe was closed by peer.
    if self._loop.get_debug():
        logger.info("%r was closed by peer", self)
    if self._buffer:
        self._close(BrokenPipeError())
    else:
        self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    *data* must be bytes, bytearray or memoryview. Empty data is ignored.
    If the transport is closing or the connection was lost, the data is
    dropped and the lost-write counter is incremented (a warning is logged
    once the counter reaches the configured threshold).

    When nothing is buffered yet, an immediate os.write() is attempted;
    any unsent remainder is appended to the buffer and a writer callback
    is registered on the loop to flush it later.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if n == len(data):
            return
        elif n > 0:
            # Keep only the unsent tail, without copying it.
            data = memoryview(data)[n:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703
def _write_ready(self):
    """Writer callback: try to flush the buffered data to the pipe.

    On a fatal write error the buffer is dropped and the transport is
    closed through _fatal_error(). When the whole buffer has been written
    the writer is unregistered, the protocol may be resumed and, if the
    transport is closing, connection_lost is delivered. After a partial
    write only the sent prefix is removed from the buffer.
    """
    assert self._buffer, 'Data should not be empty'

    try:
        n = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        # Nothing could be written right now; stay registered and retry.
        pass
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        self._buffer.clear()
        self._conn_lost += 1
        # Remove the writer here: _fatal_error() does not do it
        # because _buffer is empty.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(exc, 'Fatal write error on pipe transport')
    else:
        if n == len(self._buffer):
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
            return
        elif n > 0:
            del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731
def can_write_eof(self):
    """Return True: this transport implements write_eof()."""
    return True
734
def write_eof(self):
    """Mark the transport as closing once buffered data has been flushed.

    Does nothing if the transport is already closing. When the buffer is
    empty the reader is unregistered and connection_lost(None) is scheduled
    right away; otherwise the completion is left to _write_ready(), which
    checks the closing flag after draining the buffer.
    """
    if self._closing:
        return
    assert self._pipe
    self._closing = True
    if not self._buffer:
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)
743
def set_protocol(self, protocol):
    """Replace the protocol object used by this transport."""
    self._protocol = protocol
746
def get_protocol(self):
    """Return the protocol object currently attached to this transport."""
    return self._protocol
749
def is_closing(self):
    """Return the transport's closing flag."""
    return self._closing
752
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is not None and not self._closing:
        # write_eof() is all we need to close the write pipe
        self.write_eof()
757
def __del__(self, _warn=warnings.warn):
    """Warn about, and close, a pipe that was never closed explicitly.

    ``_warn`` is bound as a default argument so the function is still
    reachable from here even if the module globals are no longer usable.
    """
    if self._pipe is not None:
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100762
def abort(self):
    """Close the transport immediately, without reporting an error."""
    self._close(None)
765
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report a fatal error and close the transport with *exc*.

    OSError instances are only logged, and only when the loop is in debug
    mode; any other exception is passed to the loop's exception handler
    together with this transport and its protocol.
    """
    # should be called by exception handler only
    if isinstance(exc, OSError):
        if self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
    else:
        self._loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
    self._close(exc)
779
def _close(self, exc=None):
    """Stop watching the pipe and schedule connection_lost(exc).

    Pending buffered data is discarded; the writer is only unregistered
    when the buffer is non-empty at the time of the call.
    """
    self._closing = True
    if self._buffer:
        self._loop._remove_writer(self._fileno)
    self._buffer.clear()
    self._loop._remove_reader(self._fileno)
    self._loop.call_soon(self._call_connection_lost, exc)
787
def _call_connection_lost(self, exc):
    """Notify the protocol of the lost connection and release resources.

    The pipe is closed and the references to the pipe, protocol and loop
    are dropped even if protocol.connection_lost() raises.
    """
    try:
        self._protocol.connection_lost(exc)
    finally:
        self._pipe.close()
        self._pipe = None
        self._protocol = None
        self._loop = None
796
797
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in ``self._proc``.

        When *stdin* is subprocess.PIPE, a socket pair replaces the pipe:
        one end is handed to the child, the other becomes ``proc.stdin``.
        Both sockets are closed if anything fails before the parent's end
        has been handed over to ``proc.stdin``.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
                # Ownership moved to proc.stdin; skip the cleanup below.
                stdin_w = None
        finally:
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800821
822
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Removes the handler for process 'pid'.

        The function returns True if the handler was successfully removed,
        False if there was nothing to remove."""

        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def is_active(self):
        """Return ``True`` if the watcher is active and is used by the event loop.

        Return True if the watcher is installed and ready to handle process exit
        notifications.

        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes

        This function must return self"""
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context"""
        raise NotImplementedError()
900
901
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    This child watcher polls process file descriptors (pidfds) to await child
    process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
    child watcher implementation. It doesn't require signals or threads, doesn't
    interfere with any processes launched outside the event loop, and scales
    linearly with the number of subprocesses launched by the event loop. The
    main disadvantage is that pidfds are specific to Linux, and only work on
    recent (5.3+) kernels.
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> (pidfd, callback, args).
        self._callbacks = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def is_active(self):
        """Return True if a loop is attached and that loop is running."""
        return self._loop is not None and self._loop.is_running()

    def close(self):
        """Detach from the loop, dropping every pending handler."""
        self.attach_loop(None)

    def attach_loop(self, loop):
        """Attach the watcher to *loop* (which may be None).

        Every pending handler is discarded: its pidfd is unregistered from
        the previous loop and closed. A RuntimeWarning is emitted when
        handlers are dropped because the loop is being detached.
        """
        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        for pidfd, _, _ in self._callbacks.values():
            self._loop._remove_reader(pidfd)
            os.close(pidfd)
        self._callbacks.clear()
        self._loop = loop

    def add_child_handler(self, pid, callback, *args):
        """Arrange for callback(pid, returncode, *args) when *pid* exits.

        Registering a handler for an already-watched pid replaces the
        callback and keeps the existing pidfd.
        """
        existing = self._callbacks.get(pid)
        if existing is not None:
            self._callbacks[pid] = existing[0], callback, args
        else:
            pidfd = os.pidfd_open(pid)
            try:
                self._loop._add_reader(pidfd, self._do_wait, pid)
            except BaseException:
                # Do not leak the descriptor when it cannot be registered
                # (for instance when no loop is attached).
                os.close(pidfd)
                raise
            self._callbacks[pid] = pidfd, callback, args

    def _do_wait(self, pid):
        """Reader callback for a pidfd: reap *pid* and run its callback."""
        pidfd, callback, args = self._callbacks.pop(pid)
        self._loop._remove_reader(pidfd)
        try:
            _, status = os.waitpid(pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)

        os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Forget the handler for *pid*; return False if there was none."""
        try:
            pidfd, _, _ = self._callbacks.pop(pid)
        except KeyError:
            return False
        self._loop._remove_reader(pidfd)
        os.close(pidfd)
        return True
978
979
class BaseChildWatcher(AbstractChildWatcher):
    """Common base for watchers driven by a SIGCHLD handler on the loop.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True if a loop is attached and that loop is running."""
        return self._loop is not None and self._loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached; a
        RuntimeWarning is emitted if handlers are still pending then.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD handler: reap children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001031
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001032
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Drop every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if there was none."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() pops finished pids.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; run its callback if it exited."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
1105
1106
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """
    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of reaped children that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and recorded zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        # Logged outside the lock.
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's 'with' block. If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if there was none."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch the matching callbacks."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = waitstatus_to_exitcode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1210
1211
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require running loop in the main thread.

    This implementation registers a SIGCHLD signal handler on
    instantiation (which may conflict with other code that
    installs its own handler for this signal).

    The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    SIGCHLD is received).
    """

    # Implementation note:
    # The class keeps compatibility with AbstractChildWatcher ABC
    # To achieve this it has empty attach_loop() method
    # and doesn't accept explicit loop argument
    # for add_child_handler()/remove_child_handler()
    # but retrieves the current loop by get_running_loop()

    def __init__(self):
        # Maps pid -> (loop, callback, args).
        self._callbacks = {}
        # Handler that was installed before ours; None while inactive.
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Drop all handlers and restore the previous SIGCHLD handler.

        The previous handler is only restored when ours is still the one
        installed; otherwise a warning is logged and the signal is left
        untouched.
        """
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        handler = signal.getsignal(signal.SIGCHLD)
        if handler != self._sig_chld:
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) on the running loop."""
        loop = events.get_running_loop()
        self._callbacks[pid] = (loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if there was none."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is not None:
            return

        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        if self._saved_sighandler is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            self._saved_sighandler = signal.SIG_DFL

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() pops finished pids.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its callback.

        The callback is scheduled with call_soon_threadsafe() on the loop
        that registered the handler, unless that loop is already closed.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            debug_log = True
        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
        else:
            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                if debug_log and loop.get_debug():
                    logger.debug('process %s exited with returncode %s',
                                 expected_pid, returncode)
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal handler for SIGCHLD: poll children, logging any error."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1335
1336
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    The watcher uses a thread per process
    for waiting for the process finish.

    It doesn't require subscription on POSIX signal
    but a thread creation is not free.

    The watcher has O(1) complexity, its performance doesn't depend
    on the number of spawned processes.
    """

    def __init__(self):
        # Source of the numeric suffix used in the waiter thread names.
        self._pid_counter = itertools.count(0)
        # Maps pid -> waiter thread.
        self._threads = {}

    def is_active(self):
        return True

    def close(self):
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        # list() takes a snapshot: waiter threads remove themselves from
        # the dict when they finish.
        threads = [thread for thread in list(self._threads.values())
                   if thread.is_alive() and not thread.daemon]
        for thread in threads:
            thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __del__(self, _warn=warnings.warn):
        threads = [thread for thread in list(self._threads.values())
                   if thread.is_alive()]
        if threads:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid*.

        When the process exits, callback(pid, returncode, *args) is
        scheduled on the loop that was running at registration time.
        """
        loop = events.get_running_loop()
        thread = threading.Thread(target=self._do_waitpid,
                                  name=f"waitpid-{next(self._pid_counter)}",
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = thread
        thread.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is no-op but is implemented because
        # abstract base class requires it
        return True

    def attach_loop(self, loop):
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
        else:
            loop.call_soon_threadsafe(callback, pid, returncode, *args)

        self._threads.pop(expected_pid)
1423 self._threads.pop(expected_pid)
1424
1425
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set via set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = ThreadedChildWatcher()
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        # Close the watcher being replaced, if any.
        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1474
Yury Selivanov6370f342017-12-10 18:36:12 -05001475
# Expose the Unix-specific implementations under their generic names.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy