blob: e1f5c52be00cb08858d8610426320c856b60f521 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Refuse to import on Windows: this module is built around Unix signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
35
36
def _sighandler_noop(signum, frame):
    """Signal handler that intentionally does nothing.

    Both arguments (the signal number and the interrupted stack frame)
    are ignored; the function always returns None.
    """
    return None
40
41
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Unix.

    Extends the base selector event loop with signal handlers, pipe and
    subprocess transports, and UNIX Domain Socket clients/servers.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Signal number -> events.Handle wrapping the registered callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then uninstall every registered signal handler."""
        super().close()
        # Walk a snapshot: remove_signal_handler() mutates the mapping.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            # Null bytes are written by _write_to_self(), not by a signal.
            if signum:
                self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine object or coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup fd cannot be installed or the
        signal cannot be caught.
        """
        callback_is_coro = (coroutines.iscoroutine(callback)
                            or coroutines.iscoroutinefunction(callback))
        if callback_is_coro:
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError when called outside the
            # main thread.  Doing it first guarantees that a loop running
            # in another thread cannot register a signal handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        self._signal_handlers[sig] = events.Handle(callback, args, self)

        try:
            # Install a dummy handler so that Python writes the signal
            # number to the wakeup file descriptor; _process_self_data()
            # reads the numbers back from it to run the real callbacks.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            # Nothing registered any more: assume it's some race condition.
            return
        if handle._cancelled:
            # Remove it properly instead of running a cancelled callback.
            self.remove_signal_handler(sig)
            return
        self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        if sig not in self._signal_handlers:
            return False
        del self._signal_handlers[sig]

        # SIGINT goes back to Python's KeyboardInterrupt handler; every
        # other signal goes back to the OS default action.
        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        if not self._signal_handlers:
            # No handlers left: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal number.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and wait for its waiter future.

        The transport is created and registered inside the child
        watcher's 'with' block.  If waiting fails, the transport is
        closed and waited for before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

            # Workaround CPython bug #23353: using yield/yield-from in an
            # except block of a generator doesn't clear properly
            # sys.exc_info().  So only record the error inside the except
            # block and do the cleanup (which yields) outside of it.
            err = None
            try:
                yield from waiter
            except Exception as exc:
                err = exc

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket; return (transport, protocol).

        Exactly one of path and sock must be given.  server_hostname is
        required when ssl is truthy and rejected otherwise.
        Raise ValueError on an invalid combination of arguments.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        use_ssl = bool(ssl)
        if use_ssl and server_hostname is None:
            raise ValueError(
                'you have to pass server_hostname when using ssl')
        if not use_ssl and server_hostname is not None:
            raise ValueError('server_hostname is only meaningful with ssl')

        if path is None:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except BaseException:
                # The socket was created here, so close it on any failure
                # before propagating the exception.
                sock.close()
                raise

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Start serving on a UNIX Domain Socket; return the Server.

        Exactly one of path and sock must be given.
        Raise TypeError if ssl is a bool.
        Raise ValueError on an invalid path/sock combination or if sock
        is not an AF_UNIX socket.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is None:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except BaseException as exc:
                # The socket was created here, so close it on any failure.
                sock.close()
                if isinstance(exc, OSError) and exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                raise

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
if not hasattr(os, 'set_blocking'):
    # No os.set_blocking() available: toggle O_NONBLOCK through fcntl.
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
else:
    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
297
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device.

    Data read from the file descriptor is handed to the protocol's
    data_received(); an empty read is reported through eof_received()
    followed by connection_lost(None).
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        supported = any(is_kind(mode) for is_kind in
                        (stat.S_ISFIFO, stat.S_ISSOCK, stat.S_ISCHR))
        if not supported:
            # Drop the references so that __del__ does not treat this
            # half-built object as an unclosed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        is_open = self._pipe is not None
        parts = [self.__class__.__name__]
        if not is_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append('fd=%s' % self._fileno)

        selector = getattr(self._loop, '_selector', None)
        if not is_open:
            # NOTE: a closed transport reports 'closed' twice (once above,
            # once here); the historical output is kept as is.
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_READ)
            parts.append('polling' if polling else 'idle')
        return '<%s>' % ' '.join(parts)

    def _read_ready(self):
        """Read one chunk from the pipe and dispatch it to the protocol."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # Empty read: end of file.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for readability."""
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the file descriptor for readability again."""
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is None:
                return
            warnings.warn("unclosed transport %r" % self, ResourceWarning)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (at debug level, when
        the loop is in debug mode); any other exception is passed to the
        loop's exception handler.
        """
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release the pipe, even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
426
427
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character device.

    Data that cannot be written immediately is kept in an internal
    bytearray and flushed from _write_ready() when the file descriptor
    becomes writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Drop the references so that __del__ does not treat this
            # half-built object as an unclosed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or (is_fifo and not on_aix):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        is_open = self._pipe is not None
        parts = [self.__class__.__name__]
        if not is_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append('fd=%s' % self._fileno)

        selector = getattr(self._loop, '_selector', None)
        if not is_open:
            # NOTE: a closed transport reports 'closed' twice (once above,
            # once here); the historical output is kept as is.
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if polling else 'idle')
            parts.append('bufsize=%s' % self.get_write_buffer_size())
        return '<%s>' % ' '.join(parts)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle readability of the write end: the peer closed the pipe."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        # Pending data can no longer be delivered: report a broken pipe.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot be sent now."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if sent == len(data):
                return
            if sent > 0:
                # Keep only the part that was not written.
                data = memoryview(data)[sent:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the file descriptor accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            sent = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here: _close(), called by _fatal_error(),
            # skips it because _buffer is empty by then.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if sent != len(self._buffer):
            # Partial write: drop what was sent and wait for the next event.
            if sent > 0:
                del self._buffer[:sent]
            return

        self._buffer.clear()
        self._loop._remove_writer(self._fileno)
        self._maybe_resume_protocol()  # May append to buffer.
        if self._closing:
            self._loop._remove_reader(self._fileno)
            self._call_connection_lost(None)

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Mark the transport as closing; finish once the buffer is empty."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the shutdown after the final flush.
            return
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport after buffered data has been written."""
        if self._pipe is None or self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is None:
                return
            warnings.warn("unclosed transport %r" % self, ResourceWarning)
            self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        Exceptions matching base_events._FATAL_ERROR_IGNORE are only
        logged (at debug level, when the loop is in debug mode); any
        other exception is passed to the loop's exception handler.
        """
        # should be called by exception handler only
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop polling and schedule connection_lost."""
        self._closing = True
        # A writer callback is only registered while data is buffered.
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release the pipe, even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
632
633
if not hasattr(os, 'set_inheritable'):
    # Python 3.3 and older: emulate os.set_inheritable() with fcntl.
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of file descriptor *fd*."""
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            updated = current & ~cloexec_flag
        else:
            updated = current | cloexec_flag
        fcntl.fcntl(fd, fcntl.F_SETFD, updated)
else:
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
648
649
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        one end is passed to the child, and the other end is wrapped in
        a binary file object and stored as self._proc.stdin.

        Any exception raised while starting the child propagates to the
        caller; the socket pair is closed first so it does not leak.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Start-up failed before the write end was handed over:
                # close both sockets so their descriptors do not leak.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800673
674
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called once the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to an event loop is detached
        from it before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
743
744
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses implement _do_waitpid() and _do_waitpid_all(); this class
    installs the SIGCHLD handler on the attached loop and converts wait
    statuses into return codes.
    """

    def __init__(self):
        self._loop = None
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Reap the process 'expected_pid'; implemented by subclasses."""
        raise NotImplementedError()

    def _do_waitpid_all(self):
        """Reap the terminated children; implemented by subclasses."""
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'.

        'loop' may be None, in which case the watcher is only detached.
        A RuntimeWarning is emitted when detaching while handlers are
        still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD handler: reap children, reporting failures to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a wait status into a return code.

        Return the negated signal number if the child was killed by a
        signal, the exit status if it exited, and the raw status otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
804
805
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    explicitly polling each registered process in the SIGCHLD handler
    instead of calling os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling
    a big number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raise RuntimeError if no event loop is attached to the watcher.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler of 'pid'; return True if there was one."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Poll every registered process once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *handler_args)
883
884
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Return codes of reaped children that have no handler yet,
        # keyed by pid.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget handlers and unclaimed return codes, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        collateral_victims = None
        with self._lock:
            self._forks -= 1

            # Only once the last context is left can the remaining
            # zombies be declared unknown.
            if not self._forks and self._zombies:
                collateral_victims = str(self._zombies)
                self._zombies.clear()

        if collateral_victims is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  If the process
        has already been reaped, the callback is invoked immediately.
        Raise RuntimeError if no event loop is attached to the watcher.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        missing = object()
        with self._lock:
            returncode = self._zombies.pop(pid, missing)
            if returncode is missing:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler of 'pid'; return True if there was one."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its return code."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            callback = args = None
            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                else:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
993
994
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            watcher = SafeChildWatcher()
            self._watcher = watcher

            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The watcher that was set before, if any, is closed first.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1044
# Public aliases listed in __all__ for the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy