blob: a0fc996d23a0de7fe34132214e6e8057bfa0db40 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010021from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import selector_events
Miss Islington (bot)bc3a0022018-05-28 11:50:45 -070023from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
# Public names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: the rest of the module relies on Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
37
38
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    add_signal_handler() installs it so that Python writes the signal
    number to the wakeup file descriptor; the real callback is dispatched
    later from _process_self_data().  Both arguments are ignored.
    """
    return None
42
43
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Signal number -> events.Handle wrapping the registered callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and drop the signal handlers registered on it.

        While the interpreter is finalizing the handlers are not
        uninstalled; if any are still registered a ResourceWarning is
        emitted and the registry is simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        elif self._signal_handlers:
            warnings.warn(f"Closing the loop {self!r} "
                          f"on interpreter shutdown "
                          f"stage, skipping signal handlers removal",
                          ResourceWarning,
                          source=self)
            self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch every signal number found in *data* to _handle_signal()."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler,
        including when the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc)) from exc

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught') from exc
            raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL; other OSErrors propagate unchanged.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default KeyboardInterrupt handler back;
        # every other signal is reset to the OS default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught') from exc
            raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if not (1 <= sig < signal.NSIG):
            raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is returned once its waiter future completes.  If
        waiting fails, the transport is closed and awaited before the
        exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward the child's return code to the transport on the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path* a new
        non-blocking socket is created and connected; with *sock* the
        supplied AF_UNIX/SOCK_STREAM socket is used.

        Return a (transport, protocol) pair.
        Raise ValueError for inconsistent ssl/path/sock arguments or a
        socket of the wrong family or type.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created the socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path* a new
        socket is created and bound; a stale socket file at a non-abstract
        path is removed first.  With *sock* the supplied
        AF_UNIX/SOCK_STREAM socket is used.

        Return the base_events.Server object.
        Raise TypeError if ssl is a bool, ValueError for inconsistent
        arguments, and OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Everything between creating the socket and a successful
            # bind() runs under one guard so the socket is closed whatever
            # goes wrong (including an IndexError on an empty path or a
            # ValueError raised by os.stat()).
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r', path, err)

                try:
                    sock.bind(path)
                except OSError as exc:
                    if exc.errno == errno.EADDRINUSE:
                        # Let's improve the error message by adding
                        # with what exact address it occurs.
                        msg = f'Address {path!r} is already in use'
                        raise OSError(errno.EADDRINUSE, msg) from None
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile().

        Return the number of bytes sent (0 for an empty file).
        Raise events.SendfileNotAvailableError if os.sendfile() is missing,
        *file* has no usable file descriptor, or it cannot be fstat()ed.
        """
        try:
            os.sendfile
        except AttributeError:
            raise events.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise events.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise events.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket fd that
        was registered with add_writer() on later calls.  The outcome is
        reported through *fut*.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                # OSError takes (errno, strerror) in that order.
                new_exc = ConnectionError(
                    errno.ENOTCONN, "socket is not connected")
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = events.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer callback if *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
420
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character-device descriptor."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() and reading on *loop*.

        Raise ValueError if the descriptor is not a FIFO, socket or
        character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        st_mode = os.fstat(self._fileno).st_mode
        supported = (stat.S_ISFIFO(st_mode) or
                     stat.S_ISSOCK(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            # Reset the references first so that __del__ neither warns
            # about nor closes a pipe this transport never served.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        parts = [self.__class__.__name__]
        is_open = self._pipe is not None
        if not is_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if not is_open:
            # NOTE(review): a closed transport therefore lists 'closed'
            # twice; kept as-is to preserve the existing repr output.
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            parts.append('polling' if polling else 'idle')
        return '<{}>'.format(' '.join(parts))

    def _read_ready(self):
        """Read one chunk and hand it to the protocol, or handle EOF."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # Empty read: the writing side is gone.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for readability."""
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor for readability again."""
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Start closing the transport; a no-op if already closing."""
        if self._closing:
            return
        self._close(None)

    def __del__(self):
        if self._pipe is None:
            return
        warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                      source=self)
        self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (and only in debug mode);
        anything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
546
547
Yury Selivanov3cb99142014-02-18 18:41:13 -0500548class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800549 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    """Initialise a _UnixWritePipeTransport around *pipe*.

    Schedules protocol.connection_made() on the loop and, when *waiter*
    is given, the completion of that future afterwards.  Raise ValueError
    if the descriptor is not a pipe, socket or character device.
    """
    # NOTE(review): self._loop and self._extra are presumably set by the
    # base-class __init__ called here -- confirm against transports.
    super().__init__(extra, loop)
    self._extra['pipe'] = pipe
    self._pipe = pipe
    self._fileno = pipe.fileno()
    self._protocol = protocol
    self._buffer = bytearray()
    self._conn_lost = 0
    self._closing = False  # Set when close() or write_eof() called.

    st_mode = os.fstat(self._fileno).st_mode
    is_char = stat.S_ISCHR(st_mode)
    is_fifo = stat.S_ISFIFO(st_mode)
    is_socket = stat.S_ISSOCK(st_mode)
    supported = is_char or is_fifo or is_socket
    if not supported:
        # Drop the references before failing.
        self._pipe = None
        self._fileno = None
        self._protocol = None
        raise ValueError("Pipe transport is only for "
                         "pipes, sockets and character devices")

    os.set_blocking(self._fileno, False)
    self._loop.call_soon(self._protocol.connection_made, self)

    # On AIX, the reader trick (to be notified when the read end of the
    # socket is closed) only works for sockets.  On other platforms it
    # works for pipes and sockets.  (Exception: OS X 10.4?  Issue #19294.)
    on_aix = sys.platform.startswith("aix")
    if is_socket or (is_fifo and not on_aix):
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)

    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        self._loop.call_soon(futures._set_result_unless_cancelled,
                             waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587
def __repr__(self):
    """Return '<ClassName [state] fd=N status [bufsize=N]>' for debugging."""
    parts = [self.__class__.__name__]
    is_open = self._pipe is not None
    if not is_open:
        parts.append('closed')
    elif self._closing:
        parts.append('closing')
    parts.append(f'fd={self._fileno}')
    selector = getattr(self._loop, '_selector', None)
    if not is_open:
        # NOTE(review): a closed transport therefore lists 'closed' twice;
        # kept as-is to preserve the existing repr output.
        parts.append('closed')
    elif selector is None:
        parts.append('open')
    else:
        polling = selector_events._test_selector_event(
            selector, self._fileno, selectors.EVENT_WRITE)
        parts.append('polling' if polling else 'idle')

        bufsize = self.get_write_buffer_size()
        parts.append(f'bufsize={bufsize}')
    return '<{}>'.format(' '.join(parts))
Victor Stinnere912e652014-07-12 03:11:53 +0200611
def get_write_buffer_size(self):
    """Return the number of bytes currently held in the write buffer."""
    pending = self._buffer
    return len(pending)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800614
def _read_ready(self):
    """Handle the peer closing the pipe.

    Closes with BrokenPipeError if unsent data is still buffered,
    otherwise closes without an error.
    """
    # Pipe was closed by peer.
    if self._loop.get_debug():
        logger.info("%r was closed by peer", self)
    if not self._buffer:
        self._close()
        return
    self._close(BrokenPipeError())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    Empty data is ignored.  Once the connection is lost or closing, the
    data is dropped and only the lost-write counter is incremented.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            written = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            written = 0
        except Exception as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if written == len(data):
            return
        if written > 0:
            # Keep only the unsent tail.
            data = memoryview(data)[written:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656
657 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400658 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400661 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400663 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400665 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 self._conn_lost += 1
667 # Remove writer here, _fatal_error() doesn't it
668 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400669 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100670 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400672 if n == len(self._buffer):
673 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400674 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800675 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400676 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400677 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 self._call_connection_lost(None)
679 return
680 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400681 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
683 def can_write_eof(self):
684 return True
685
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 def write_eof(self):
687 if self._closing:
688 return
689 assert self._pipe
690 self._closing = True
691 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400692 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700693 self._loop.call_soon(self._call_connection_lost, None)
694
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400695 def set_protocol(self, protocol):
696 self._protocol = protocol
697
698 def get_protocol(self):
699 return self._protocol
700
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500701 def is_closing(self):
702 return self._closing
703
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700704 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100705 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 # write_eof is all what we needed to close the write pipe
707 self.write_eof()
708
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900709 def __del__(self):
710 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500711 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900712 source=self)
713 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100714
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700715 def abort(self):
716 self._close(None)
717
Victor Stinner0ee29c22014-02-19 01:40:41 +0100718 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 # should be called by exception handler only
Andrew Svetlova79b6c52019-05-27 18:52:05 +0300720 if isinstance(exc, OSError):
Victor Stinnerb2614752014-08-25 23:20:52 +0200721 if self._loop.get_debug():
722 logger.debug("%r: %s", self, message, exc_info=True)
723 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500724 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100725 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500726 'exception': exc,
727 'transport': self,
728 'protocol': self._protocol,
729 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700730 self._close(exc)
731
732 def _close(self, exc=None):
733 self._closing = True
734 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400735 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400737 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 self._loop.call_soon(self._call_connection_lost, exc)
739
740 def _call_connection_lost(self, exc):
741 try:
742 self._protocol.connection_lost(exc)
743 finally:
744 self._pipe.close()
745 self._pipe = None
746 self._protocol = None
747 self._loop = None
748
749
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Launch the child process and store the Popen object in _proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe: one
        end is handed to the child and the other is wrapped in a binary
        file object that becomes self._proc.stdin.
        """
        parent_end = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_end is not None:
                # The child holds its own copy of its end by now.
                stdin.close()
                self._proc.stdin = open(
                    parent_end.detach(), 'wb', buffering=bufsize)
                # Ownership moved to the file object; skip the cleanup.
                parent_end = None
        finally:
            if parent_end is not None:
                # Starting the child failed: release both socket ends.
                stdin.close()
                parent_end.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800773
774
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a collection of subprocesses and reports their
    termination or interruption by a signal.

    Handlers are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher, so that the watcher can
    suspend its activity until the new process is fully registered (this is
    needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
843
844
class BaseChildWatcher(AbstractChildWatcher):
    """Common SIGCHLD handling shared by the concrete child watchers.

    Subclasses provide _do_waitpid() and _do_waitpid_all(); this class
    installs the SIGCHLD handler on the attached loop and converts wait
    statuses into return codes.
    """

    def __init__(self):
        self._callbacks = {}
        self._loop = None

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Poll a single child; implemented by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Poll the monitored children; implemented by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        issued when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting failures to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a wait status into a subprocess-style return code."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
904
905
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled explicitly in the SIGCHLD handler
    instead of calling os.waitpid(-1), so other code that spawns processes
    is not disrupted.

    This is a safe solution, but it has a significant overhead when a big
    number of children is handled (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget all handlers and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Raises RuntimeError if no loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered process."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* and run its handler if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback, args = handler
        callback(pid, returncode, *args)
983
984
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of reaped children that have no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # Nobody claimed these children: report them and forget them.
            unclaimed = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's 'with' block.  If the child was
        already reaped, the callback is invoked right away.  Raises
        RuntimeError if no loop is attached.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is not None:
                callback(pid, returncode, *args)
            else:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
1093
1094
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        the loop currently stored in the policy's local state.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Compare against threading.main_thread() rather than
                # testing for the private _MainThread class: after a fork
                # from a worker thread the main thread is a plain Thread.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The previously set watcher, if any, is closed first.  *watcher* may
        be None.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1144
Yury Selivanov6370f342017-12-10 18:36:12 -05001145
# Public aliases for the Unix implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy