blob: 28128d2977df64ef50a8643aa75b6eb152283238 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Victor Stinner47cd10d2015-01-30 00:05:19 +010022from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import selector_events
Yury Selivanovdbf10222018-05-28 14:31:28 -040024from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
Yury Selivanov6370f342017-12-10 18:36:12 -050029__all__ = (
30 'SelectorEventLoop',
31 'AbstractChildWatcher', 'SafeChildWatcher',
32 'FastChildWatcher', 'DefaultEventLoopPolicy',
33)
34
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036if sys.platform == 'win32': # pragma: no cover
37 raise ImportError('Signals are not really supported on Windows')
38
39
Victor Stinnerfe5649c2014-07-17 22:43:40 +020040def _sighandler_noop(signum, frame):
41 """Dummy signal handler."""
42 pass
43
44
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080045class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050046 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
Yury Selivanovb057c522014-02-18 12:15:06 -050048 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070049 """
50
51 def __init__(self, selector=None):
52 super().__init__(selector)
53 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020056 super().close()
Andrew Svetlov4a025432017-12-21 17:06:46 +020057 if not sys.is_finalizing():
58 for sig in list(self._signal_handlers):
59 self.remove_signal_handler(sig)
60 else:
Andrew Svetlov4f146f92017-12-24 13:50:03 +020061 if self._signal_handlers:
Andrew Svetlova8f4e152017-12-26 11:53:38 +020062 warnings.warn(f"Closing the loop {self!r} "
Andrew Svetlov4f146f92017-12-24 13:50:03 +020063 f"on interpreter shutdown "
Andrew Svetlova8f4e152017-12-26 11:53:38 +020064 f"stage, skipping signal handlers removal",
Andrew Svetlov4f146f92017-12-24 13:50:03 +020065 ResourceWarning,
66 source=self)
67 self._signal_handlers.clear()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080068
Victor Stinnerfe5649c2014-07-17 22:43:40 +020069 def _process_self_data(self, data):
70 for signum in data:
71 if not signum:
72 # ignore null bytes written by _write_to_self()
73 continue
74 self._handle_signal(signum)
75
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 def add_signal_handler(self, sig, callback, *args):
77 """Add a handler for a signal. UNIX only.
78
79 Raise ValueError if the signal number is invalid or uncatchable.
80 Raise RuntimeError if there is a problem setting up the handler.
81 """
Yury Selivanov6370f342017-12-10 18:36:12 -050082 if (coroutines.iscoroutine(callback) or
83 coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010084 raise TypeError("coroutines cannot be used "
85 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010087 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088 try:
89 # set_wakeup_fd() raises ValueError if this is not the
90 # main thread. By calling it early we ensure that an
91 # event loop running in another thread cannot add a signal
92 # handler.
93 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020094 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095 raise RuntimeError(str(exc))
96
Yury Selivanovf23746a2018-01-22 19:11:18 -050097 handle = events.Handle(callback, args, self, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098 self._signal_handlers[sig] = handle
99
100 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200101 # Register a dummy signal handler to ask Python to write the signal
102 # number in the wakup file descriptor. _process_self_data() will
103 # read signal numbers from this file descriptor to handle signals.
104 signal.signal(sig, _sighandler_noop)
105
Charles-François Natali74e7cf32013-12-05 22:47:19 +0100106 # Set SA_RESTART to limit EINTR occurrences.
107 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108 except OSError as exc:
109 del self._signal_handlers[sig]
110 if not self._signal_handlers:
111 try:
112 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200113 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700114 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115
116 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500117 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700118 else:
119 raise
120
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200121 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700122 """Internal helper that is the actual signal handler."""
123 handle = self._signal_handlers.get(sig)
124 if handle is None:
125 return # Assume it's some race condition.
126 if handle._cancelled:
127 self.remove_signal_handler(sig) # Remove it properly.
128 else:
129 self._add_callback_signalsafe(handle)
130
131 def remove_signal_handler(self, sig):
132 """Remove a handler for a signal. UNIX only.
133
134 Return True if a signal handler was removed, False if not.
135 """
136 self._check_signal(sig)
137 try:
138 del self._signal_handlers[sig]
139 except KeyError:
140 return False
141
142 if sig == signal.SIGINT:
143 handler = signal.default_int_handler
144 else:
145 handler = signal.SIG_DFL
146
147 try:
148 signal.signal(sig, handler)
149 except OSError as exc:
150 if exc.errno == errno.EINVAL:
Yury Selivanov6370f342017-12-10 18:36:12 -0500151 raise RuntimeError(f'sig {sig} cannot be caught')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700152 else:
153 raise
154
155 if not self._signal_handlers:
156 try:
157 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200158 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700159 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160
161 return True
162
163 def _check_signal(self, sig):
164 """Internal helper to validate a signal.
165
166 Raise ValueError if the signal number is invalid or uncatchable.
167 Raise RuntimeError if there is a problem setting up the handler.
168 """
169 if not isinstance(sig, int):
Yury Selivanov6370f342017-12-10 18:36:12 -0500170 raise TypeError(f'sig must be an int, not {sig!r}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
Antoine Pitrou9d3627e2018-05-04 13:00:50 +0200172 if sig not in signal.valid_signals():
173 raise ValueError(f'invalid signal number {sig}')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174
175 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
176 extra=None):
177 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
178
179 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
180 extra=None):
181 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
182
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200183 async def _make_subprocess_transport(self, protocol, args, shell,
184 stdin, stdout, stderr, bufsize,
185 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400187 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800188 transp = _UnixSubprocessTransport(self, protocol, args, shell,
189 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100190 waiter=waiter, extra=extra,
191 **kwargs)
192
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800193 watcher.add_child_handler(transp.get_pid(),
194 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100195 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200196 await waiter
Yury Selivanov431b5402019-05-27 14:45:12 +0200197 except (SystemExit, KeyboardInterrupt):
198 raise
199 except BaseException:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100200 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200201 await transp._wait()
202 raise
Guido van Rossum4835f172014-01-10 13:28:59 -0800203
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204 return transp
205
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800206 def _child_watcher_callback(self, pid, returncode, transp):
207 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208
Neil Aspinallf7686c12017-12-19 19:45:42 +0000209 async def create_unix_connection(
210 self, protocol_factory, path=None, *,
211 ssl=None, sock=None,
212 server_hostname=None,
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200213 ssl_handshake_timeout=None):
Yury Selivanovb057c522014-02-18 12:15:06 -0500214 assert server_hostname is None or isinstance(server_hostname, str)
215 if ssl:
216 if server_hostname is None:
217 raise ValueError(
218 'you have to pass server_hostname when using ssl')
219 else:
220 if server_hostname is not None:
221 raise ValueError('server_hostname is only meaningful with ssl')
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200222 if ssl_handshake_timeout is not None:
223 raise ValueError(
224 'ssl_handshake_timeout is only meaningful with ssl')
Yury Selivanovb057c522014-02-18 12:15:06 -0500225
226 if path is not None:
227 if sock is not None:
228 raise ValueError(
229 'path and sock can not be specified at the same time')
230
Andrew Svetlovcc839202017-11-29 18:23:43 +0200231 path = os.fspath(path)
Victor Stinner79a29522014-02-19 01:45:59 +0100232 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500233 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500234 sock.setblocking(False)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200235 await self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100236 except:
237 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500238 raise
239
240 else:
241 if sock is None:
242 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400243 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500244 sock.type != socket.SOCK_STREAM):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400245 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500246 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500247 sock.setblocking(False)
248
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200249 transport, protocol = await self._create_connection_transport(
Neil Aspinallf7686c12017-12-19 19:45:42 +0000250 sock, protocol_factory, ssl, server_hostname,
251 ssl_handshake_timeout=ssl_handshake_timeout)
Yury Selivanovb057c522014-02-18 12:15:06 -0500252 return transport, protocol
253
Neil Aspinallf7686c12017-12-19 19:45:42 +0000254 async def create_unix_server(
255 self, protocol_factory, path=None, *,
256 sock=None, backlog=100, ssl=None,
Yury Selivanovc9070d02018-01-25 18:08:09 -0500257 ssl_handshake_timeout=None,
258 start_serving=True):
Yury Selivanovb057c522014-02-18 12:15:06 -0500259 if isinstance(ssl, bool):
260 raise TypeError('ssl argument must be an SSLContext or None')
261
Andrew Svetlov51eb1c62017-12-20 20:24:43 +0200262 if ssl_handshake_timeout is not None and not ssl:
263 raise ValueError(
264 'ssl_handshake_timeout is only meaningful with ssl')
265
Yury Selivanovb057c522014-02-18 12:15:06 -0500266 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200267 if sock is not None:
268 raise ValueError(
269 'path and sock can not be specified at the same time')
270
Andrew Svetlovcc839202017-11-29 18:23:43 +0200271 path = os.fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500272 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
273
Yury Selivanov908d55d2016-10-09 12:15:08 -0400274 # Check for abstract socket. `str` and `bytes` paths are supported.
275 if path[0] not in (0, '\x00'):
276 try:
277 if stat.S_ISSOCK(os.stat(path).st_mode):
278 os.remove(path)
279 except FileNotFoundError:
280 pass
281 except OSError as err:
282 # Directory may have permissions only to create socket.
Andrew Svetlovcc839202017-11-29 18:23:43 +0200283 logger.error('Unable to check or remove stale UNIX socket '
284 '%r: %r', path, err)
Yury Selivanov908d55d2016-10-09 12:15:08 -0400285
Yury Selivanovb057c522014-02-18 12:15:06 -0500286 try:
287 sock.bind(path)
288 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100289 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500290 if exc.errno == errno.EADDRINUSE:
291 # Let's improve the error message by adding
292 # with what exact address it occurs.
Yury Selivanov6370f342017-12-10 18:36:12 -0500293 msg = f'Address {path!r} is already in use'
Yury Selivanovb057c522014-02-18 12:15:06 -0500294 raise OSError(errno.EADDRINUSE, msg) from None
295 else:
296 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200297 except:
298 sock.close()
299 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500300 else:
301 if sock is None:
302 raise ValueError(
303 'path was not specified, and no sock specified')
304
Yury Selivanov36e7e972016-10-07 12:39:57 -0400305 if (sock.family != socket.AF_UNIX or
Yury Selivanova7bd64c2017-12-19 06:44:37 -0500306 sock.type != socket.SOCK_STREAM):
Yury Selivanovb057c522014-02-18 12:15:06 -0500307 raise ValueError(
Yury Selivanov6370f342017-12-10 18:36:12 -0500308 f'A UNIX Domain Stream Socket was expected, got {sock!r}')
Yury Selivanovb057c522014-02-18 12:15:06 -0500309
Yury Selivanovb057c522014-02-18 12:15:06 -0500310 sock.setblocking(False)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500311 server = base_events.Server(self, [sock], protocol_factory,
312 ssl, backlog, ssl_handshake_timeout)
313 if start_serving:
314 server._start_serving()
Yury Selivanovdbf10222018-05-28 14:31:28 -0400315 # Skip one loop iteration so that all 'loop.add_reader'
316 # go through.
317 await tasks.sleep(0, loop=self)
Yury Selivanovc9070d02018-01-25 18:08:09 -0500318
Yury Selivanovb057c522014-02-18 12:15:06 -0500319 return server
320
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200321 async def _sock_sendfile_native(self, sock, file, offset, count):
322 try:
323 os.sendfile
324 except AttributeError as exc:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700325 raise exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200326 "os.sendfile() is not available")
327 try:
328 fileno = file.fileno()
329 except (AttributeError, io.UnsupportedOperation) as err:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700330 raise exceptions.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200331 try:
332 fsize = os.fstat(fileno).st_size
333 except OSError as err:
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700334 raise exceptions.SendfileNotAvailableError("not a regular file")
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200335 blocksize = count if count else fsize
336 if not blocksize:
337 return 0 # empty file
338
339 fut = self.create_future()
340 self._sock_sendfile_native_impl(fut, None, sock, fileno,
341 offset, count, blocksize, 0)
342 return await fut
343
344 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
345 offset, count, blocksize, total_sent):
346 fd = sock.fileno()
347 if registered_fd is not None:
348 # Remove the callback early. It should be rare that the
349 # selector says the fd is ready but the call still returns
350 # EAGAIN, and I am willing to take a hit in that case in
351 # order to simplify the common case.
352 self.remove_writer(registered_fd)
353 if fut.cancelled():
354 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
355 return
356 if count:
357 blocksize = count - total_sent
358 if blocksize <= 0:
359 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
360 fut.set_result(total_sent)
361 return
362
363 try:
364 sent = os.sendfile(fd, fileno, offset, blocksize)
365 except (BlockingIOError, InterruptedError):
366 if registered_fd is None:
367 self._sock_add_cancellation_callback(fut, sock)
368 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
369 fd, sock, fileno,
370 offset, count, blocksize, total_sent)
371 except OSError as exc:
Yury Selivanov2a2247c2018-01-27 17:22:01 -0500372 if (registered_fd is not None and
373 exc.errno == errno.ENOTCONN and
374 type(exc) is not ConnectionError):
375 # If we have an ENOTCONN and this isn't a first call to
376 # sendfile(), i.e. the connection was closed in the middle
377 # of the operation, normalize the error to ConnectionError
378 # to make it consistent across all Posix systems.
379 new_exc = ConnectionError(
380 "socket is not connected", errno.ENOTCONN)
381 new_exc.__cause__ = exc
382 exc = new_exc
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200383 if total_sent == 0:
384 # We can get here for different reasons, the main
385 # one being 'file' is not a regular mmap(2)-like
386 # file, in which case we'll fall back on using
387 # plain send().
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700388 err = exceptions.SendfileNotAvailableError(
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200389 "os.sendfile call failed")
390 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
391 fut.set_exception(err)
392 else:
393 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
394 fut.set_exception(exc)
Yury Selivanov431b5402019-05-27 14:45:12 +0200395 except (SystemExit, KeyboardInterrupt):
396 raise
397 except BaseException as exc:
Andrew Svetlov6b5a2792018-01-16 19:59:34 +0200398 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
399 fut.set_exception(exc)
400 else:
401 if sent == 0:
402 # EOF
403 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
404 fut.set_result(total_sent)
405 else:
406 offset += sent
407 total_sent += sent
408 if registered_fd is None:
409 self._sock_add_cancellation_callback(fut, sock)
410 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
411 fd, sock, fileno,
412 offset, count, blocksize, total_sent)
413
414 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
415 if total_sent > 0:
416 os.lseek(fileno, offset, os.SEEK_SET)
417
418 def _sock_add_cancellation_callback(self, fut, sock):
419 def cb(fut):
420 if fut.cancelled():
421 fd = sock.fileno()
422 if fd != -1:
423 self.remove_writer(fd)
424 fut.add_done_callback(cb)
425
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427class _UnixReadPipeTransport(transports.ReadTransport):
428
Yury Selivanovdec1a452014-02-18 22:27:48 -0500429 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430
431 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
432 super().__init__(extra)
433 self._extra['pipe'] = pipe
434 self._loop = loop
435 self._pipe = pipe
436 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700437 self._protocol = protocol
438 self._closing = False
439
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700440 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800441 if not (stat.S_ISFIFO(mode) or
442 stat.S_ISSOCK(mode) or
443 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700444 self._pipe = None
445 self._fileno = None
446 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700447 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700448
Andrew Svetlovcc839202017-11-29 18:23:43 +0200449 os.set_blocking(self._fileno, False)
Guido van Rossum47867872016-08-31 09:42:38 -0700450
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100452 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400453 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100454 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100456 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500457 self._loop.call_soon(futures._set_result_unless_cancelled,
458 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700459
Victor Stinnere912e652014-07-12 03:11:53 +0200460 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100461 info = [self.__class__.__name__]
462 if self._pipe is None:
463 info.append('closed')
464 elif self._closing:
465 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500466 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400467 selector = getattr(self._loop, '_selector', None)
468 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200469 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500470 selector, self._fileno, selectors.EVENT_READ)
Victor Stinnere912e652014-07-12 03:11:53 +0200471 if polling:
472 info.append('polling')
473 else:
474 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400475 elif self._pipe is not None:
476 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200477 else:
478 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500479 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200480
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481 def _read_ready(self):
482 try:
483 data = os.read(self._fileno, self.max_size)
484 except (BlockingIOError, InterruptedError):
485 pass
486 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100487 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 else:
489 if data:
490 self._protocol.data_received(data)
491 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200492 if self._loop.get_debug():
493 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400495 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496 self._loop.call_soon(self._protocol.eof_received)
497 self._loop.call_soon(self._call_connection_lost, None)
498
Guido van Rossum57497ad2013-10-18 07:58:20 -0700499 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400500 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700501
Guido van Rossum57497ad2013-10-18 07:58:20 -0700502 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400503 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400505 def set_protocol(self, protocol):
506 self._protocol = protocol
507
508 def get_protocol(self):
509 return self._protocol
510
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500511 def is_closing(self):
512 return self._closing
513
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514 def close(self):
515 if not self._closing:
516 self._close(None)
517
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100518 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900519 if self._pipe is not None:
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100520 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900521 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100522
Victor Stinner0ee29c22014-02-19 01:40:41 +0100523 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700524 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200525 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
526 if self._loop.get_debug():
527 logger.debug("%r: %s", self, message, exc_info=True)
528 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500529 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100530 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500531 'exception': exc,
532 'transport': self,
533 'protocol': self._protocol,
534 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700535 self._close(exc)
536
537 def _close(self, exc):
538 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400539 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700540 self._loop.call_soon(self._call_connection_lost, exc)
541
542 def _call_connection_lost(self, exc):
543 try:
544 self._protocol.connection_lost(exc)
545 finally:
546 self._pipe.close()
547 self._pipe = None
548 self._protocol = None
549 self._loop = None
550
551
Yury Selivanov3cb99142014-02-18 18:41:13 -0500552class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800553 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554
555 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100556 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 self._pipe = pipe
559 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400561 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700562 self._conn_lost = 0
563 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700564
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700565 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700566 is_char = stat.S_ISCHR(mode)
567 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700568 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700569 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700570 self._pipe = None
571 self._fileno = None
572 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100573 raise ValueError("Pipe transport is only for "
574 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700575
Andrew Svetlovcc839202017-11-29 18:23:43 +0200576 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700577 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100578
579 # On AIX, the reader trick (to be notified when the read end of the
580 # socket is closed) only works for sockets. On other platforms it
581 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700582 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100583 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400584 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100585 self._fileno, self._read_ready)
586
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100588 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500589 self._loop.call_soon(futures._set_result_unless_cancelled,
590 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591
Victor Stinnere912e652014-07-12 03:11:53 +0200592 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100593 info = [self.__class__.__name__]
594 if self._pipe is None:
595 info.append('closed')
596 elif self._closing:
597 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500598 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400599 selector = getattr(self._loop, '_selector', None)
600 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200601 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500602 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200603 if polling:
604 info.append('polling')
605 else:
606 info.append('idle')
607
608 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500609 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400610 elif self._pipe is not None:
611 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200612 else:
613 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500614 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200615
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800616 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400617 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800618
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700620 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200621 if self._loop.get_debug():
622 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100623 if self._buffer:
624 self._close(BrokenPipeError())
625 else:
626 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627
628 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800629 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
630 if isinstance(data, bytearray):
631 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 if not data:
633 return
634
635 if self._conn_lost or self._closing:
636 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700637 logger.warning('pipe closed by peer or '
638 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639 self._conn_lost += 1
640 return
641
642 if not self._buffer:
643 # Attempt to send it right away first.
644 try:
645 n = os.write(self._fileno, data)
646 except (BlockingIOError, InterruptedError):
647 n = 0
Yury Selivanov431b5402019-05-27 14:45:12 +0200648 except (SystemExit, KeyboardInterrupt):
649 raise
650 except BaseException as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100652 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700653 return
654 if n == len(data):
655 return
656 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400657 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400658 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400660 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800661 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662
663 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400664 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400667 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400669 pass
Yury Selivanov431b5402019-05-27 14:45:12 +0200670 except (SystemExit, KeyboardInterrupt):
671 raise
672 except BaseException as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400673 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674 self._conn_lost += 1
675 # Remove writer here, _fatal_error() doesn't it
676 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400677 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100678 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400680 if n == len(self._buffer):
681 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400682 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800683 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400684 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400685 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 self._call_connection_lost(None)
687 return
688 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400689 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690
691 def can_write_eof(self):
692 return True
693
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700694 def write_eof(self):
695 if self._closing:
696 return
697 assert self._pipe
698 self._closing = True
699 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400700 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700701 self._loop.call_soon(self._call_connection_lost, None)
702
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400703 def set_protocol(self, protocol):
704 self._protocol = protocol
705
706 def get_protocol(self):
707 return self._protocol
708
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500709 def is_closing(self):
710 return self._closing
711
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700712 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100713 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700714 # write_eof is all what we needed to close the write pipe
715 self.write_eof()
716
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100717 def __del__(self, _warn=warnings.warn):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900718 if self._pipe is not None:
Victor Stinnerfb2c3462019-01-10 11:24:40 +0100719 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900720 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100721
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722 def abort(self):
723 self._close(None)
724
Victor Stinner0ee29c22014-02-19 01:40:41 +0100725 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700726 # should be called by exception handler only
Andrew Svetlov1f39c282019-05-27 16:28:34 +0300727 if isinstance(exc, OSError):
Victor Stinnerb2614752014-08-25 23:20:52 +0200728 if self._loop.get_debug():
729 logger.debug("%r: %s", self, message, exc_info=True)
730 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500731 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100732 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500733 'exception': exc,
734 'transport': self,
735 'protocol': self._protocol,
736 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737 self._close(exc)
738
739 def _close(self, exc=None):
740 self._closing = True
741 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400742 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700743 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400744 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700745 self._loop.call_soon(self._call_connection_lost, exc)
746
747 def _call_connection_lost(self, exc):
748 try:
749 self._protocol.connection_lost(exc)
750 finally:
751 self._pipe.close()
752 self._pipe = None
753 self._protocol = None
754 self._loop = None
755
756
Guido van Rossum59691282013-10-30 14:52:03 -0700757class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700758
Guido van Rossum59691282013-10-30 14:52:03 -0700759 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700760 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700761 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700762 # Use a socket pair for stdin, since not all platforms
763 # support selecting read events on the write end of a
764 # socket (which we use in order to detect closing of the
765 # other end). Notably this is needed on AIX, and works
766 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100767 stdin, stdin_w = socket.socketpair()
Niklas Fiekas9932fd92019-05-20 14:02:17 +0200768 try:
769 self._proc = subprocess.Popen(
770 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
771 universal_newlines=False, bufsize=bufsize, **kwargs)
772 if stdin_w is not None:
773 stdin.close()
774 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
775 stdin_w = None
776 finally:
777 if stdin_w is not None:
778 stdin.close()
779 stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800780
781
782class AbstractChildWatcher:
783 """Abstract base class for monitoring child processes.
784
785 Objects derived from this class monitor a collection of subprocesses and
786 report their termination or interruption by a signal.
787
788 New callbacks are registered with .add_child_handler(). Starting a new
789 process must be done within a 'with' block to allow the watcher to suspend
790 its activity until the new process if fully registered (this is needed to
791 prevent a race condition in some implementations).
792
793 Example:
794 with watcher:
795 proc = subprocess.Popen("sleep 1")
796 watcher.add_child_handler(proc.pid, callback)
797
798 Notes:
799 Implementations of this class must be thread-safe.
800
801 Since child watcher objects may catch the SIGCHLD signal and call
802 waitpid(-1), there should be only one active object per process.
803 """
804
805 def add_child_handler(self, pid, callback, *args):
806 """Register a new child handler.
807
808 Arrange for callback(pid, returncode, *args) to be called when
809 process 'pid' terminates. Specifying another callback for the same
810 process replaces the previous handler.
811
Victor Stinneracdb7822014-07-14 18:33:40 +0200812 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800813 """
814 raise NotImplementedError()
815
816 def remove_child_handler(self, pid):
817 """Removes the handler for process 'pid'.
818
819 The function returns True if the handler was successfully removed,
820 False if there was nothing to remove."""
821
822 raise NotImplementedError()
823
Guido van Rossum2bcae702013-11-13 15:50:08 -0800824 def attach_loop(self, loop):
825 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800826
Guido van Rossum2bcae702013-11-13 15:50:08 -0800827 If the watcher was previously attached to an event loop, then it is
828 first detached before attaching to the new loop.
829
830 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800831 """
832 raise NotImplementedError()
833
834 def close(self):
835 """Close the watcher.
836
837 This must be called to make sure that any underlying resource is freed.
838 """
839 raise NotImplementedError()
840
841 def __enter__(self):
842 """Enter the watcher's context and allow starting new processes
843
844 This function must return self"""
845 raise NotImplementedError()
846
847 def __exit__(self, a, b, c):
848 """Exit the watcher's context"""
849 raise NotImplementedError()
850
851
852class BaseChildWatcher(AbstractChildWatcher):
853
Guido van Rossum2bcae702013-11-13 15:50:08 -0800854 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800855 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400856 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800857
858 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800859 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800860
861 def _do_waitpid(self, expected_pid):
862 raise NotImplementedError()
863
864 def _do_waitpid_all(self):
865 raise NotImplementedError()
866
Guido van Rossum2bcae702013-11-13 15:50:08 -0800867 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800868 assert loop is None or isinstance(loop, events.AbstractEventLoop)
869
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400870 if self._loop is not None and loop is None and self._callbacks:
871 warnings.warn(
872 'A loop is being detached '
873 'from a child watcher with pending handlers',
874 RuntimeWarning)
875
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800876 if self._loop is not None:
877 self._loop.remove_signal_handler(signal.SIGCHLD)
878
879 self._loop = loop
880 if loop is not None:
881 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
882
883 # Prevent a race condition in case a child terminated
884 # during the switch.
885 self._do_waitpid_all()
886
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800887 def _sig_chld(self):
888 try:
889 self._do_waitpid_all()
Yury Selivanov431b5402019-05-27 14:45:12 +0200890 except (SystemExit, KeyboardInterrupt):
891 raise
892 except BaseException as exc:
Yury Selivanov569efa22014-02-18 18:02:19 -0500893 # self._loop should always be available here
894 # as '_sig_chld' is added as a signal handler
895 # in 'attach_loop'
896 self._loop.call_exception_handler({
897 'message': 'Unknown exception in SIGCHLD handler',
898 'exception': exc,
899 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900
901 def _compute_returncode(self, status):
902 if os.WIFSIGNALED(status):
903 # The child process died because of a signal.
904 return -os.WTERMSIG(status)
905 elif os.WIFEXITED(status):
906 # The child process exited (e.g sys.exit()).
907 return os.WEXITSTATUS(status)
908 else:
909 # The child exited, but we don't understand its status.
910 # This shouldn't happen, but if it does, let's just
911 # return that status; perhaps that helps debug it.
912 return status
913
914
915class SafeChildWatcher(BaseChildWatcher):
916 """'Safe' child watcher implementation.
917
918 This implementation avoids disrupting other code spawning processes by
919 polling explicitly each process in the SIGCHLD handler instead of calling
920 os.waitpid(-1).
921
922 This is a safe solution but it has a significant overhead when handling a
923 big number of children (O(n) each time SIGCHLD is raised)
924 """
925
Guido van Rossum2bcae702013-11-13 15:50:08 -0800926 def close(self):
927 self._callbacks.clear()
928 super().close()
929
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800930 def __enter__(self):
931 return self
932
933 def __exit__(self, a, b, c):
934 pass
935
936 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400937 if self._loop is None:
938 raise RuntimeError(
939 "Cannot add child handler, "
940 "the child watcher does not have a loop attached")
941
Victor Stinner47cd10d2015-01-30 00:05:19 +0100942 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800943
944 # Prevent a race condition in case the child is already terminated.
945 self._do_waitpid(pid)
946
Guido van Rossum2bcae702013-11-13 15:50:08 -0800947 def remove_child_handler(self, pid):
948 try:
949 del self._callbacks[pid]
950 return True
951 except KeyError:
952 return False
953
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800954 def _do_waitpid_all(self):
955
956 for pid in list(self._callbacks):
957 self._do_waitpid(pid)
958
959 def _do_waitpid(self, expected_pid):
960 assert expected_pid > 0
961
962 try:
963 pid, status = os.waitpid(expected_pid, os.WNOHANG)
964 except ChildProcessError:
965 # The child process is already reaped
966 # (may happen if waitpid() is called elsewhere).
967 pid = expected_pid
968 returncode = 255
969 logger.warning(
970 "Unknown child process pid %d, will report returncode 255",
971 pid)
972 else:
973 if pid == 0:
974 # The child process is still alive.
975 return
976
977 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200978 if self._loop.get_debug():
979 logger.debug('process %s exited with returncode %s',
980 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800981
982 try:
983 callback, args = self._callbacks.pop(pid)
984 except KeyError: # pragma: no cover
985 # May happen if .remove_child_handler() is called
986 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200987 if self._loop.get_debug():
988 logger.warning("Child watcher got an unexpected pid: %r",
989 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800990 else:
991 callback(pid, returncode, *args)
992
993
994class FastChildWatcher(BaseChildWatcher):
995 """'Fast' child watcher implementation.
996
997 This implementation reaps every terminated processes by calling
998 os.waitpid(-1) directly, possibly breaking other code spawning processes
999 and waiting for their termination.
1000
1001 There is no noticeable overhead when handling a big number of children
1002 (O(1) each time a child terminates).
1003 """
Guido van Rossum2bcae702013-11-13 15:50:08 -08001004 def __init__(self):
1005 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001006 self._lock = threading.Lock()
1007 self._zombies = {}
1008 self._forks = 0
1009
1010 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001011 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001012 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -08001013 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001014
1015 def __enter__(self):
1016 with self._lock:
1017 self._forks += 1
1018
1019 return self
1020
1021 def __exit__(self, a, b, c):
1022 with self._lock:
1023 self._forks -= 1
1024
1025 if self._forks or not self._zombies:
1026 return
1027
1028 collateral_victims = str(self._zombies)
1029 self._zombies.clear()
1030
1031 logger.warning(
1032 "Caught subprocesses termination from unknown pids: %s",
1033 collateral_victims)
1034
1035 def add_child_handler(self, pid, callback, *args):
1036 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -04001037
1038 if self._loop is None:
1039 raise RuntimeError(
1040 "Cannot add child handler, "
1041 "the child watcher does not have a loop attached")
1042
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001043 with self._lock:
1044 try:
1045 returncode = self._zombies.pop(pid)
1046 except KeyError:
1047 # The child is running.
1048 self._callbacks[pid] = callback, args
1049 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001050
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001051 # The child is dead already. We can fire the callback.
1052 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001053
Guido van Rossum2bcae702013-11-13 15:50:08 -08001054 def remove_child_handler(self, pid):
1055 try:
1056 del self._callbacks[pid]
1057 return True
1058 except KeyError:
1059 return False
1060
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001061 def _do_waitpid_all(self):
1062 # Because of signal coalescing, we must keep calling waitpid() as
1063 # long as we're able to reap a child.
1064 while True:
1065 try:
1066 pid, status = os.waitpid(-1, os.WNOHANG)
1067 except ChildProcessError:
1068 # No more child processes exist.
1069 return
1070 else:
1071 if pid == 0:
1072 # A child process is still alive.
1073 return
1074
1075 returncode = self._compute_returncode(status)
1076
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001077 with self._lock:
1078 try:
1079 callback, args = self._callbacks.pop(pid)
1080 except KeyError:
1081 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001082 if self._forks:
1083 # It may not be registered yet.
1084 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +02001085 if self._loop.get_debug():
1086 logger.debug('unknown process %s exited '
1087 'with returncode %s',
1088 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001089 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001090 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001091 else:
1092 if self._loop.get_debug():
1093 logger.debug('process %s exited with returncode %s',
1094 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001095
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001096 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001097 logger.warning(
1098 "Caught subprocess termination from unknown pid: "
1099 "%d -> %d", pid, returncode)
1100 else:
1101 callback(pid, returncode, *args)
1102
1103
1104class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +01001105 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001106 _loop_factory = _UnixSelectorEventLoop
1107
1108 def __init__(self):
1109 super().__init__()
1110 self._watcher = None
1111
1112 def _init_watcher(self):
1113 with events._lock:
1114 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -08001115 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001116 if isinstance(threading.current_thread(),
1117 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001118 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001119
1120 def set_event_loop(self, loop):
1121 """Set the event loop.
1122
1123 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -08001124 .set_event_loop() from the main thread will call .attach_loop(loop) on
1125 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001126 """
1127
1128 super().set_event_loop(loop)
1129
Andrew Svetlovcc839202017-11-29 18:23:43 +02001130 if (self._watcher is not None and
1131 isinstance(threading.current_thread(), threading._MainThread)):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001132 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001133
1134 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001135 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001136
1137 If not yet set, a SafeChildWatcher object is automatically created.
1138 """
1139 if self._watcher is None:
1140 self._init_watcher()
1141
1142 return self._watcher
1143
1144 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001145 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001146
1147 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1148
1149 if self._watcher is not None:
1150 self._watcher.close()
1151
1152 self._watcher = watcher
1153
Yury Selivanov6370f342017-12-10 18:36:12 -05001154
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001155SelectorEventLoop = _UnixSelectorEventLoop
1156DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy