blob: be98f334cecd96c871eb552c84ee944c67171e41 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080017from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010019from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020021from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020023from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Public names exported by this module.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# This module relies on Unix signal semantics; refuse to import on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
34
35
def _sighandler_noop(signum, frame):
    """Signal handler that intentionally does nothing.

    It is installed only so that Python writes the signal number to the
    wakeup file descriptor; the real work happens when the event loop
    reads that descriptor.
    """
    return None
39
40
# os.fspath() is missing on Python 3.5 or earlier; fall back to an identity
# function there so callers can always normalize path arguments.
_fspath = getattr(os, 'fspath', lambda path: path)
46
47
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        return socket.socketpair()

    def close(self):
        """Close the loop, then uninstall every registered signal handler."""
        super().close()
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Dispatch every non-zero signal number contained in *data*."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be
        installed or if the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the interpreter's default disposition for the signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is set up.

        The process is created inside the child watcher's 'with' block so
        that the exit handler is registered before the watcher resumes.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status over to the loop in a thread-safe way.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path=None, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.

        Raise ValueError on inconsistent arguments or if *sock* is not a
        UNIX domain stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = _fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # We created the socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        an existing (stale) socket file, it is removed before binding.
        Return the base_events.Server object.

        Raise TypeError if *ssl* is a bool.
        Raise ValueError on inconsistent arguments or if *sock* is not a
        UNIX domain stream socket.
        Raise OSError if the address cannot be bound.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = _fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # From here on we own the socket: every failure path below,
            # including errors from the stale-socket check (e.g. an empty
            # path or one with an embedded null byte), must close it.
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.
                if path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error(
                            'Unable to check or remove stale UNIX socket %r: %r',
                            path, err)

                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311
if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode.

        Fallback for platforms without os.set_blocking(): add O_NONBLOCK
        to the descriptor's status flags through fcntl().
        """
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322
323
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, its result is set (unless it was
        cancelled) after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Drop the references so __del__ neither warns about nor
            # closes a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read available data and feed it to the protocol; handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means the peer closed its end.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability.

        Does nothing once the transport is closing or closed.
        """
        if self._closing:
            # The reader is already removed, and self._loop is reset to
            # None after connection_lost(); nothing left to pause.
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Resume watching the pipe for readability.

        Does nothing once the transport is closing or closed, so that a
        reader is never re-registered for a descriptor about to be closed.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; connection_lost(None) is called later."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn("unclosed transport %r" % self, ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release the pipe and break reference cycles even if the
            # protocol callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
449
450
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write-only transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        st_mode = os.fstat(self._fileno).st_mode
        char_device = stat.S_ISCHR(st_mode)
        fifo = stat.S_ISFIFO(st_mode)
        sock_like = stat.S_ISSOCK(st_mode)
        supported = char_device or fifo or sock_like
        if not supported:
            # Forget the pipe so __del__ does not warn about or close it.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if sock_like or (fifo and not on_aix):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._pipe is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append('fd=%s' % self._fileno)

        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if polling else 'idle')

            bufsize = self.get_write_buffer_size()
            parts.append('bufsize=%s' % bufsize)
        return '<%s>' % ' '.join(parts)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        # Unsent data means the close is an error for the protocol.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if written == len(data):
                return
            if written > 0:
                # Keep only the unsent tail.
                data = memoryview(data)[written:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        assert self._buffer, 'Data should not be empty'

        try:
            written = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if written == len(self._buffer):
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
        elif written > 0:
            del self._buffer[:written]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Close the write end once any buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the shutdown once the buffer drains.
            return
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        if self._pipe is None or self._closing:
            return
        # write_eof() is all we need to close the write pipe
        self.write_eof()

    def __del__(self):
        if self._pipe is None:
            return
        warnings.warn("unclosed transport %r" % self, ResourceWarning,
                      source=self)
        self._pipe.close()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        self._closing = True
        if self._buffer:
            # A non-empty buffer implies a writer is registered.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release the pipe and drop references afterwards.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
652
653
if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of *fd* through fcntl().

        A descriptor is inheritable exactly when close-on-exec is unset.
        """
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            fd_flags &= ~cloexec_flag
        else:
            fd_flags |= cloexec_flag
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags)
668
669
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child via subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When *stdin* is subprocess.PIPE, a socket pair replaces the pipe;
        the parent's end becomes self._proc.stdin.  Both ends of the pair
        are closed if starting the process fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Startup failed before the hand-over: do not leak the
                # socket pair.  Closing an already closed socket is a no-op.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800693
694
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A child watcher keeps track of a set of subprocesses and reports when
    one of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block so that the watcher can suspend
    its activity until the new process is fully registered (this is
    needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the earlier one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to an event loop is detached
        from it before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
763
764
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD handling for the concrete child watchers.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> handler data stored by the subclasses.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'.

        Emits a RuntimeWarning when an attached loop is replaced by None
        while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children and report unexpected errors."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
824
825
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Rather than calling os.waitpid(-1), the SIGCHLD handler polls each
    registered process explicitly, so other code spawning processes is
    not disrupted.

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        """Nothing to do when leaving the context."""

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raises RuntimeError if no event loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' and run its handler if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback(pid, returncode, *args)
903
904
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and
    waits for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of reaped children that have no handler yet.
        self._zombies = {}
        # Number of 'with' blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and recorded zombies, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        collateral_victims = None
        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                # Last context closed: nobody can claim these pids anymore.
                collateral_victims = str(self._zombies)
                self._zombies.clear()

        if collateral_victims is None:
            return

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  Raises
        RuntimeError if no event loop is attached.  If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        not_reaped = object()
        with self._lock:
            returncode = self._zombies.pop(pid, not_reaped)
            if returncode is not_reaped:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is not None:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # unknown child
                    # It may not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # unknown child
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1013
1014
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher unless one already exists."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            watcher = SafeChildWatcher()
            self._watcher = watcher
            in_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if in_main_thread:
                watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """

        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        A previously set watcher is closed before being replaced.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1064
# Public aliases listed in __all__, bound to the Unix implementations.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop