blob: 639300f976ab8dce10a42015d6db8a70a61a2844 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Andrew Svetlov6b5a2792018-01-16 19:59:34 +02004import io
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01006import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import signal
8import socket
9import stat
10import subprocess
11import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014
15
Yury Selivanovb057c522014-02-18 12:15:06 -050016from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080019from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010021from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import selector_events
Miss Islington (bot)bc3a0022018-05-28 11:50:45 -070023from . import tasks
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: this module is built around signal handling,
# which is not usable there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
37
38
Victor Stinnerfe5649c2014-07-17 22:43:40 +020039def _sighandler_noop(signum, frame):
40 """Dummy signal handler."""
41 pass
42
43
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall every registered signal handler.

        During interpreter finalization the handlers are not uninstalled;
        a ResourceWarning is emitted instead and the registry is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        *callback* and *args* are wrapped in an events.Handle that replaces
        any handler previously registered for *sig*.

        Raise TypeError if *callback* is a coroutine object or coroutine
        function, or if *sig* is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be set
        (e.g. when not called from the main thread) or if the signal
        cannot be caught.  Any other OSError from the signal module is
        propagated unchanged.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc)) from exc

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught') from exc
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        The signal's disposition is reset to signal.default_int_handler
        for SIGINT and to SIG_DFL for every other signal.  When the last
        handler is removed the wakeup file descriptor is reset as well.

        Raise TypeError/ValueError if *sig* is not a valid signal number.
        Raise RuntimeError if the signal cannot be caught; any other
        OSError from signal.signal() is propagated unchanged.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught') from exc
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if *sig* is not an int.
        Raise ValueError if *sig* is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if not (1 <= sig < signal.NSIG):
            raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport wrapping *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport wrapping *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until it is set up.

        The child's pid is registered with the current child watcher so
        that _child_watcher_callback() is called when the process exits.
        If waiting for the transport fails, the transport is closed and
        awaited before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path*, a
        new non-blocking AF_UNIX socket is created and connected; with
        *sock*, the given socket must already be an AF_UNIX/SOCK_STREAM
        socket and is switched to non-blocking mode.

        Return a ``(transport, protocol)`` pair.

        Raise ValueError for inconsistent arguments: ssl without
        server_hostname, server_hostname or ssl_handshake_timeout
        without ssl, both or neither of path/sock, or a socket of the
        wrong family/type.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except BaseException:
                # We created this socket, so never leak it -- not even on
                # cancellation or KeyboardInterrupt.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path*, a
        stale socket file at that location is removed (unless the path
        denotes an abstract socket, i.e. starts with a NUL byte) and a
        new AF_UNIX socket is bound to it.

        Return a base_events.Server; it is started immediately unless
        *start_serving* is false.

        Raise TypeError if *ssl* is a bool.
        Raise ValueError for inconsistent arguments or a socket of the
        wrong family/type.
        Raise OSError if binding fails; EADDRINUSE is re-raised with a
        message naming the address.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                # Any non-OSError failure must not leak the new socket.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the total number of bytes sent (0 for an empty file when
        *count* is falsy).

        Raise events.SendfileNotAvailableError if os.sendfile() does not
        exist, if *file* has no usable file descriptor, or if it cannot
        be fstat()-ed.
        """
        try:
            os.sendfile
        except AttributeError:
            raise events.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise events.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise events.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket's fd on
        later calls, which run as writer callbacks.  *fut* receives the
        total number of bytes sent, or the exception that stopped the
        transfer.  A failure before any byte was sent is reported as
        events.SendfileNotAvailableError.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = events.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any data was sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer if *fut* gets cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # fileno() returns -1 for a closed socket.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
420
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport for a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule the start of reading on *loop*.

        *pipe* must provide fileno() and close(); its descriptor is
        switched to non-blocking mode.  If *waiter* is given, it is
        completed after connection_made() has been scheduled.

        Raise ValueError if the descriptor is not a FIFO, socket or
        character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Drop the references so that __del__() neither warns about
            # nor closes a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Reader callback: deliver available data or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means the other end was closed.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for incoming data.

        Does nothing if the transport is closing or closed.
        """
        if self._closing:
            # The reader is already removed, and self._loop may already
            # have been cleared by _call_connection_lost().
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for incoming data again.

        Does nothing if the transport is closing or closed, so a reader
        is never re-registered for a descriptor that is being closed.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (and only in debug
        mode); anything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
546
547
Yury Selivanov3cb99142014-02-18 18:41:13 -0500548class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800549 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
551 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100552 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 self._pipe = pipe
555 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400557 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 self._conn_lost = 0
559 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700560
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700561 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700562 is_char = stat.S_ISCHR(mode)
563 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700564 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700565 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700566 self._pipe = None
567 self._fileno = None
568 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100569 raise ValueError("Pipe transport is only for "
570 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700571
Andrew Svetlovcc839202017-11-29 18:23:43 +0200572 os.set_blocking(self._fileno, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100574
575 # On AIX, the reader trick (to be notified when the read end of the
576 # socket is closed) only works for sockets. On other platforms it
577 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700578 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100579 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400580 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100581 self._fileno, self._read_ready)
582
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100584 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500585 self._loop.call_soon(futures._set_result_unless_cancelled,
586 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587
Victor Stinnere912e652014-07-12 03:11:53 +0200588 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100589 info = [self.__class__.__name__]
590 if self._pipe is None:
591 info.append('closed')
592 elif self._closing:
593 info.append('closing')
Yury Selivanov6370f342017-12-10 18:36:12 -0500594 info.append(f'fd={self._fileno}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400595 selector = getattr(self._loop, '_selector', None)
596 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200597 polling = selector_events._test_selector_event(
Yury Selivanov6370f342017-12-10 18:36:12 -0500598 selector, self._fileno, selectors.EVENT_WRITE)
Victor Stinnere912e652014-07-12 03:11:53 +0200599 if polling:
600 info.append('polling')
601 else:
602 info.append('idle')
603
604 bufsize = self.get_write_buffer_size()
Yury Selivanov6370f342017-12-10 18:36:12 -0500605 info.append(f'bufsize={bufsize}')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400606 elif self._pipe is not None:
607 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200608 else:
609 info.append('closed')
Yury Selivanov6370f342017-12-10 18:36:12 -0500610 return '<{}>'.format(' '.join(info))
Victor Stinnere912e652014-07-12 03:11:53 +0200611
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800612 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400613 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800614
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700616 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200617 if self._loop.get_debug():
618 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100619 if self._buffer:
620 self._close(BrokenPipeError())
621 else:
622 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623
def write(self, data):
    """Write *data* to the pipe, buffering whatever cannot be sent now.

    *data* must be a bytes, bytearray or memoryview object; empty data
    is ignored.  Once the connection is lost or the transport is
    closing, the data is dropped and only the lost-write counter is
    advanced.
    """
    assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    if isinstance(data, bytearray):
        data = memoryview(data)
    if not data:
        return

    if self._conn_lost or self._closing:
        # Warn only once the number of writes after the connection was
        # lost has reached the configured threshold.
        noisy = self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES
        if noisy:
            logger.warning('pipe closed by peer or '
                           'os.write(pipe, data) raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Nothing is queued yet: try to send the data right away.
        try:
            sent = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except Exception as exc:
            self._conn_lost += 1
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return
        if sent == len(data):
            return
        if sent > 0:
            # Keep only the part that still has to be written.
            data = memoryview(data)[sent:]
        self._loop._add_writer(self._fileno, self._write_ready)

    self._buffer += data
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700656
657 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400658 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400661 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400663 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400665 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 self._conn_lost += 1
667 # Remove writer here, _fatal_error() doesn't it
668 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400669 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100670 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700671 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400672 if n == len(self._buffer):
673 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400674 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800675 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400676 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400677 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700678 self._call_connection_lost(None)
679 return
680 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400681 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682
def can_write_eof(self):
    """Return True: this transport implements write_eof()."""
    supported = True
    return supported
685
def write_eof(self):
    """Mark the transport as closing once pending data has been written.

    Does nothing when the transport is already closing.  With an empty
    buffer the reader is removed and connection_lost handling is
    scheduled on the loop right away; with data still buffered only the
    closing flag is set here.
    """
    if not self._closing:
        assert self._pipe
        self._closing = True
        if self._buffer:
            return
        loop = self._loop
        loop._remove_reader(self._fileno)
        loop.call_soon(self._call_connection_lost, None)
694
def set_protocol(self, protocol):
    """Replace the protocol object attached to this transport."""
    self._protocol = protocol
697
def get_protocol(self):
    """Return the protocol object attached to this transport."""
    current = self._protocol
    return current
700
def is_closing(self):
    """Return the transport's closing flag."""
    closing = self._closing
    return closing
703
def close(self):
    """Close the transport unless it is already closed or closing."""
    if self._pipe is None or self._closing:
        return
    # write_eof() is all that is needed to close the write pipe.
    self.write_eof()
708
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900709 def __del__(self):
710 if self._pipe is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500711 warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900712 source=self)
713 self._pipe.close()
Victor Stinner978a9af2015-01-29 17:50:58 +0100714
def abort(self):
    """Close the transport through _close(), reporting no error."""
    no_error = None
    self._close(no_error)
717
Victor Stinner0ee29c22014-02-19 01:40:41 +0100718 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200720 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200721 if self._loop.get_debug():
722 logger.debug("%r: %s", self, message, exc_info=True)
723 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500724 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100725 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500726 'exception': exc,
727 'transport': self,
728 'protocol': self._protocol,
729 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700730 self._close(exc)
731
732 def _close(self, exc=None):
733 self._closing = True
734 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400735 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400737 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 self._loop.call_soon(self._call_connection_lost, exc)
739
740 def _call_connection_lost(self, exc):
741 try:
742 self._protocol.connection_lost(exc)
743 finally:
744 self._pipe.close()
745 self._pipe = None
746 self._protocol = None
747 self._loop = None
748
749
Guido van Rossum59691282013-10-30 14:52:03 -0700750class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700751
Guido van Rossum59691282013-10-30 14:52:03 -0700752 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700753 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700754 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700755 # Use a socket pair for stdin, since not all platforms
756 # support selecting read events on the write end of a
757 # socket (which we use in order to detect closing of the
758 # other end). Notably this is needed on AIX, and works
759 # just fine on other platforms.
Victor Stinnera10dc3e2017-11-28 11:15:26 +0100760 stdin, stdin_w = socket.socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700761 self._proc = subprocess.Popen(
762 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
763 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700764 if stdin_w is not None:
765 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200766 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800767
768
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
837
838
class BaseChildWatcher(AbstractChildWatcher):
    """Common machinery for child watchers driven by SIGCHLD.

    Keeps the attached loop and the table of registered callbacks, and
    installs the SIGCHLD handler on the loop.  Subclasses provide
    _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None to only detach.  A RuntimeWarning is issued
        when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as err:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': err,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code.

        A negative signal number is returned for a child killed by a
        signal, the exit status for a child that exited, and the raw
        status when neither applies.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
898
899
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Raises RuntimeError when no loop is attached to the watcher.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; fire its callback if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *handler_args)
977
978
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Return codes of reaped children that have no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently active.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            still_forking = bool(self._forks)
            if still_forking or not self._zombies:
                return

            # No fork is in progress any more, so the remaining zombies
            # will never be claimed: report and drop them.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is invoked right away.  Raises
        RuntimeError when no loop is attached to the watcher.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1087
1088
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if no watcher is set.

        When called from the main thread, the new watcher is attached to
        the loop stored in the policy's thread-local state.
        """
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return
            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any watcher set previously is closed first.  *watcher* may be None.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1138
Yury Selivanov6370f342017-12-10 18:36:12 -05001139
# Public aliases for the Unix implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy