# blob: e4f445e95026b55e7060a496d4da9841f1bc0e12
"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
import errno
import io
import itertools
import os
import selectors
import signal
import socket
import stat
import subprocess
import sys
import threading
import warnings

from . import base_events
from . import base_subprocess
from . import constants
from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import selector_events
from . import tasks
from . import transports
from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'PidfdChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: this module is built around Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
Victor Stinnerfe5649c2014-07-17 22:43:40 +020042def _sighandler_noop(signum, frame):
43 """Dummy signal handler."""
44 pass
45
46
def waitstatus_to_exitcode(status):
    """Convert a wait status to an exit code.

    Delegates to os.waitstatus_to_exitcode().  If that call rejects the
    status with ValueError, the raw *status* is returned unchanged
    instead of propagating the error.
    """
    try:
        return os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
55
56
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle registered for it.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and drop every registered signal handler.

        While the interpreter is finalizing, the handlers are not
        uninstalled through the signal module: a ResourceWarning is
        emitted (if any handler is registered) and the registry is
        simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a received signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the handler that replaces ours: default_int_handler for
        # SIGINT, SIG_DFL for every other signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop using the wakeup fd for signals.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        Raise RuntimeError, before any transport is created, if the
        current child watcher is not active.
        """
        with events.get_child_watcher() as watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # Close the transport and wait for it before re-raising.
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Hand *returncode* to the transport via call_soon_threadsafe()."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect over a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.  Raise ValueError for inconsistent
        arguments or a socket of the wrong family/type.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created the socket, so we must not leak it on failure.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* is
        given and is not an abstract socket name, an existing socket file
        at that path is removed before binding.  Return the
        base_events.Server object.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the total number of bytes sent (0 for an empty file).
        Raise exceptions.SendfileNotAvailableError if os.sendfile is
        missing, if *file* has no usable fileno(), or if fstat() fails.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        # A falsy count means "no explicit limit": use the file size.
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call; on later calls it is
        the fd under which this method was registered as a writer
        callback.  The outcome is reported through *fut*.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                # Everything requested has been sent.
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Not writable right now: retry once the socket is writable.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset*, but only if some data was sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer callback if *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # fileno() is -1 for an already closed socket.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
444
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device.

    Data read from the file descriptor is delivered to the protocol's
    data_received(); end of file triggers eof_received() followed by
    connection_lost(None).
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Set up the transport and schedule the start of reading.

        Raise ValueError if *pipe* is not a FIFO, a socket or a
        character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the references so that __del__() sees no open pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Reader callback: read once and dispatch the result."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read for now; the reader stays registered.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the fd; no-op if closing or already paused."""
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._fileno)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the fd again; no-op if closing or not paused."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._loop._add_reader(self._fileno, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Start closing the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self, _warn=warnings.warn):
        # _warn is bound as a default argument so that it stays reachable
        # from this method regardless of the module globals' state.
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged (in debug mode), not reported to the
            # loop's exception handler.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
580
581
Yury Selivanov3cb99142014-02-18 18:41:13 -0500582class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800583 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700584
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Set up the write transport and schedule connection_made().

    Raise ValueError if *pipe* is not a character device, a FIFO or a
    socket.
    """
    # NOTE(review): self._loop is used below but never assigned here;
    # presumably the base initializer stores *loop* -- confirm.
    super().__init__(extra, loop)
    self._extra['pipe'] = pipe
    self._pipe = pipe
    self._fileno = pipe.fileno()
    self._protocol = protocol
    self._buffer = bytearray()
    self._conn_lost = 0
    self._closing = False  # Set when close() or write_eof() called.

    mode = os.fstat(self._fileno).st_mode
    is_char = stat.S_ISCHR(mode)
    is_fifo = stat.S_ISFIFO(mode)
    is_socket = stat.S_ISSOCK(mode)
    if not (is_char or is_fifo or is_socket):
        self._pipe = None
        self._fileno = None
        self._protocol = None
        raise ValueError("Pipe transport is only for "
                         "pipes, sockets and character devices")

    os.set_blocking(self._fileno, False)
    self._loop.call_soon(self._protocol.connection_made, self)

    # On AIX, the reader trick (to be notified when the read end of the
    # socket is closed) only works for sockets. On other platforms it
    # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
    if is_socket or (is_fifo and not sys.platform.startswith("aix")):
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)

    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        self._loop.call_soon(futures._set_result_unless_cancelled,
                             waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621
def __repr__(self):
    """Return a debugging representation of this write pipe transport.

    The text lists the class name, a 'closed'/'closing' marker, the file
    descriptor and, when the loop exposes a ``_selector`` attribute and the
    pipe is still set, whether the descriptor is registered for write
    events ('polling' or 'idle') followed by the buffered byte count.
    """
    pipe_is_set = self._pipe is not None
    parts = [self.__class__.__name__]
    if not pipe_is_set:
        parts.append('closed')
    elif self._closing:
        parts.append('closing')
    parts.append(f'fd={self._fileno}')

    # Not every loop has a selector; fall back to a plain open/closed tag.
    selector = getattr(self._loop, '_selector', None)
    if pipe_is_set and selector is not None:
        registered = selector_events._test_selector_event(
            selector, self._fileno, selectors.EVENT_WRITE)
        parts.append('polling' if registered else 'idle')
        parts.append(f'bufsize={self.get_write_buffer_size()}')
    elif pipe_is_set:
        parts.append('open')
    else:
        parts.append('closed')
    return '<{}>'.format(' '.join(parts))
Victor Stinnere912e652014-07-12 03:11:53 +0200645
def get_write_buffer_size(self):
    """Return the number of bytes currently held in the write buffer."""
    pending = self._buffer
    return len(pending)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800648
def _read_ready(self):
    """React to a read event on the pipe: the peer closed it.

    If data is still buffered the transport is closed with a
    BrokenPipeError; otherwise it is closed without an error.
    """
    if self._loop.get_debug():
        logger.info("%r was closed by peer", self)
    if not self._buffer:
        self._close()
        return
    # Buffered data can no longer be written to the closed pipe.
    self._close(BrokenPipeError())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657
def write(self, data):
    """Write bytes-like *data* to the pipe, buffering what cannot be sent.

    Empty data is ignored.  If the connection was lost or the transport
    is closing, the data is dropped and the lost-write counter is bumped
    (a warning is logged once the counter reaches the configured
    threshold).  When nothing is buffered, an immediate os.write() is
    attempted; any unsent remainder is appended to the buffer and a
    writer callback is registered with the loop.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing is queued, so try to push the data out immediately.
        try:
            sent = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            self._conn_lost += 1
            self._fatal_error(err, 'Fatal write error on pipe transport')
            return
        if sent == len(data):
            return
        if sent > 0:
            # Keep only the part that was not written.
            data = memoryview(data)[sent:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700692
def _write_ready(self):
    """Flush buffered data when the loop reports the pipe as writable.

    A would-block or interrupted write leaves the buffer untouched.  Any
    other error clears the buffer, unregisters the writer and reports a
    fatal error.  A full write clears the buffer, unregisters the writer,
    resumes the protocol and, if the transport is closing, finishes the
    shutdown.  A partial write drops the bytes that were sent.
    """
    assert self._buffer, 'Data should not be empty'

    try:
        sent = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._buffer.clear()
        self._conn_lost += 1
        # The writer must be removed here: _close() only removes it
        # while the buffer is non-empty, and it was just cleared.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(err, 'Fatal write error on pipe transport')
        return

    if sent != len(self._buffer):
        if sent > 0:
            del self._buffer[:sent]
        return

    self._buffer.clear()
    self._loop._remove_writer(self._fileno)
    self._maybe_resume_protocol()  # May append to buffer.
    if self._closing:
        self._loop._remove_reader(self._fileno)
        self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720
def can_write_eof(self):
    """Report that this transport supports write_eof(); always True."""
    supported = True
    return supported
723
def write_eof(self):
    """Mark the transport as closing and finish once the buffer is empty.

    Does nothing if the transport is already closing.  When no data is
    buffered, the reader is unregistered and the connection-lost callback
    is scheduled on the loop; otherwise only the closing flag is set.
    """
    if self._closing:
        return
    assert self._pipe
    self._closing = True
    if self._buffer:
        # Pending data remains; only the closing flag is set here.
        return
    self._loop._remove_reader(self._fileno)
    self._loop.call_soon(self._call_connection_lost, None)
732
def set_protocol(self, protocol):
    """Store *protocol* as the protocol object used by this transport."""
    self._protocol = protocol
    return None
735
def get_protocol(self):
    """Return the protocol object currently stored on this transport."""
    current = self._protocol
    return current
738
def is_closing(self):
    """Return the transport's closing flag."""
    closing = self._closing
    return closing
741
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is None or self._closing:
        return
    # Writing EOF is all that is needed to close the write side.
    self.write_eof()
746
def __del__(self, _warn=warnings.warn):
    """Warn about and close a pipe that is still set at finalization.

    *_warn* is bound as a default argument so the warning function is
    captured when the method is defined.
    """
    if self._pipe is None:
        return
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
    self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100751
def abort(self):
    """Close the transport immediately, passing None as the error."""
    no_error = None
    self._close(no_error)
754
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report *exc* and close the transport with it.

    Intended to be called from an exception handler only.  OSError
    instances are merely logged at debug level (and only when the loop is
    in debug mode); every other exception is passed to the loop's
    exception handler together with *message*, the transport and the
    protocol.
    """
    if not isinstance(exc, OSError):
        context = {
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        }
        self._loop.call_exception_handler(context)
    elif self._loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._close(exc)
768
def _close(self, exc=None):
    """Start closing: drop buffered data and schedule connection_lost.

    Sets the closing flag, unregisters the writer if data was buffered,
    clears the buffer, unregisters the reader and schedules
    _call_connection_lost(exc) on the loop.
    """
    self._closing = True
    loop = self._loop
    fileno = self._fileno
    if self._buffer:
        # Only a non-empty buffer triggers removal of the writer here.
        loop._remove_writer(fileno)
    self._buffer.clear()
    loop._remove_reader(fileno)
    loop.call_soon(self._call_connection_lost, exc)
776
def _call_connection_lost(self, exc):
    """Notify the protocol of the lost connection and release resources.

    The pipe is closed and the pipe, protocol and loop references are
    reset to None even if protocol.connection_lost() raises.
    """
    protocol = self._protocol
    try:
        protocol.connection_lost(exc)
    finally:
        self._pipe.close()
        self._pipe = self._protocol = self._loop = None
785
786
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        When *stdin* is subprocess.PIPE a socket pair is used instead of
        a pipe: one end is handed to the child, the other is wrapped in a
        binary file object and installed as ``self._proc.stdin``.
        """
        parent_end = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_end is not None:
                # The child owns its end now; keep only ours.
                stdin.close()
                self._proc.stdin = open(
                    parent_end.detach(), 'wb', buffering=bufsize)
                parent_end = None
        finally:
            # Still set only if something above failed: close both ends.
            if parent_end is not None:
                stdin.close()
                parent_end.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800810
811
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses
    and report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler().  Starting a
    new process must be done within a 'with' block to allow the watcher to
    suspend its activity until the new process is fully registered (this
    is needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates.  Registering another callback for the
        same process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, it is
        first detached before being attached to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and used by the event loop.

        That is, the watcher is installed and ready to handle process
        exit notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
889
890
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    This child watcher polls process file descriptors (pidfds) to await
    child process termination.  In some respects, PidfdChildWatcher is a
    "Goldilocks" child watcher implementation.  It doesn't require signals
    or threads, doesn't interfere with any processes launched outside the
    event loop, and scales linearly with the number of subprocesses
    launched by the event loop.  The main disadvantage is that pidfds are
    specific to Linux, and only work on recent (5.3+) kernels.
    """

    def __init__(self):
        # Maps pid -> (pidfd, callback, args).
        self._callbacks = {}
        self._loop = None

    def __enter__(self):
        """Return the watcher itself; no setup is performed."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Do nothing on leaving the context."""
        pass

    def is_active(self):
        """Return True when a loop is attached and that loop is running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def close(self):
        """Detach from the loop, releasing every registered pidfd."""
        self.attach_loop(None)

    def attach_loop(self, loop):
        """Switch to *loop*, dropping all handlers registered so far.

        A RuntimeWarning is emitted when the current loop is detached
        (replaced by None) while handlers are still pending.  Every
        registered pidfd is unregistered from the old loop and closed.
        """
        detaching = self._loop is not None and loop is None
        if detaching and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        for entry in self._callbacks.values():
            pidfd = entry[0]
            self._loop._remove_reader(pidfd)
            os.close(pidfd)
        self._callbacks.clear()
        self._loop = loop

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        An already-registered pid keeps its pidfd and only has its
        callback replaced; otherwise a new pidfd is opened and added to
        the loop as a reader.
        """
        known = self._callbacks.get(pid)
        if known is None:
            pidfd = os.pidfd_open(pid)
            self._loop._add_reader(pidfd, self._do_wait, pid)
        else:
            pidfd = known[0]
        self._callbacks[pid] = pidfd, callback, args

    def _do_wait(self, pid):
        """Reap *pid* once its pidfd becomes readable and run the callback."""
        pidfd, callback, args = self._callbacks.pop(pid)
        self._loop._remove_reader(pidfd)
        try:
            _, status = os.waitpid(pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)

        os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Forget *pid*; return True if a handler existed, else False."""
        try:
            entry = self._callbacks.pop(pid)
        except KeyError:
            return False
        pidfd = entry[0]
        self._loop._remove_reader(pidfd)
        os.close(pidfd)
        return True
967
968
class BaseChildWatcher(AbstractChildWatcher):
    """Shared base for child watchers driven by a loop SIGCHLD handler.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Handlers registered by subclasses, keyed by pid.
        self._callbacks = {}
        self._loop = None

    def close(self):
        """Detach the watcher from its loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and that loop is running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def _do_waitpid(self, expected_pid):
        """Poll a single child; must be implemented by subclasses."""
        raise NotImplementedError()

    def _do_waitpid_all(self):
        """Poll children for termination; must be implemented by subclasses."""
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        emitted when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return
        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': err,
            }
            self._loop.call_exception_handler(context)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001020
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001021
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes
    by explicitly polling each registered process in the SIGCHLD handler
    instead of calling os.waitpid(-1).

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        """Return the watcher itself; no setup is performed."""
        return self

    def __exit__(self, a, b, c):
        """Do nothing on leaving the context."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Forget *pid*; return True if a handler existed, else False."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; fire its callback if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *handler_args)
1094
1095
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning
    processes and waiting for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Return codes of reaped pids that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and recorded zombies, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Count one more process being started; return the watcher."""
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        """Leave the context; report zombies nobody claimed.

        Once the last nested context exits, any recorded zombies are
        logged as terminations from unknown pids and then discarded.
        """
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            unclaimed = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        If the process was already reaped and recorded as a zombie, the
        callback is invoked immediately with the recorded return code.
        """
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Forget *pid*; return True if a handler existed, else False."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)

            with self._lock:
                try:
                    handler, handler_args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    handler = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback runs outside the lock.
            if handler is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                handler(pid, returncode, *handler_args)
1199
1200
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require a running loop in the main thread.

    This implementation installs its SIGCHLD signal handler in
    attach_loop() (which may conflict with other code that installs its
    own handler for this signal).

    The solution is safe but it has a significant overhead when handling
    a big number of processes (*O(n)* each time a SIGCHLD is received).
    """

    # Implementation note:
    # The class keeps compatibility with AbstractChildWatcher ABC
    # To achieve this it has empty attach_loop() method
    # and doesn't accept explicit loop argument
    # for add_child_handler()/remove_child_handler()
    # but retrieves the current loop by get_running_loop()

    def __init__(self):
        # Maps pid -> (loop, callback, args).
        self._callbacks = {}
        # SIGCHLD handler that was in place before ours; None until set.
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Drop all handlers and restore the previous SIGCHLD handler.

        The previous handler is only restored when the currently
        installed one is still ours; otherwise a warning is logged.
        """
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current == self._sig_chld:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        else:
            logger.warning("SIGCHLD handler was changed by outside code")
        self._saved_sighandler = None

    def __enter__(self):
        """Return the watcher itself; no setup is performed."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on leaving the context."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) on the running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Forget *pid*; return True if a handler existed, else False."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is not None:
            return

        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        if self._saved_sighandler is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            self._saved_sighandler = signal.SIG_DFL

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its callback."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            debug_log = True

        try:
            target_loop, handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if target_loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed",
                           target_loop, pid)
            return
        if debug_log and target_loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        target_loop.call_soon_threadsafe(
            handler, pid, returncode, *handler_args)

    def _sig_chld(self, signum, frame):
        """Signal handler for SIGCHLD: poll children, log any failure."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1324
1325
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    The watcher starts one thread per watched process; the thread blocks
    in waitpid() until that process finishes.

    It doesn't require subscription to a POSIX signal, but creating a
    thread is not free.

    The watcher has O(1) complexity; its performance doesn't depend on
    the number of spawned processes.
    """

    def __init__(self):
        # Used only to give each waiter thread a distinct name.
        self._pid_counter = itertools.count(0)
        # Maps pid -> waiter thread.
        self._threads = {}

    def is_active(self):
        """Always return True."""
        return True

    def close(self):
        """Wait for the non-daemon waiter threads to finish."""
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        pending = [worker for worker in list(self._threads.values())
                   if worker.is_alive() and not worker.daemon]
        for worker in pending:
            worker.join()

    def __enter__(self):
        """Return the watcher itself; no setup is performed."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on leaving the context."""
        pass

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning if any waiter thread is still alive."""
        alive = [worker for worker in list(self._threads.values())
                 if worker.is_alive()]
        if not alive:
            return
        _warn(f"{self.__class__} has registered but not finished child processes",
              ResourceWarning,
              source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid* to exit.

        The callback is later scheduled on the loop that is running when
        this method is called.
        """
        running_loop = events.get_running_loop()
        waiter = threading.Thread(target=self._do_waitpid,
                                  name=f"waitpid-{next(self._pid_counter)}",
                                  args=(running_loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = waiter
        waiter.start()

    def remove_child_handler(self, pid):
        """Do nothing and return True."""
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op but is implemented because
        # the abstract base class requires it
        return True

    def attach_loop(self, loop):
        """Do nothing; this watcher is not bound to a loop."""
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* exits, then notify."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if not loop.is_closed():
            loop.call_soon_threadsafe(callback, pid, returncode, *args)
        else:
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)

        # This thread is done; drop its bookkeeping entry.
        self._threads.pop(expected_pid)
1412 self._threads.pop(expected_pid)
1413
1414
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set explicitly.
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet.

        The check and creation happen under events._lock.  When called
        from the main thread, the new watcher is attached to the loop
        stored in this policy's thread-local state.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = ThreadedChildWatcher()
                in_main = threading.current_thread() is threading.main_thread()
                if in_main:
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call
        .attach_loop(loop) on the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if threading.current_thread() is threading.main_thread():
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically
        created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1463
Yury Selivanov6370f342017-12-10 18:36:12 -05001464
# Public aliases for the Unix implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy