# blob: 2843678bbafc48c17346f23bcaa1a0127796b113
"""Selector event loop for Unix with signal handling."""

import errno
import os
import signal
import socket
import stat
import subprocess
import sys
import threading
import warnings


from . import base_events
from . import base_subprocess
from . import compat
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger


# Public names exported by this module.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Everything below depends on the signal module; refuse to import on
# Windows rather than fail later.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')


def _sighandler_noop(signum, frame):
    """Dummy signal handler.

    Takes the (signum, frame) arguments of a Python signal handler and does
    nothing with them.  add_signal_handler() installs it so that Python
    writes the signal number to the wakeup file descriptor; the real
    callback is dispatched from the event loop.
    """
    pass


class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a signal number to handle."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        The callback is wrapped in an events.Handle together with *args and
        stored under *sig*, replacing any handler already stored for it.

        Raise TypeError if callback is a coroutine object or coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup fd cannot be set or the signal
        cannot be caught.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc)) from exc

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        The signal is reset to signal.default_int_handler for SIGINT and to
        signal.SIG_DFL for every other signal.  When no handlers remain the
        wakeup fd is reset to -1.

        Raise TypeError or ValueError if sig is not a valid signal number.
        Raise RuntimeError if resetting the handler fails with EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is created and registered inside the child watcher's
        'with' block.  If waiting on the transport's waiter future raises,
        the transport is closed and waited on before the error is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path*, a new
        AF_UNIX stream socket is created and connected; with *sock*, the
        given socket is checked and used as is.

        Return a (transport, protocol) pair.

        Raise ValueError if the ssl/server_hostname or path/sock
        combination is invalid, or if *sock* is not a UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Deliberately bare: the socket created here must be closed
                # whatever interrupts the connect; the error is re-raised.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path*, a new
        AF_UNIX stream socket is created and bound to it; unless the path
        names an abstract socket, an existing socket file at that path is
        removed first.

        Return a base_events.Server wrapping the listening socket.

        Raise TypeError if ssl is a bool.
        Raise ValueError if the path/sock combination is invalid or *sock*
        is not a UNIX stream socket.
        Raise OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                # Deliberately bare: never leak the socket created above.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server


if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Put the file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Put the file descriptor *fd* into non-blocking mode.

        Fallback used when os.set_blocking() is unavailable: OR the
        O_NONBLOCK flag into the descriptor's status flags via fcntl.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)


class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for reading and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the attributes so that __del__ sees no open pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read up to max_size bytes and hand them to the protocol.

        An empty read means EOF: stop reading, then schedule eof_received()
        and connection_lost(None).
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability."""
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for readability again."""
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); anything
        else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for writing and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        # Data accepted by write() but not yet written to the pipe.
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset the attributes so that __del__ sees no open pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Close the transport; pending data makes it a BrokenPipeError."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot be sent now.

        Empty data is ignored.  Data written after the connection was lost
        or closing started is dropped and counted in _conn_lost.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the write buffer as the pipe accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Start closing; finish at once if no data is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is closed or already closing."""
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        Exceptions matching base_events._FATAL_ERROR_IGNORE are only logged
        (in debug mode); anything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of file descriptor *fd*.

        A non-inheritable descriptor has the flag set; an inheritable one
        has it cleared.
        """
        # Fall back to 1 when the fcntl module does not expose FD_CLOEXEC.
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        old = fcntl.fcntl(fd, fcntl.F_GETFD)
        if not inheritable:
            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
        else:
            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)


class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe: the
        child gets one end and the other end becomes self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()

            # Mark the write end of the stdin pipe as non-inheritable,
            # needed by close_fds=False on Python 3.3 and older
            # (Python 3.4 implements the PEP 446, socketpair returns
            # non-inheritable sockets)
            _set_inheritable(stdin_w.fileno(), False)
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership of the descriptor moved to self._proc.stdin.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()


class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A child watcher tracks a set of subprocesses and reports when one of
    them terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process
    must be started inside a 'with' block so that the watcher can suspend
    its activity until the new process is fully registered (some
    implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
763
764
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        # Handlers registered by subclasses, keyed by pid.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler to *loop* (which may be None).

        A RuntimeWarning is issued when the watcher is detached from a
        loop while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached from a child watcher '
                    'with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD handler: reap children, reporting failures to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a returncode."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
824
825
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    The SIGCHLD handler polls every registered process explicitly
    instead of calling os.waitpid(-1), so other code spawning processes
    is not disrupted.

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raise RuntimeError if no event loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, the child watcher "
                "does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' without blocking; run its handler if done."""
        assert expected_pid > 0

        try:
            reaped_pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            reaped_pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                reaped_pid)
        else:
            if reaped_pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(reaped_pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               reaped_pid, exc_info=True)
        else:
            handler(reaped_pid, returncode, *handler_args)
903
904
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Return codes of reaped children that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks:
                return
            if not self._zombies:
                return

            unclaimed = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is invoked immediately.

        Raise RuntimeError if no event loop is attached.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, the child watcher "
                "does not have a loop attached")

        missing = object()
        with self._lock:
            returncode = self._zombies.pop(pid, missing)
            if returncode is missing:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler."""
        missing = object()

        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, missing)
                if entry is not missing:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody is about to register it.
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1013
1014
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return
            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call
        .attach_loop(loop) on the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        A previously set watcher is closed before being replaced.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1064
# Public aliases for the Unix selector event loop and its default policy.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy