blob: 97198ea2f4953bc308c7cddd8096cf6198726745 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Andrew Svetlov0d671c02019-06-30 12:54:59 +03005import itertools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01007import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import signal
9import socket
10import stat
11import subprocess
12import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080013import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010014import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'PidfdChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Fail at import time on Windows: everything below relies on Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
Victor Stinnerfe5649c2014-07-17 22:43:40 +020042def _sighandler_noop(signum, frame):
43 """Dummy signal handler."""
44 pass
45
46
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall every registered signal handler.

        During interpreter finalization the handlers are not uninstalled;
        a ResourceWarning is emitted instead and the registry is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch every non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or if sig
        is not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if the wakeup file descriptor cannot be set or
        if the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc)) from exc

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught') from exc
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default KeyboardInterrupt handler back;
        # every other signal is reset to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught') from exc
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not in signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        Raise RuntimeError if the current child watcher is not active.
        If waiting for the transport fails, the transport is closed and
        awaited before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        ``(transport, protocol)`` pair.

        Raise ValueError on inconsistent arguments (ssl-only options
        without ssl, both or neither of path/sock, or a sock that is not
        an AF_UNIX stream socket).
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created this socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* is
        given and names an existing socket file, that file is removed
        before binding.  Return the base_events.Server instance.

        Raise TypeError if ssl is a bool.
        Raise ValueError on inconsistent arguments.
        Raise OSError if binding fails; EADDRINUSE is re-raised with the
        offending address in the message.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Everything up to a successful bind() is covered by one
            # cleanup handler so the new socket is closed on any failure,
            # including non-OSError ones raised by the stale-socket check.
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX socket '
                                     '%r: %r', path, err)

                try:
                    sock.bind(path)
                except OSError as exc:
                    if exc.errno == errno.EADDRINUSE:
                        # Let's improve the error message by adding
                        # with what exact address it occurs.
                        msg = f'Address {path!r} is already in use'
                        raise OSError(errno.EADDRINUSE, msg) from None
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile().

        Return the total number of bytes sent (0 for an empty file).
        Raise exceptions.SendfileNotAvailableError when os.sendfile() is
        missing or *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket's fd on
        calls made through add_writer().  The outcome is reported through
        *fut*; this method itself returns None.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                # OSError subclasses take (errno, strerror) in that order.
                new_exc = ConnectionError(
                    errno.ENOTCONN, "socket is not connected")
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer when *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # -1 means the socket has already been closed.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
434
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        st_mode = os.fstat(self._fileno).st_mode
        if (not stat.S_ISFIFO(st_mode)
                and not stat.S_ISSOCK(st_mode)
                and not stat.S_ISCHR(st_mode)):
            # Drop the references so that __del__() sees a closed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        schedule(self._loop._add_reader, self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def __repr__(self):
        pipe_is_open = self._pipe is not None
        parts = [self.__class__.__name__]
        if not pipe_is_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')

        selector = getattr(self._loop, '_selector', None)
        if not pipe_is_open:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            is_polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            parts.append('polling' if is_polling else 'idle')
        return '<{}>'.format(' '.join(parts))

    def _read_ready(self):
        """Read one chunk from the pipe and hand it to the protocol."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means end of file.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop polling the pipe; no-op when closing or already paused."""
        if not (self._closing or self._paused):
            self._paused = True
            self._loop._remove_reader(self._fileno)
            if self._loop.get_debug():
                logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume polling the pipe; no-op when closing or not paused."""
        if self._paused and not self._closing:
            self._paused = False
            self._loop._add_reader(self._fileno, self._read_ready)
            if self._loop.get_debug():
                logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Start closing the transport unless that is already under way."""
        if self._closing:
            return
        self._close(None)

    def __del__(self, _warn=warnings.warn):
        if self._pipe is None:
            return
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule the connection_lost() notification."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
570
571
Yury Selivanov3cb99142014-02-18 18:41:13 -0500572class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800573 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574
575 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100576 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700577 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578 self._pipe = pipe
579 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400581 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700582 self._conn_lost = 0
583 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700584
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700585 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700586 is_char = stat.S_ISCHR(mode)
587 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700588 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700589 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700590 self._pipe = None
591 self._fileno = None
592 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100593 raise ValueError("Pipe transport is only for "
594 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700595
Andrew Svetlovcc839202017-11-29 18:23:43 +0200596 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700597 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100598
599 # On AIX, the reader trick (to be notified when the read end of the
600 # socket is closed) only works for sockets. On other platforms it
601 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700602 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100603 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400604 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100605 self._fileno, self._read_ready)
606
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100608 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500609 self._loop.call_soon(futures._set_result_unless_cancelled,
610 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700611
Victor Stinnere912e652014-07-12 03:11:53 +0200612 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100613 info = [self.__class__.__name__]
614 if self._pipe is None:
615 info.append('closed')
616 elif self._closing:
617 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500618 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400619 selector = getattr(self._loop, '_selector', None)
620 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200621 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500622 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200623 if polling:
624 info.append('polling')
625 else:
626 info.append('idle')
627
628 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500629 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400630 elif self._pipe is not None:
631 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200632 else:
633 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500634 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200635
def get_write_buffer_size(self):
    """Return the number of bytes currently held in the write buffer."""
    return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800638
def _read_ready(self):
    """Reader callback: treat a read event as the peer closing the pipe.

    If data is still buffered the transport is closed with a
    BrokenPipeError, otherwise it is closed without an exception.
    """
    # Pipe was closed by peer.
    if self._loop.get_debug():
        logger.info("%r was closed by peer", self)
    if self._buffer:
        # Pending data can no longer be delivered.
        self._close(BrokenPipeError())
    else:
        self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    *data* must be bytes, bytearray or memoryview; empty data is ignored.
    If the connection is lost or closing, the data is dropped and the
    lost-write counter is incremented.  When nothing is buffered an
    immediate os.write() is attempted; any unsent remainder is appended
    to the buffer and a writer callback is registered on the loop.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            # Nothing could be written; fall through and buffer it all.
            n = 0
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if n == len(data):
            return
        elif n > 0:
            # Keep only the unsent tail, without copying.
            data = memoryview(data)[n:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
def _write_ready(self):
    """Writer callback: flush as much of the buffer as the pipe accepts.

    On a full flush the writer is removed and the protocol may be
    resumed; if the transport is closing, connection_lost is then
    delivered.  On a partial write the sent prefix is dropped from the
    buffer.  Unexpected errors are reported through _fatal_error().
    """
    assert self._buffer, 'Data should not be empty'

    try:
        n = os.write(self._fileno, self._buffer)
    except (BlockingIOError, InterruptedError):
        # Not writable after all; keep the buffer and wait for the next event.
        pass
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        self._buffer.clear()
        self._conn_lost += 1
        # Remove the writer here: _fatal_error() doesn't do it
        # because _buffer is empty.
        self._loop._remove_writer(self._fileno)
        self._fatal_error(exc, 'Fatal write error on pipe transport')
    else:
        if n == len(self._buffer):
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
            return
        elif n > 0:
            del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710
def can_write_eof(self):
    """Return True: this transport supports write_eof()."""
    return True
713
def write_eof(self):
    """Mark the transport as closing once pending data is flushed.

    Does nothing if the transport is already closing.  When the buffer is
    empty the reader is removed and connection_lost(None) is scheduled
    right away; otherwise only the closing flag is set here.
    """
    if self._closing:
        return
    assert self._pipe
    self._closing = True
    if not self._buffer:
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)
722
def set_protocol(self, protocol):
    """Replace the protocol object associated with this transport."""
    self._protocol = protocol
725
def get_protocol(self):
    """Return the protocol object currently associated with this transport."""
    return self._protocol
728
def is_closing(self):
    """Return the transport's closing flag."""
    return self._closing
731
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is not None and not self._closing:
        # write_eof() is all that is needed to close the write pipe.
        self.write_eof()
736
def __del__(self, _warn=warnings.warn):
    """Warn about and close a pipe that was never closed explicitly.

    *_warn* is bound as a default argument so the warning function is
    captured when the function is defined rather than looked up at call
    time.
    """
    if self._pipe is not None:
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100741
def abort(self):
    """Close the transport immediately, discarding any buffered data."""
    self._close(None)
744
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    """Report *exc* and close the transport with it.

    OSError instances are only logged, and only when the loop is in debug
    mode; any other exception is passed to the loop's exception handler
    together with the transport and protocol.
    """
    # should be called by exception handler only
    if isinstance(exc, OSError):
        if self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
    else:
        self._loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
    self._close(exc)
758
def _close(self, exc=None):
    """Stop watching the pipe, drop buffered data and schedule connection_lost.

    *exc* is forwarded to _call_connection_lost(), which is scheduled on
    the loop rather than called directly.
    """
    self._closing = True
    if self._buffer:
        # A writer is only registered while there is buffered data.
        self._loop._remove_writer(self._fileno)
    self._buffer.clear()
    self._loop._remove_reader(self._fileno)
    self._loop.call_soon(self._call_connection_lost, exc)
766
def _call_connection_lost(self, exc):
    """Notify the protocol that the connection is lost, then release resources.

    The pipe is closed and the pipe, protocol and loop references are
    cleared even if protocol.connection_lost() raises.
    """
    try:
        self._protocol.connection_lost(exc)
    finally:
        self._pipe.close()
        self._pipe = None
        self._protocol = None
        self._loop = None
775
776
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in ``self._proc``.

        When *stdin* is subprocess.PIPE a socket pair is used instead of a
        pipe: one end is handed to the child, the other becomes
        ``self._proc.stdin``.  Both ends are closed if Popen() fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
                # Ownership moved to the file object; skip the cleanup below.
                stdin_w = None
        finally:
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800800
801
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        The function returns True if the handler was successfully removed,
        False if there was nothing to remove.
        """
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def is_active(self):
        """Return ``True`` if the watcher is active and is used by the event loop.

        Return True if the watcher is installed and ready to handle process exit
        notifications.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()
879
880
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    This child watcher polls process file descriptors (pidfds) to await child
    process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
    child watcher implementation. It doesn't require signals or threads, doesn't
    interfere with any processes launched outside the event loop, and scales
    linearly with the number of subprocesses launched by the event loop. The
    main disadvantage is that pidfds are specific to Linux, and only work on
    recent (5.3+) kernels.
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> (pidfd, callback, args).
        self._callbacks = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def is_active(self):
        """Return True if a loop is attached and currently running."""
        return self._loop is not None and self._loop.is_running()

    def close(self):
        """Detach from the loop, dropping every pending handler."""
        self.attach_loop(None)

    def attach_loop(self, loop):
        """Attach to *loop* (may be None), discarding all pending handlers.

        A RuntimeWarning is issued when a loop is being detached while
        handlers are still pending.  Every pidfd is unregistered from the
        previous loop and closed.
        """
        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        for pidfd, _, _ in self._callbacks.values():
            self._loop._remove_reader(pidfd)
            os.close(pidfd)
        self._callbacks.clear()
        self._loop = loop

    def add_child_handler(self, pid, callback, *args):
        """Arrange for callback(pid, returncode, *args) when *pid* exits.

        Registering a pid again replaces its callback but reuses the
        pidfd that is already open.
        """
        existing = self._callbacks.get(pid)
        if existing is not None:
            self._callbacks[pid] = existing[0], callback, args
        else:
            pidfd = os.pidfd_open(pid)
            self._loop._add_reader(pidfd, self._do_wait, pid)
            self._callbacks[pid] = pidfd, callback, args

    def _do_wait(self, pid):
        """Reader callback: reap *pid* and invoke its registered callback."""
        pidfd, callback, args = self._callbacks.pop(pid)
        self._loop._remove_reader(pidfd)
        try:
            _, status = os.waitpid(pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = _compute_returncode(status)
        finally:
            # Never leak the descriptor, whatever waitpid() did.
            os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if none was registered."""
        try:
            pidfd, _, _ = self._callbacks.pop(pid)
        except KeyError:
            return False
        self._loop._remove_reader(pidfd)
        os.close(pidfd)
        return True
946
947
def _compute_returncode(status):
    """Convert a wait status from os.waitpid() into a return code.

    Returns the negated signal number if the child was killed by a signal,
    the exit status if it exited normally, and *status* unchanged otherwise.
    """
    if os.WIFSIGNALED(status):
        # The child process died because of a signal.
        return -os.WTERMSIG(status)
    elif os.WIFEXITED(status):
        # The child process exited (e.g sys.exit()).
        return os.WEXITSTATUS(status)
    else:
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
960
961
class BaseChildWatcher(AbstractChildWatcher):
    """Shared base for child watchers driven by a SIGCHLD handler on a loop.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach from the current loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True if a loop is attached and currently running."""
        return self._loop is not None and self._loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is issued
        when detaching while handlers are still pending.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001013
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001014
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if none was registered."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a copy: _do_waitpid() removes entries as it goes.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; run its callback if it exited."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
1087
1088
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Maps pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget all handlers and recorded zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No fork is in progress any more, so the remaining zombies
            # will never be claimed by add_child_handler().
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside a 'with watcher:' block.  If the child was
        already reaped the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if none was registered."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch the matching callbacks."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = _compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # Call back outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1192
1193
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require running loop in the main thread.

    This implementation registers a SIGCHLD signal handler the first
    time attach_loop() is called (which may conflict with other code
    that installs its own handler for this signal).

    The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    SIGCHLD is received).
    """

    # Implementation note:
    # The class keeps compatibility with AbstractChildWatcher ABC
    # To achieve this, attach_loop() does not store the loop,
    # and add_child_handler()/remove_child_handler() take no explicit
    # loop argument; add_child_handler() retrieves the current loop
    # by get_running_loop()

    def __init__(self):
        # Maps pid -> (loop, callback, args).
        self._callbacks = {}
        # Handler that was installed before ours; None until attach_loop().
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Drop all handlers and restore the saved SIGCHLD handler."""
        self._callbacks.clear()
        if self._saved_sighandler is not None:
            handler = signal.getsignal(signal.SIGCHLD)
            if handler != self._sig_chld:
                # Someone replaced our handler; do not clobber theirs.
                logger.warning("SIGCHLD handler was changed by outside code")
            else:
                signal.signal(signal.SIGCHLD, self._saved_sighandler)
            self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) on the running loop."""
        loop = events.get_running_loop()
        self._callbacks[pid] = (loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return False if none was registered."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is None:
            self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
            if self._saved_sighandler is None:
                logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                               "restore to default handler on watcher close.")
                self._saved_sighandler = signal.SIG_DFL

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a copy: _do_waitpid() removes entries as it goes.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its callback."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            debug_log = True
        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
        else:
            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                if debug_log and loop.get_debug():
                    logger.debug('process %s exited with returncode %s',
                                 expected_pid, returncode)
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal handler for SIGCHLD: poll children, logging any error."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1313
1314
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    The watcher uses one thread per process
    to wait for the process to finish.

    It doesn't require subscription to a POSIX signal
    but thread creation is not free.

    The watcher has O(1) complexity, its performance doesn't depend
    on the number of spawned processes.
    """

    def __init__(self):
        # Source of the numeric suffix used in waiter thread names.
        self._pid_counter = itertools.count(0)
        # Maps pid -> waiter thread.
        self._threads = {}

    def is_active(self):
        return True

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning if any waiter thread is still alive."""
        threads = [thread for thread in list(self._threads.values())
                   if thread.is_alive()]
        if threads:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid*.

        The callback is later scheduled on the loop that is running now.
        """
        loop = events.get_running_loop()
        thread = threading.Thread(target=self._do_waitpid,
                                  name=f"waitpid-{next(self._pid_counter)}",
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = thread
        thread.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op but is implemented because
        # the abstract base class requires it
        return True

    def attach_loop(self, loop):
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = _compute_returncode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
        else:
            loop.call_soon_threadsafe(callback, pid, returncode, *args)

        self._threads.pop(expected_pid)
1395
1396
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet."""
        with events._lock:
            # Re-check under the lock: the caller tested without holding it.
            if self._watcher is None:  # pragma: no branch
                self._watcher = ThreadedChildWatcher()
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any previously set watcher is closed first.  *watcher* may be None.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1445
Yury Selivanov6370f342017-12-10 18:36:12 -05001446
# Module-level aliases for the Unix loop and policy implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy