blob: 0308b02a52d6e7e28491e98ad697bd4f9799097e [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
Victor Stinner4271dfd2017-11-28 15:19:56 +01005import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010012import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
Yury Selivanovb057c522014-02-18 12:15:06 -050015from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070016from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070023from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
25
# Public names exported by a star-import of this module.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Fail at import time on Windows instead of exposing a module whose
# signal-based machinery cannot work there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
33
34
Victor Stinnerfe5649c2014-07-17 22:43:40 +020035def _sighandler_noop(signum, frame):
36 """Dummy signal handler."""
37 pass
38
39
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall the signal handlers it registered.

        During interpreter finalization the handlers are only forgotten
        (with a ResourceWarning) instead of being uninstalled through
        signal.signal(), which is not safe to rely on at that stage.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        elif self._signal_handlers:
            warnings.warn(
                'Closing the loop {!r} on interpreter shutdown stage, '
                'skipping signal handlers removal'.format(self),
                ResourceWarning,
                source=self)
            self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of data as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal. UNIX only.

        Raise TypeError if callback is a coroutine object or coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup fd cannot be installed (for
        instance when not called from the main thread) or if the signal
        cannot be caught.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread. By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal. UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default handler back; every other signal is
        # reset to the OS default disposition.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Start a subprocess and return its transport once it is ready.

        The transport is created and registered with the child watcher
        inside the watcher's 'with' block. If waiting for the transport
        fails, the transport is closed and waited for before the error is
        re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # BaseException rather than Exception so that a cancellation
                # that is not an Exception subclass still releases the
                # transport.
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status over to the loop through the thread-safe
        # scheduling entry point.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(self, protocol_factory, path=None, *,
                                     ssl=None, sock=None,
                                     server_hostname=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of path and sock must be given. Return a
        (transport, protocol) pair.

        Raise ValueError on inconsistent ssl/server_hostname or path/sock
        arguments, or if sock is not a UNIX domain stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # Cleanup only: the socket was created here, so close it on
                # any failure and let the error propagate.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    async def create_unix_server(self, protocol_factory, path=None, *,
                                 sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of path and sock must be given. When path names an
        existing socket file, that stale file is removed before binding.

        Raise TypeError if ssl is a bool, ValueError on inconsistent
        path/sock arguments or a sock of the wrong kind, and OSError if
        binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            # An empty path has no first character to inspect and no file
            # to clean up; it is passed to bind() unchanged, which reports
            # any error itself.
            if path and path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
289
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character-device descriptor.

    The descriptor is switched to non-blocking mode and read from the event
    loop; received bytes are passed to the protocol's data_received().
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap pipe and schedule connection_made() on protocol.

        Raise ValueError if pipe is not a FIFO, socket or character device.
        If waiter is given, its result is set to None after
        connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Drop the references so that __del__ neither warns about nor
            # closes a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector,
                self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read one chunk from the pipe and hand it to the protocol."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability.

        Does nothing once the transport is closing or closed.
        """
        if self._closing:
            # The reader is already removed, and _loop is None once
            # connection_lost() has run.
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the pipe for readability again.

        Does nothing once the transport is closing or closed.
        """
        if self._closing:
            # Never re-register a descriptor that is about to be closed or
            # has been closed already.
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn("unclosed transport %r" % self, ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report exc, then close the transport with it.

        An OSError with errno EIO is only logged, and only in debug mode.
        Any other error goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
416
417
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character-device descriptor.

    Data that cannot be written immediately is kept in an internal
    bytearray and flushed when the descriptor becomes writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap pipe and schedule connection_made() on protocol.

        Raise ValueError if pipe is not a FIFO, socket or character device.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        # Becomes True once close() or write_eof() has been called.
        self._closing = False

        st_mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(st_mode)
        is_fifo = stat.S_ISFIFO(st_mode)
        is_socket = stat.S_ISSOCK(st_mode)
        if not is_char and not is_fifo and not is_socket:
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # Watching the write end for "readability" is how we learn that the
        # peer closed its read end. On AIX that trick only works for
        # sockets; on other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4? Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or (is_fifo and not on_aix):
            # The reader is added only after connection_made() has run.
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # The waiter is woken only after connection_made() has run.
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        pipe_open = self._pipe is not None
        parts = [self.__class__.__name__]
        if not pipe_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if not pipe_open:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            is_polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if is_polling else 'idle')
            parts.append('bufsize=%s' % self.get_write_buffer_size())
        return '<%s>' % ' '.join(parts)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle the peer closing its end of the pipe."""
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        # Unsent data at this point can never be delivered.
        pending_error = BrokenPipeError() if self._buffer else None
        self._close(pending_error)

    def write(self, data):
        """Write data to the pipe, buffering whatever cannot go out now."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing queued yet: try a direct write first.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if sent == len(data):
                return
            if sent > 0:
                data = memoryview(data)[sent:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the pipe accepts right now."""
        assert self._buffer, 'Data should not be empty'

        try:
            sent = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # The writer has to be removed here: _close() only removes it
            # while the buffer is non-empty, and the buffer was just cleared.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if sent == len(self._buffer):
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
            return
        if sent > 0:
            del self._buffer[:sent]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Mark the transport as closing; finish once the buffer is empty."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the shutdown after the final flush.
            return
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport after pending data has been written."""
        if self._pipe is None or self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def __del__(self):
        if self._pipe is None:
            return
        warnings.warn("unclosed transport %r" % self, ResourceWarning,
                      source=self)
        self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report exc, then close the transport with it."""
        # should be called by exception handler only
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop all watching, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
619
620
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe: one
        end is given to the child and the other becomes self._proc.stdin.
        Both ends are closed if spawning the child fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end). Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child has its own copy of the read end by now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: skip the cleanup.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: release both socket ends.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800638
639
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one of
    them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block so that the watcher can suspend
    its activity until the new process is fully registered (this prevents
    a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called once the process
        terminates.  Registering a second callback for the same process
        replaces the previous one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
708
709
class BaseChildWatcher(AbstractChildWatcher):
    """Shared machinery for the SIGCHLD-based child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all(); this class
    manages the attached loop, the SIGCHLD handler and the mapping of
    registered callbacks.
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Reap the given child; to be implemented by subclasses."""
        raise NotImplementedError()

    def _do_waitpid_all(self):
        """Reap terminated children; to be implemented by subclasses."""
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'.

        A RuntimeWarning is issued when the watcher is detached (loop is
        None) while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD handler: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a returncode.

        Return the negated signal number if the child was killed by a
        signal, its exit status if it exited, and the raw status
        otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
769
770
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each registered process is polled explicitly in the SIGCHLD handler
    rather than calling os.waitpid(-1), so other code spawning processes
    is not disrupted.

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raises RuntimeError if no loop is attached to the watcher.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every process that currently has a handler."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for child_pid in tuple(self._callbacks):
            self._do_waitpid(child_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback, args = handler
            callback(pid, returncode, *args)
848
849
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of reaped children that had no handler yet.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget handlers and recorded zombies, then detach the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # Last context exited with unclaimed zombies: report them.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's context.  Raises RuntimeError
        if no loop is attached.  If the process was already reaped, the
        callback is invoked right away.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch to its handler."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if not self._forks:
                        callback = None
                    else:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # Handlers and warnings run outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
958
959
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() / set via set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        the loop currently stored for this thread.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Use the public main_thread() API rather than the
                # private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The previously set watcher, if any, is closed first.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1009
# Public names exported through __all__ for the Unix implementations.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy