blob: 65b61db66ac74e53619bbf8b45890ce09017cad8 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
# Public names exported by this module.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# This module relies on Unix signal handling; refuse to import on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')


Victor Stinnerfe5649c2014-07-17 22:43:40 +020037def _sighandler_noop(signum, frame):
38 """Dummy signal handler."""
39 pass
40
41
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a signal number.

        Each such number is passed to _handle_signal().
        """
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or if sig is
        not an int.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: stop routing signals to the wakeup fd.
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the interpreter's default disposition for this signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: detach the wakeup file descriptor.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is outside
        range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is ready.

        The child is created and registered with the child watcher inside
        the watcher's 'with' block.  If waiting for the transport fails,
        the transport is closed and waited for before the error is
        re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status to the transport via call_soon_threadsafe().
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  *server_hostname*
        is required when *ssl* is set and rejected otherwise.

        Return a (transport, protocol) pair.
        Raise ValueError for inconsistent arguments or when *sock* is not
        an AF_UNIX/SOCK_STREAM socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Any failure: release the socket created here, then
                # propagate the original exception.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        an existing socket file it is removed before binding.

        Return the base_events.Server object.
        Raise TypeError if *ssl* is a bool.
        Raise ValueError for inconsistent arguments or when *sock* is not
        an AF_UNIX/SOCK_STREAM socket.
        Raise OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Everything between creating the socket and a successful bind
            # runs inside this try block so the socket is closed on any
            # failure, including a path that cannot be indexed.
            try:
                # Check for abstract socket. `str` and `bytes` paths are
                # supported.  An empty path has no first character and no
                # file to clean up, so it is passed straight to bind().
                if path and path[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(path).st_mode):
                            os.remove(path)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create
                        # socket.
                        logger.error(
                            'Unable to check or remove stale UNIX socket '
                            '%r: %r', path, err)

                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server


Victor Stinnerf2ed8892014-07-29 23:08:00 +0200304if hasattr(os, 'set_blocking'):
305 def _set_nonblocking(fd):
306 os.set_blocking(fd, False)
307else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400308 import fcntl
309
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200310 def _set_nonblocking(fd):
311 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
312 flags = flags | os.O_NONBLOCK
313 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314
315
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a pipe, socket or character device.

    The descriptor is switched to non-blocking mode and registered as a
    reader with the event loop; incoming data is forwarded to the protocol.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, its result is set (unless it was
        cancelled) after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the attributes so that __del__() sees the transport as
            # closed and neither warns nor closes the caller's pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # The loop may be None (after connection_lost) or have no selector.
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read available data and pass it, or EOF, to the protocol."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for incoming data.

        Does nothing once the transport is closing: the reader is already
        removed and the loop reference may have been cleared.
        """
        if self._closing:
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the descriptor for incoming data again.

        Does nothing once the transport is closing, so that a descriptor
        that is about to be closed is never registered again.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport with it."""
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write-only transport over a pipe, socket or character device.

    Data that cannot be written immediately is kept in an internal buffer
    and flushed when the event loop reports the descriptor as writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, its result is set (unless it was
        cancelled) after connection_made() has been scheduled.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset the attributes so that __del__() sees the transport as
            # closed and neither warns nor closes the caller's pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # The loop may be None (after connection_lost) or have no selector.
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle a read event, which signals that the peer closed the pipe."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately.

        Empty data is ignored.  Writes made after the connection was lost
        or while the transport is closing are dropped; a warning is logged
        once the count of such writes reaches
        constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the part that was not written.
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the descriptor accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    # Everything was flushed after close()/write_eof().
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Close the write end once buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport with it."""
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            # These errors are only logged, and only in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the descriptor and schedule
        connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            # A writer is registered whenever the buffer is non-empty.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


Victor Stinner1e40f102014-12-11 23:30:17 +0100652if hasattr(os, 'set_inheritable'):
653 # Python 3.4 and newer
654 _set_inheritable = os.set_inheritable
655else:
656 import fcntl
657
658 def _set_inheritable(fd, inheritable):
659 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
660
661 old = fcntl.fcntl(fd, fcntl.F_GETFD)
662 if not inheritable:
663 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
664 else:
665 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
666
667
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When *stdin* is subprocess.PIPE a socket pair replaces the pipe:
        one end is given to the child and the other becomes
        self._proc.stdin.  Both ends are closed if the process cannot be
        started.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()

            # Mark the write end of the stdin pipe as non-inheritable,
            # needed by close_fds=False on Python 3.3 and older
            # (Python 3.4 implements the PEP 446, socketpair returns
            # non-inheritable sockets)
            _set_inheritable(stdin_w.fileno(), False)
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Starting the process failed: release both socket ends so
                # their file descriptors are not leaked.
                stdin.close()
                stdin_w.close()


class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher tracks a set of subprocesses and reports when one of them
    terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher, so the watcher can
    suspend its activity until the process is fully registered (some
    implementations need this to prevent a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for a child process.

        callback(pid, returncode, *args) is to be called when process
        'pid' terminates.  Registering another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to an event loop is detached
        from it first, then attached to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
761
762
class BaseChildWatcher(AbstractChildWatcher):
    """Common SIGCHLD handling shared by the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all(); this class
    installs the SIGCHLD handler on the attached loop and converts wait
    statuses into return codes.
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> (callback, args) for the handlers still pending.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Poll one child; must be provided by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Poll the watched children; must be provided by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        Emits a RuntimeWarning when the watcher is detached (loop is None)
        while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code.

        Returns the negated signal number when the child was killed by a
        signal, the exit status when it exited, and the raw status
        otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
822
823
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled individually from the SIGCHLD
    handler, so os.waitpid(-1) is never called and other code that spawns
    processes is not disturbed.

    The price of that safety is a significant overhead with a large
    number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget all handlers, then detach from the loop."""
        # Clearing first means the base class sees no pending handlers.
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raises RuntimeError when no loop is attached.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every process that currently has a handler."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its handler if it has terminated."""
        assert expected_pid > 0

        try:
            pid, wait_status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(wait_status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, extra = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *extra)
901
902
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Maps pid -> returncode for reaped children with no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget all handlers and recorded zombies, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        unclaimed = None
        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                # Last block left: nobody can claim these pids any more.
                unclaimed = str(self._zombies)
                self._zombies.clear()

        if unclaimed is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is invoked immediately.  Raises
        RuntimeError when no loop is attached.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, wait_status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(wait_status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback runs outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1011
1012
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return
            self._watcher = SafeChildWatcher()
            current = threading.current_thread()
            if isinstance(current, threading._MainThread):
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on that child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        A previously set watcher is closed before being replaced.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1062
# Public names listed in __all__, bound to the Unix implementations.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop