blob: 2806ea8dc9089d8aa5322380ee46b8b82020f845 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
Victor Stinner915bcb02014-02-01 22:49:59 +010028__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080029 'AbstractChildWatcher', 'SafeChildWatcher',
30 'FastChildWatcher', 'DefaultEventLoopPolicy',
31 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033if sys.platform == 'win32': # pragma: no cover
34 raise ImportError('Signals are not really supported on Windows')
35
36
Victor Stinnerfe5649c2014-07-17 22:43:40 +020037def _sighandler_noop(signum, frame):
38 """Dummy signal handler."""
39 pass
40
41
Yury Selivanovd7c15182016-11-15 15:26:34 -050042try:
43 _fspath = os.fspath
44except AttributeError:
45 # Python 3.5 or earlier
46 _fspath = lambda path: path
47
48
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080049class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050050 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
Yury Selivanovb057c522014-02-18 12:15:06 -050052 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053 """
54
55 def __init__(self, selector=None):
56 super().__init__(selector)
57 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058
59 def _socketpair(self):
60 return socket.socketpair()
61
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080062 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020063 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080064 for sig in list(self._signal_handlers):
65 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080066
Victor Stinnerfe5649c2014-07-17 22:43:40 +020067 def _process_self_data(self, data):
68 for signum in data:
69 if not signum:
70 # ignore null bytes written by _write_to_self()
71 continue
72 self._handle_signal(signum)
73
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 def add_signal_handler(self, sig, callback, *args):
75 """Add a handler for a signal. UNIX only.
76
77 Raise ValueError if the signal number is invalid or uncatchable.
78 Raise RuntimeError if there is a problem setting up the handler.
79 """
Victor Stinner2d99d932014-11-20 15:03:52 +010080 if (coroutines.iscoroutine(callback)
81 or coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010082 raise TypeError("coroutines cannot be used "
83 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010085 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 try:
87 # set_wakeup_fd() raises ValueError if this is not the
88 # main thread. By calling it early we ensure that an
89 # event loop running in another thread cannot add a signal
90 # handler.
91 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020092 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070093 raise RuntimeError(str(exc))
94
Yury Selivanov569efa22014-02-18 18:02:19 -050095 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 self._signal_handlers[sig] = handle
97
98 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020099 # Register a dummy signal handler to ask Python to write the signal
100 # number in the wakup file descriptor. _process_self_data() will
101 # read signal numbers from this file descriptor to handle signals.
102 signal.signal(sig, _sighandler_noop)
103
Charles-François Natali74e7cf32013-12-05 22:47:19 +0100104 # Set SA_RESTART to limit EINTR occurrences.
105 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106 except OSError as exc:
107 del self._signal_handlers[sig]
108 if not self._signal_handlers:
109 try:
110 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200111 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700112 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113
114 if exc.errno == errno.EINVAL:
115 raise RuntimeError('sig {} cannot be caught'.format(sig))
116 else:
117 raise
118
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200119 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120 """Internal helper that is the actual signal handler."""
121 handle = self._signal_handlers.get(sig)
122 if handle is None:
123 return # Assume it's some race condition.
124 if handle._cancelled:
125 self.remove_signal_handler(sig) # Remove it properly.
126 else:
127 self._add_callback_signalsafe(handle)
128
129 def remove_signal_handler(self, sig):
130 """Remove a handler for a signal. UNIX only.
131
132 Return True if a signal handler was removed, False if not.
133 """
134 self._check_signal(sig)
135 try:
136 del self._signal_handlers[sig]
137 except KeyError:
138 return False
139
140 if sig == signal.SIGINT:
141 handler = signal.default_int_handler
142 else:
143 handler = signal.SIG_DFL
144
145 try:
146 signal.signal(sig, handler)
147 except OSError as exc:
148 if exc.errno == errno.EINVAL:
149 raise RuntimeError('sig {} cannot be caught'.format(sig))
150 else:
151 raise
152
153 if not self._signal_handlers:
154 try:
155 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200156 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700157 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158
159 return True
160
161 def _check_signal(self, sig):
162 """Internal helper to validate a signal.
163
164 Raise ValueError if the signal number is invalid or uncatchable.
165 Raise RuntimeError if there is a problem setting up the handler.
166 """
167 if not isinstance(sig, int):
168 raise TypeError('sig must be an int, not {!r}'.format(sig))
169
170 if not (1 <= sig < signal.NSIG):
171 raise ValueError(
172 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
173
174 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
175 extra=None):
176 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
177
178 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
179 extra=None):
180 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
181
Victor Stinnerf951d282014-06-29 00:46:45 +0200182 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183 def _make_subprocess_transport(self, protocol, args, shell,
184 stdin, stdout, stderr, bufsize,
185 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 with events.get_child_watcher() as watcher:
Yury Selivanov7661db62016-05-16 15:38:39 -0400187 waiter = self.create_future()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800188 transp = _UnixSubprocessTransport(self, protocol, args, shell,
189 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100190 waiter=waiter, extra=extra,
191 **kwargs)
192
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800193 watcher.add_child_handler(transp.get_pid(),
194 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100195 try:
196 yield from waiter
Victor Stinner5d44c082015-02-02 18:36:31 +0100197 except Exception as exc:
198 # Workaround CPython bug #23353: using yield/yield-from in an
199 # except block of a generator doesn't clear properly
200 # sys.exc_info()
201 err = exc
202 else:
203 err = None
204
205 if err is not None:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100206 transp.close()
Victor Stinner1241ecc2015-01-30 00:16:14 +0100207 yield from transp._wait()
Victor Stinner5d44c082015-02-02 18:36:31 +0100208 raise err
Guido van Rossum4835f172014-01-10 13:28:59 -0800209
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700210 return transp
211
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800212 def _child_watcher_callback(self, pid, returncode, transp):
213 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214
Victor Stinnerf951d282014-06-29 00:46:45 +0200215 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500216 def create_unix_connection(self, protocol_factory, path, *,
217 ssl=None, sock=None,
218 server_hostname=None):
219 assert server_hostname is None or isinstance(server_hostname, str)
220 if ssl:
221 if server_hostname is None:
222 raise ValueError(
223 'you have to pass server_hostname when using ssl')
224 else:
225 if server_hostname is not None:
226 raise ValueError('server_hostname is only meaningful with ssl')
227
228 if path is not None:
229 if sock is not None:
230 raise ValueError(
231 'path and sock can not be specified at the same time')
232
Victor Stinner79a29522014-02-19 01:45:59 +0100233 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500234 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500235 sock.setblocking(False)
236 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100237 except:
238 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500239 raise
240
241 else:
242 if sock is None:
243 raise ValueError('no path and sock were specified')
Yury Selivanov36e7e972016-10-07 12:39:57 -0400244 if (sock.family != socket.AF_UNIX or
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500245 not base_events._is_stream_socket(sock)):
Yury Selivanov36e7e972016-10-07 12:39:57 -0400246 raise ValueError(
247 'A UNIX Domain Stream Socket was expected, got {!r}'
248 .format(sock))
Yury Selivanovb057c522014-02-18 12:15:06 -0500249 sock.setblocking(False)
250
251 transport, protocol = yield from self._create_connection_transport(
252 sock, protocol_factory, ssl, server_hostname)
253 return transport, protocol
254
Victor Stinnerf951d282014-06-29 00:46:45 +0200255 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500256 def create_unix_server(self, protocol_factory, path=None, *,
257 sock=None, backlog=100, ssl=None):
258 if isinstance(ssl, bool):
259 raise TypeError('ssl argument must be an SSLContext or None')
260
261 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200262 if sock is not None:
263 raise ValueError(
264 'path and sock can not be specified at the same time')
265
Yury Selivanovd7c15182016-11-15 15:26:34 -0500266 path = _fspath(path)
Yury Selivanovb057c522014-02-18 12:15:06 -0500267 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268
Yury Selivanov908d55d2016-10-09 12:15:08 -0400269 # Check for abstract socket. `str` and `bytes` paths are supported.
270 if path[0] not in (0, '\x00'):
271 try:
272 if stat.S_ISSOCK(os.stat(path).st_mode):
273 os.remove(path)
274 except FileNotFoundError:
275 pass
276 except OSError as err:
277 # Directory may have permissions only to create socket.
278 logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
279
Yury Selivanovb057c522014-02-18 12:15:06 -0500280 try:
281 sock.bind(path)
282 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100283 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500284 if exc.errno == errno.EADDRINUSE:
285 # Let's improve the error message by adding
286 # with what exact address it occurs.
287 msg = 'Address {!r} is already in use'.format(path)
288 raise OSError(errno.EADDRINUSE, msg) from None
289 else:
290 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200291 except:
292 sock.close()
293 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500294 else:
295 if sock is None:
296 raise ValueError(
297 'path was not specified, and no sock specified')
298
Yury Selivanov36e7e972016-10-07 12:39:57 -0400299 if (sock.family != socket.AF_UNIX or
Yury Selivanova1a8b7d2016-11-09 15:47:00 -0500300 not base_events._is_stream_socket(sock)):
Yury Selivanovb057c522014-02-18 12:15:06 -0500301 raise ValueError(
Yury Selivanov36e7e972016-10-07 12:39:57 -0400302 'A UNIX Domain Stream Socket was expected, got {!r}'
303 .format(sock))
Yury Selivanovb057c522014-02-18 12:15:06 -0500304
305 server = base_events.Server(self, [sock])
306 sock.listen(backlog)
307 sock.setblocking(False)
308 self._start_serving(protocol_factory, sock, ssl, server)
309 return server
310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200312if hasattr(os, 'set_blocking'):
313 def _set_nonblocking(fd):
314 os.set_blocking(fd, False)
315else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400316 import fcntl
317
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200318 def _set_nonblocking(fd):
319 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
320 flags = flags | os.O_NONBLOCK
321 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322
323
324class _UnixReadPipeTransport(transports.ReadTransport):
325
Yury Selivanovdec1a452014-02-18 22:27:48 -0500326 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327
328 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
329 super().__init__(extra)
330 self._extra['pipe'] = pipe
331 self._loop = loop
332 self._pipe = pipe
333 self._fileno = pipe.fileno()
Guido van Rossum47867872016-08-31 09:42:38 -0700334 self._protocol = protocol
335 self._closing = False
336
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700337 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800338 if not (stat.S_ISFIFO(mode) or
339 stat.S_ISSOCK(mode) or
340 stat.S_ISCHR(mode)):
Guido van Rossum47867872016-08-31 09:42:38 -0700341 self._pipe = None
342 self._fileno = None
343 self._protocol = None
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700344 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum47867872016-08-31 09:42:38 -0700345
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346 _set_nonblocking(self._fileno)
Guido van Rossum47867872016-08-31 09:42:38 -0700347
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100349 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400350 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100351 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100353 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500354 self._loop.call_soon(futures._set_result_unless_cancelled,
355 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356
Victor Stinnere912e652014-07-12 03:11:53 +0200357 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100358 info = [self.__class__.__name__]
359 if self._pipe is None:
360 info.append('closed')
361 elif self._closing:
362 info.append('closing')
363 info.append('fd=%s' % self._fileno)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400364 selector = getattr(self._loop, '_selector', None)
365 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200366 polling = selector_events._test_selector_event(
Yury Selivanov5dc09332016-05-13 16:04:43 -0400367 selector,
Victor Stinnere912e652014-07-12 03:11:53 +0200368 self._fileno, selectors.EVENT_READ)
369 if polling:
370 info.append('polling')
371 else:
372 info.append('idle')
Yury Selivanov5dc09332016-05-13 16:04:43 -0400373 elif self._pipe is not None:
374 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200375 else:
376 info.append('closed')
377 return '<%s>' % ' '.join(info)
378
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 def _read_ready(self):
380 try:
381 data = os.read(self._fileno, self.max_size)
382 except (BlockingIOError, InterruptedError):
383 pass
384 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100385 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 else:
387 if data:
388 self._protocol.data_received(data)
389 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200390 if self._loop.get_debug():
391 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400393 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 self._loop.call_soon(self._protocol.eof_received)
395 self._loop.call_soon(self._call_connection_lost, None)
396
Guido van Rossum57497ad2013-10-18 07:58:20 -0700397 def pause_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400398 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
Guido van Rossum57497ad2013-10-18 07:58:20 -0700400 def resume_reading(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400401 self._loop._add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400403 def set_protocol(self, protocol):
404 self._protocol = protocol
405
406 def get_protocol(self):
407 return self._protocol
408
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500409 def is_closing(self):
410 return self._closing
411
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700412 def close(self):
413 if not self._closing:
414 self._close(None)
415
Victor Stinner978a9af2015-01-29 17:50:58 +0100416 # On Python 3.3 and older, objects with a destructor part of a reference
417 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
418 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400419 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100420 def __del__(self):
421 if self._pipe is not None:
Victor Stinnere19558a2016-03-23 00:28:08 +0100422 warnings.warn("unclosed transport %r" % self, ResourceWarning,
423 source=self)
Victor Stinner978a9af2015-01-29 17:50:58 +0100424 self._pipe.close()
425
Victor Stinner0ee29c22014-02-19 01:40:41 +0100426 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200428 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
429 if self._loop.get_debug():
430 logger.debug("%r: %s", self, message, exc_info=True)
431 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500432 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100433 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500434 'exception': exc,
435 'transport': self,
436 'protocol': self._protocol,
437 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 self._close(exc)
439
440 def _close(self, exc):
441 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400442 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 self._loop.call_soon(self._call_connection_lost, exc)
444
445 def _call_connection_lost(self, exc):
446 try:
447 self._protocol.connection_lost(exc)
448 finally:
449 self._pipe.close()
450 self._pipe = None
451 self._protocol = None
452 self._loop = None
453
454
Yury Selivanov3cb99142014-02-18 18:41:13 -0500455class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800456 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457
458 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100459 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461 self._pipe = pipe
462 self._fileno = pipe.fileno()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700463 self._protocol = protocol
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400464 self._buffer = bytearray()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465 self._conn_lost = 0
466 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700467
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700468 mode = os.fstat(self._fileno).st_mode
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700469 is_char = stat.S_ISCHR(mode)
470 is_fifo = stat.S_ISFIFO(mode)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700471 is_socket = stat.S_ISSOCK(mode)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700472 if not (is_char or is_fifo or is_socket):
Guido van Rossum47867872016-08-31 09:42:38 -0700473 self._pipe = None
474 self._fileno = None
475 self._protocol = None
Victor Stinner8dffc452014-01-25 15:32:06 +0100476 raise ValueError("Pipe transport is only for "
477 "pipes, sockets and character devices")
Guido van Rossum47867872016-08-31 09:42:38 -0700478
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479 _set_nonblocking(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100481
482 # On AIX, the reader trick (to be notified when the read end of the
483 # socket is closed) only works for sockets. On other platforms it
484 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
Guido van Rossum8b7918a2016-08-31 09:40:18 -0700485 if is_socket or (is_fifo and not sys.platform.startswith("aix")):
Victor Stinner29342622015-01-29 14:15:19 +0100486 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400487 self._loop.call_soon(self._loop._add_reader,
Victor Stinner29342622015-01-29 14:15:19 +0100488 self._fileno, self._read_ready)
489
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100491 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500492 self._loop.call_soon(futures._set_result_unless_cancelled,
493 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494
Victor Stinnere912e652014-07-12 03:11:53 +0200495 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100496 info = [self.__class__.__name__]
497 if self._pipe is None:
498 info.append('closed')
499 elif self._closing:
500 info.append('closing')
501 info.append('fd=%s' % self._fileno)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400502 selector = getattr(self._loop, '_selector', None)
503 if self._pipe is not None and selector is not None:
Victor Stinnere912e652014-07-12 03:11:53 +0200504 polling = selector_events._test_selector_event(
Yury Selivanov5dc09332016-05-13 16:04:43 -0400505 selector,
Victor Stinnere912e652014-07-12 03:11:53 +0200506 self._fileno, selectors.EVENT_WRITE)
507 if polling:
508 info.append('polling')
509 else:
510 info.append('idle')
511
512 bufsize = self.get_write_buffer_size()
513 info.append('bufsize=%s' % bufsize)
Yury Selivanov5dc09332016-05-13 16:04:43 -0400514 elif self._pipe is not None:
515 info.append('open')
Victor Stinnere912e652014-07-12 03:11:53 +0200516 else:
517 info.append('closed')
518 return '<%s>' % ' '.join(info)
519
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800520 def get_write_buffer_size(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400521 return len(self._buffer)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800522
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700524 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200525 if self._loop.get_debug():
526 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100527 if self._buffer:
528 self._close(BrokenPipeError())
529 else:
530 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531
532 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800533 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
534 if isinstance(data, bytearray):
535 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 if not data:
537 return
538
539 if self._conn_lost or self._closing:
540 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700541 logger.warning('pipe closed by peer or '
542 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543 self._conn_lost += 1
544 return
545
546 if not self._buffer:
547 # Attempt to send it right away first.
548 try:
549 n = os.write(self._fileno, data)
550 except (BlockingIOError, InterruptedError):
551 n = 0
552 except Exception as exc:
553 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100554 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 return
556 if n == len(data):
557 return
558 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400559 data = memoryview(data)[n:]
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400560 self._loop._add_writer(self._fileno, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700561
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400562 self._buffer += data
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800563 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564
565 def _write_ready(self):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400566 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700567
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700568 try:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400569 n = os.write(self._fileno, self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700570 except (BlockingIOError, InterruptedError):
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400571 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572 except Exception as exc:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400573 self._buffer.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574 self._conn_lost += 1
575 # Remove writer here, _fatal_error() doesn't it
576 # because _buffer is empty.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400577 self._loop._remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100578 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 else:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400580 if n == len(self._buffer):
581 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400582 self._loop._remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800583 self._maybe_resume_protocol() # May append to buffer.
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400584 if self._closing:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400585 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 self._call_connection_lost(None)
587 return
588 elif n > 0:
Yury Selivanov4c5bf3b2016-09-15 16:51:48 -0400589 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590
591 def can_write_eof(self):
592 return True
593
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 def write_eof(self):
595 if self._closing:
596 return
597 assert self._pipe
598 self._closing = True
599 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400600 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700601 self._loop.call_soon(self._call_connection_lost, None)
602
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400603 def set_protocol(self, protocol):
604 self._protocol = protocol
605
606 def get_protocol(self):
607 return self._protocol
608
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500609 def is_closing(self):
610 return self._closing
611
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700612 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100613 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614 # write_eof is all what we needed to close the write pipe
615 self.write_eof()
616
Victor Stinner978a9af2015-01-29 17:50:58 +0100617 # On Python 3.3 and older, objects with a destructor part of a reference
618 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
619 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400620 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100621 def __del__(self):
622 if self._pipe is not None:
Victor Stinnere19558a2016-03-23 00:28:08 +0100623 warnings.warn("unclosed transport %r" % self, ResourceWarning,
624 source=self)
Victor Stinner978a9af2015-01-29 17:50:58 +0100625 self._pipe.close()
626
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627 def abort(self):
628 self._close(None)
629
Victor Stinner0ee29c22014-02-19 01:40:41 +0100630 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200632 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200633 if self._loop.get_debug():
634 logger.debug("%r: %s", self, message, exc_info=True)
635 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500636 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100637 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500638 'exception': exc,
639 'transport': self,
640 'protocol': self._protocol,
641 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700642 self._close(exc)
643
644 def _close(self, exc=None):
645 self._closing = True
646 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400647 self._loop._remove_writer(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400649 self._loop._remove_reader(self._fileno)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700650 self._loop.call_soon(self._call_connection_lost, exc)
651
652 def _call_connection_lost(self, exc):
653 try:
654 self._protocol.connection_lost(exc)
655 finally:
656 self._pipe.close()
657 self._pipe = None
658 self._protocol = None
659 self._loop = None
660
661
Victor Stinner1e40f102014-12-11 23:30:17 +0100662if hasattr(os, 'set_inheritable'):
663 # Python 3.4 and newer
664 _set_inheritable = os.set_inheritable
665else:
666 import fcntl
667
668 def _set_inheritable(fd, inheritable):
669 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
670
671 old = fcntl.fcntl(fd, fcntl.F_GETFD)
672 if not inheritable:
673 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
674 else:
675 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
676
677
Guido van Rossum59691282013-10-30 14:52:03 -0700678class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679
Guido van Rossum59691282013-10-30 14:52:03 -0700680 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700681 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700683 # Use a socket pair for stdin, since not all platforms
684 # support selecting read events on the write end of a
685 # socket (which we use in order to detect closing of the
686 # other end). Notably this is needed on AIX, and works
687 # just fine on other platforms.
688 stdin, stdin_w = self._loop._socketpair()
Victor Stinner1e40f102014-12-11 23:30:17 +0100689
690 # Mark the write end of the stdin pipe as non-inheritable,
691 # needed by close_fds=False on Python 3.3 and older
692 # (Python 3.4 implements the PEP 446, socketpair returns
693 # non-inheritable sockets)
694 _set_inheritable(stdin_w.fileno(), False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 self._proc = subprocess.Popen(
696 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
697 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700698 if stdin_w is not None:
699 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200700 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800701
702
703class AbstractChildWatcher:
704 """Abstract base class for monitoring child processes.
705
706 Objects derived from this class monitor a collection of subprocesses and
707 report their termination or interruption by a signal.
708
709 New callbacks are registered with .add_child_handler(). Starting a new
710 process must be done within a 'with' block to allow the watcher to suspend
711 its activity until the new process if fully registered (this is needed to
712 prevent a race condition in some implementations).
713
714 Example:
715 with watcher:
716 proc = subprocess.Popen("sleep 1")
717 watcher.add_child_handler(proc.pid, callback)
718
719 Notes:
720 Implementations of this class must be thread-safe.
721
722 Since child watcher objects may catch the SIGCHLD signal and call
723 waitpid(-1), there should be only one active object per process.
724 """
725
726 def add_child_handler(self, pid, callback, *args):
727 """Register a new child handler.
728
729 Arrange for callback(pid, returncode, *args) to be called when
730 process 'pid' terminates. Specifying another callback for the same
731 process replaces the previous handler.
732
Victor Stinneracdb7822014-07-14 18:33:40 +0200733 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800734 """
735 raise NotImplementedError()
736
737 def remove_child_handler(self, pid):
738 """Removes the handler for process 'pid'.
739
740 The function returns True if the handler was successfully removed,
741 False if there was nothing to remove."""
742
743 raise NotImplementedError()
744
Guido van Rossum2bcae702013-11-13 15:50:08 -0800745 def attach_loop(self, loop):
746 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800747
Guido van Rossum2bcae702013-11-13 15:50:08 -0800748 If the watcher was previously attached to an event loop, then it is
749 first detached before attaching to the new loop.
750
751 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800752 """
753 raise NotImplementedError()
754
755 def close(self):
756 """Close the watcher.
757
758 This must be called to make sure that any underlying resource is freed.
759 """
760 raise NotImplementedError()
761
762 def __enter__(self):
763 """Enter the watcher's context and allow starting new processes
764
765 This function must return self"""
766 raise NotImplementedError()
767
768 def __exit__(self, a, b, c):
769 """Exit the watcher's context"""
770 raise NotImplementedError()
771
772
773class BaseChildWatcher(AbstractChildWatcher):
774
Guido van Rossum2bcae702013-11-13 15:50:08 -0800775 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800776 self._loop = None
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400777 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800778
779 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800780 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800781
782 def _do_waitpid(self, expected_pid):
783 raise NotImplementedError()
784
785 def _do_waitpid_all(self):
786 raise NotImplementedError()
787
Guido van Rossum2bcae702013-11-13 15:50:08 -0800788 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800789 assert loop is None or isinstance(loop, events.AbstractEventLoop)
790
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400791 if self._loop is not None and loop is None and self._callbacks:
792 warnings.warn(
793 'A loop is being detached '
794 'from a child watcher with pending handlers',
795 RuntimeWarning)
796
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800797 if self._loop is not None:
798 self._loop.remove_signal_handler(signal.SIGCHLD)
799
800 self._loop = loop
801 if loop is not None:
802 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
803
804 # Prevent a race condition in case a child terminated
805 # during the switch.
806 self._do_waitpid_all()
807
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800808 def _sig_chld(self):
809 try:
810 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500811 except Exception as exc:
812 # self._loop should always be available here
813 # as '_sig_chld' is added as a signal handler
814 # in 'attach_loop'
815 self._loop.call_exception_handler({
816 'message': 'Unknown exception in SIGCHLD handler',
817 'exception': exc,
818 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800819
820 def _compute_returncode(self, status):
821 if os.WIFSIGNALED(status):
822 # The child process died because of a signal.
823 return -os.WTERMSIG(status)
824 elif os.WIFEXITED(status):
825 # The child process exited (e.g sys.exit()).
826 return os.WEXITSTATUS(status)
827 else:
828 # The child exited, but we don't understand its status.
829 # This shouldn't happen, but if it does, let's just
830 # return that status; perhaps that helps debug it.
831 return status
832
833
834class SafeChildWatcher(BaseChildWatcher):
835 """'Safe' child watcher implementation.
836
837 This implementation avoids disrupting other code spawning processes by
838 polling explicitly each process in the SIGCHLD handler instead of calling
839 os.waitpid(-1).
840
841 This is a safe solution but it has a significant overhead when handling a
842 big number of children (O(n) each time SIGCHLD is raised)
843 """
844
Guido van Rossum2bcae702013-11-13 15:50:08 -0800845 def close(self):
846 self._callbacks.clear()
847 super().close()
848
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800849 def __enter__(self):
850 return self
851
852 def __exit__(self, a, b, c):
853 pass
854
855 def add_child_handler(self, pid, callback, *args):
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400856 if self._loop is None:
857 raise RuntimeError(
858 "Cannot add child handler, "
859 "the child watcher does not have a loop attached")
860
Victor Stinner47cd10d2015-01-30 00:05:19 +0100861 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800862
863 # Prevent a race condition in case the child is already terminated.
864 self._do_waitpid(pid)
865
Guido van Rossum2bcae702013-11-13 15:50:08 -0800866 def remove_child_handler(self, pid):
867 try:
868 del self._callbacks[pid]
869 return True
870 except KeyError:
871 return False
872
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800873 def _do_waitpid_all(self):
874
875 for pid in list(self._callbacks):
876 self._do_waitpid(pid)
877
878 def _do_waitpid(self, expected_pid):
879 assert expected_pid > 0
880
881 try:
882 pid, status = os.waitpid(expected_pid, os.WNOHANG)
883 except ChildProcessError:
884 # The child process is already reaped
885 # (may happen if waitpid() is called elsewhere).
886 pid = expected_pid
887 returncode = 255
888 logger.warning(
889 "Unknown child process pid %d, will report returncode 255",
890 pid)
891 else:
892 if pid == 0:
893 # The child process is still alive.
894 return
895
896 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200897 if self._loop.get_debug():
898 logger.debug('process %s exited with returncode %s',
899 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900
901 try:
902 callback, args = self._callbacks.pop(pid)
903 except KeyError: # pragma: no cover
904 # May happen if .remove_child_handler() is called
905 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200906 if self._loop.get_debug():
907 logger.warning("Child watcher got an unexpected pid: %r",
908 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800909 else:
910 callback(pid, returncode, *args)
911
912
913class FastChildWatcher(BaseChildWatcher):
914 """'Fast' child watcher implementation.
915
916 This implementation reaps every terminated processes by calling
917 os.waitpid(-1) directly, possibly breaking other code spawning processes
918 and waiting for their termination.
919
920 There is no noticeable overhead when handling a big number of children
921 (O(1) each time a child terminates).
922 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800923 def __init__(self):
924 super().__init__()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800925 self._lock = threading.Lock()
926 self._zombies = {}
927 self._forks = 0
928
929 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800930 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800931 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800932 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800933
934 def __enter__(self):
935 with self._lock:
936 self._forks += 1
937
938 return self
939
940 def __exit__(self, a, b, c):
941 with self._lock:
942 self._forks -= 1
943
944 if self._forks or not self._zombies:
945 return
946
947 collateral_victims = str(self._zombies)
948 self._zombies.clear()
949
950 logger.warning(
951 "Caught subprocesses termination from unknown pids: %s",
952 collateral_victims)
953
954 def add_child_handler(self, pid, callback, *args):
955 assert self._forks, "Must use the context manager"
Yury Selivanov9eb6c672016-10-05 16:57:12 -0400956
957 if self._loop is None:
958 raise RuntimeError(
959 "Cannot add child handler, "
960 "the child watcher does not have a loop attached")
961
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800962 with self._lock:
963 try:
964 returncode = self._zombies.pop(pid)
965 except KeyError:
966 # The child is running.
967 self._callbacks[pid] = callback, args
968 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800969
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800970 # The child is dead already. We can fire the callback.
971 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800972
Guido van Rossum2bcae702013-11-13 15:50:08 -0800973 def remove_child_handler(self, pid):
974 try:
975 del self._callbacks[pid]
976 return True
977 except KeyError:
978 return False
979
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800980 def _do_waitpid_all(self):
981 # Because of signal coalescing, we must keep calling waitpid() as
982 # long as we're able to reap a child.
983 while True:
984 try:
985 pid, status = os.waitpid(-1, os.WNOHANG)
986 except ChildProcessError:
987 # No more child processes exist.
988 return
989 else:
990 if pid == 0:
991 # A child process is still alive.
992 return
993
994 returncode = self._compute_returncode(status)
995
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800996 with self._lock:
997 try:
998 callback, args = self._callbacks.pop(pid)
999 except KeyError:
1000 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001001 if self._forks:
1002 # It may not be registered yet.
1003 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +02001004 if self._loop.get_debug():
1005 logger.debug('unknown process %s exited '
1006 'with returncode %s',
1007 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001008 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001009 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +02001010 else:
1011 if self._loop.get_debug():
1012 logger.debug('process %s exited with returncode %s',
1013 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001014
Guido van Rossumab27a9f2014-01-25 16:32:17 -08001015 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001016 logger.warning(
1017 "Caught subprocess termination from unknown pid: "
1018 "%d -> %d", pid, returncode)
1019 else:
1020 callback(pid, returncode, *args)
1021
1022
1023class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +01001024 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001025 _loop_factory = _UnixSelectorEventLoop
1026
1027 def __init__(self):
1028 super().__init__()
1029 self._watcher = None
1030
1031 def _init_watcher(self):
1032 with events._lock:
1033 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -08001034 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001035 if isinstance(threading.current_thread(),
1036 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001037 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001038
1039 def set_event_loop(self, loop):
1040 """Set the event loop.
1041
1042 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -08001043 .set_event_loop() from the main thread will call .attach_loop(loop) on
1044 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001045 """
1046
1047 super().set_event_loop(loop)
1048
1049 if self._watcher is not None and \
1050 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -08001051 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001052
1053 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001054 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001055
1056 If not yet set, a SafeChildWatcher object is automatically created.
1057 """
1058 if self._watcher is None:
1059 self._init_watcher()
1060
1061 return self._watcher
1062
1063 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +02001064 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001065
1066 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1067
1068 if self._watcher is not None:
1069 self._watcher.close()
1070
1071 self._watcher = watcher
1072
1073SelectorEventLoop = _UnixSelectorEventLoop
1074DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy