blob: aad1068f554258dff483b8baf87a84277f5bf441 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
Victor Stinner915bcb02014-02-01 22:49:59 +010028__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080029 'AbstractChildWatcher', 'SafeChildWatcher',
30 'FastChildWatcher', 'DefaultEventLoopPolicy',
31 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033if sys.platform == 'win32': # pragma: no cover
34 raise ImportError('Signals are not really supported on Windows')
35
36
Victor Stinnerfe5649c2014-07-17 22:43:40 +020037def _sighandler_noop(signum, frame):
38 """Dummy signal handler."""
39 pass
40
41
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080042class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050043 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
Yury Selivanovb057c522014-02-18 12:15:06 -050045 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 """
47
48 def __init__(self, selector=None):
49 super().__init__(selector)
50 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
52 def _socketpair(self):
53 return socket.socketpair()
54
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020056 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080057 for sig in list(self._signal_handlers):
58 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080059
Victor Stinnerfe5649c2014-07-17 22:43:40 +020060 def _process_self_data(self, data):
61 for signum in data:
62 if not signum:
63 # ignore null bytes written by _write_to_self()
64 continue
65 self._handle_signal(signum)
66
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067 def add_signal_handler(self, sig, callback, *args):
68 """Add a handler for a signal. UNIX only.
69
70 Raise ValueError if the signal number is invalid or uncatchable.
71 Raise RuntimeError if there is a problem setting up the handler.
72 """
Victor Stinner2d99d932014-11-20 15:03:52 +010073 if (coroutines.iscoroutine(callback)
74 or coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010075 raise TypeError("coroutines cannot be used "
76 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010078 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 try:
80 # set_wakeup_fd() raises ValueError if this is not the
81 # main thread. By calling it early we ensure that an
82 # event loop running in another thread cannot add a signal
83 # handler.
84 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020085 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 raise RuntimeError(str(exc))
87
Yury Selivanov569efa22014-02-18 18:02:19 -050088 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089 self._signal_handlers[sig] = handle
90
91 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020092 # Register a dummy signal handler to ask Python to write the signal
93 # number in the wakup file descriptor. _process_self_data() will
94 # read signal numbers from this file descriptor to handle signals.
95 signal.signal(sig, _sighandler_noop)
96
Charles-François Natali74e7cf32013-12-05 22:47:19 +010097 # Set SA_RESTART to limit EINTR occurrences.
98 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070099 except OSError as exc:
100 del self._signal_handlers[sig]
101 if not self._signal_handlers:
102 try:
103 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200104 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700105 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106
107 if exc.errno == errno.EINVAL:
108 raise RuntimeError('sig {} cannot be caught'.format(sig))
109 else:
110 raise
111
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200112 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 """Internal helper that is the actual signal handler."""
114 handle = self._signal_handlers.get(sig)
115 if handle is None:
116 return # Assume it's some race condition.
117 if handle._cancelled:
118 self.remove_signal_handler(sig) # Remove it properly.
119 else:
120 self._add_callback_signalsafe(handle)
121
122 def remove_signal_handler(self, sig):
123 """Remove a handler for a signal. UNIX only.
124
125 Return True if a signal handler was removed, False if not.
126 """
127 self._check_signal(sig)
128 try:
129 del self._signal_handlers[sig]
130 except KeyError:
131 return False
132
133 if sig == signal.SIGINT:
134 handler = signal.default_int_handler
135 else:
136 handler = signal.SIG_DFL
137
138 try:
139 signal.signal(sig, handler)
140 except OSError as exc:
141 if exc.errno == errno.EINVAL:
142 raise RuntimeError('sig {} cannot be caught'.format(sig))
143 else:
144 raise
145
146 if not self._signal_handlers:
147 try:
148 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200149 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700150 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151
152 return True
153
154 def _check_signal(self, sig):
155 """Internal helper to validate a signal.
156
157 Raise ValueError if the signal number is invalid or uncatchable.
158 Raise RuntimeError if there is a problem setting up the handler.
159 """
160 if not isinstance(sig, int):
161 raise TypeError('sig must be an int, not {!r}'.format(sig))
162
163 if not (1 <= sig < signal.NSIG):
164 raise ValueError(
165 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
166
167 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
170
171 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
172 extra=None):
173 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
174
Victor Stinnerf951d282014-06-29 00:46:45 +0200175 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176 def _make_subprocess_transport(self, protocol, args, shell,
177 stdin, stdout, stderr, bufsize,
178 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800179 with events.get_child_watcher() as watcher:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100180 waiter = futures.Future(loop=self)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800181 transp = _UnixSubprocessTransport(self, protocol, args, shell,
182 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100183 waiter=waiter, extra=extra,
184 **kwargs)
185
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 watcher.add_child_handler(transp.get_pid(),
187 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100188 try:
189 yield from waiter
Victor Stinner5d44c082015-02-02 18:36:31 +0100190 except Exception as exc:
191 # Workaround CPython bug #23353: using yield/yield-from in an
192 # except block of a generator doesn't clear properly
193 # sys.exc_info()
194 err = exc
195 else:
196 err = None
197
198 if err is not None:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100199 transp.close()
Victor Stinner1241ecc2015-01-30 00:16:14 +0100200 yield from transp._wait()
Victor Stinner5d44c082015-02-02 18:36:31 +0100201 raise err
Guido van Rossum4835f172014-01-10 13:28:59 -0800202
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203 return transp
204
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800205 def _child_watcher_callback(self, pid, returncode, transp):
206 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207
Victor Stinnerf951d282014-06-29 00:46:45 +0200208 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500209 def create_unix_connection(self, protocol_factory, path, *,
210 ssl=None, sock=None,
211 server_hostname=None):
212 assert server_hostname is None or isinstance(server_hostname, str)
213 if ssl:
214 if server_hostname is None:
215 raise ValueError(
216 'you have to pass server_hostname when using ssl')
217 else:
218 if server_hostname is not None:
219 raise ValueError('server_hostname is only meaningful with ssl')
220
221 if path is not None:
222 if sock is not None:
223 raise ValueError(
224 'path and sock can not be specified at the same time')
225
Victor Stinner79a29522014-02-19 01:45:59 +0100226 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500227 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500228 sock.setblocking(False)
229 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100230 except:
231 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500232 raise
233
234 else:
235 if sock is None:
236 raise ValueError('no path and sock were specified')
237 sock.setblocking(False)
238
239 transport, protocol = yield from self._create_connection_transport(
240 sock, protocol_factory, ssl, server_hostname)
241 return transport, protocol
242
Victor Stinnerf951d282014-06-29 00:46:45 +0200243 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500244 def create_unix_server(self, protocol_factory, path=None, *,
245 sock=None, backlog=100, ssl=None):
246 if isinstance(ssl, bool):
247 raise TypeError('ssl argument must be an SSLContext or None')
248
249 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200250 if sock is not None:
251 raise ValueError(
252 'path and sock can not be specified at the same time')
253
Yury Selivanovb057c522014-02-18 12:15:06 -0500254 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
255
256 try:
257 sock.bind(path)
258 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100259 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500260 if exc.errno == errno.EADDRINUSE:
261 # Let's improve the error message by adding
262 # with what exact address it occurs.
263 msg = 'Address {!r} is already in use'.format(path)
264 raise OSError(errno.EADDRINUSE, msg) from None
265 else:
266 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200267 except:
268 sock.close()
269 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500270 else:
271 if sock is None:
272 raise ValueError(
273 'path was not specified, and no sock specified')
274
275 if sock.family != socket.AF_UNIX:
276 raise ValueError(
277 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
278
279 server = base_events.Server(self, [sock])
280 sock.listen(backlog)
281 sock.setblocking(False)
282 self._start_serving(protocol_factory, sock, ssl, server)
283 return server
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200286if hasattr(os, 'set_blocking'):
287 def _set_nonblocking(fd):
288 os.set_blocking(fd, False)
289else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400290 import fcntl
291
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200292 def _set_nonblocking(fd):
293 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
294 flags = flags | os.O_NONBLOCK
295 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
297
298class _UnixReadPipeTransport(transports.ReadTransport):
299
Yury Selivanovdec1a452014-02-18 22:27:48 -0500300 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
303 super().__init__(extra)
304 self._extra['pipe'] = pipe
305 self._loop = loop
306 self._pipe = pipe
307 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700308 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800309 if not (stat.S_ISFIFO(mode) or
310 stat.S_ISSOCK(mode) or
311 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700312 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 _set_nonblocking(self._fileno)
314 self._protocol = protocol
315 self._closing = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100317 # only start reading when connection_made() has been called
318 self._loop.call_soon(self._loop.add_reader,
319 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100321 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500322 self._loop.call_soon(futures._set_result_unless_cancelled,
323 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
Victor Stinnere912e652014-07-12 03:11:53 +0200325 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100326 info = [self.__class__.__name__]
327 if self._pipe is None:
328 info.append('closed')
329 elif self._closing:
330 info.append('closing')
331 info.append('fd=%s' % self._fileno)
Victor Stinnere912e652014-07-12 03:11:53 +0200332 if self._pipe is not None:
333 polling = selector_events._test_selector_event(
334 self._loop._selector,
335 self._fileno, selectors.EVENT_READ)
336 if polling:
337 info.append('polling')
338 else:
339 info.append('idle')
340 else:
341 info.append('closed')
342 return '<%s>' % ' '.join(info)
343
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 def _read_ready(self):
345 try:
346 data = os.read(self._fileno, self.max_size)
347 except (BlockingIOError, InterruptedError):
348 pass
349 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100350 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 else:
352 if data:
353 self._protocol.data_received(data)
354 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200355 if self._loop.get_debug():
356 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357 self._closing = True
358 self._loop.remove_reader(self._fileno)
359 self._loop.call_soon(self._protocol.eof_received)
360 self._loop.call_soon(self._call_connection_lost, None)
361
Guido van Rossum57497ad2013-10-18 07:58:20 -0700362 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 self._loop.remove_reader(self._fileno)
364
Guido van Rossum57497ad2013-10-18 07:58:20 -0700365 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 self._loop.add_reader(self._fileno, self._read_ready)
367
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500368 def is_closing(self):
369 return self._closing
370
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371 def close(self):
372 if not self._closing:
373 self._close(None)
374
Victor Stinner978a9af2015-01-29 17:50:58 +0100375 # On Python 3.3 and older, objects with a destructor part of a reference
376 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
377 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400378 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100379 def __del__(self):
380 if self._pipe is not None:
Victor Stinnere19558a2016-03-23 00:28:08 +0100381 warnings.warn("unclosed transport %r" % self, ResourceWarning,
382 source=self)
Victor Stinner978a9af2015-01-29 17:50:58 +0100383 self._pipe.close()
384
Victor Stinner0ee29c22014-02-19 01:40:41 +0100385 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200387 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
388 if self._loop.get_debug():
389 logger.debug("%r: %s", self, message, exc_info=True)
390 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500391 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100392 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500393 'exception': exc,
394 'transport': self,
395 'protocol': self._protocol,
396 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 self._close(exc)
398
399 def _close(self, exc):
400 self._closing = True
401 self._loop.remove_reader(self._fileno)
402 self._loop.call_soon(self._call_connection_lost, exc)
403
404 def _call_connection_lost(self, exc):
405 try:
406 self._protocol.connection_lost(exc)
407 finally:
408 self._pipe.close()
409 self._pipe = None
410 self._protocol = None
411 self._loop = None
412
413
Yury Selivanov3cb99142014-02-18 18:41:13 -0500414class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800415 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416
417 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100418 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700419 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 self._pipe = pipe
421 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700422 mode = os.fstat(self._fileno).st_mode
423 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100424 if not (is_socket or
425 stat.S_ISFIFO(mode) or
426 stat.S_ISCHR(mode)):
427 raise ValueError("Pipe transport is only for "
428 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 _set_nonblocking(self._fileno)
430 self._protocol = protocol
431 self._buffer = []
432 self._conn_lost = 0
433 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700434
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100436
437 # On AIX, the reader trick (to be notified when the read end of the
438 # socket is closed) only works for sockets. On other platforms it
439 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
440 if is_socket or not sys.platform.startswith("aix"):
441 # only start reading when connection_made() has been called
442 self._loop.call_soon(self._loop.add_reader,
443 self._fileno, self._read_ready)
444
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100446 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500447 self._loop.call_soon(futures._set_result_unless_cancelled,
448 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449
Victor Stinnere912e652014-07-12 03:11:53 +0200450 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100451 info = [self.__class__.__name__]
452 if self._pipe is None:
453 info.append('closed')
454 elif self._closing:
455 info.append('closing')
456 info.append('fd=%s' % self._fileno)
Victor Stinnere912e652014-07-12 03:11:53 +0200457 if self._pipe is not None:
458 polling = selector_events._test_selector_event(
459 self._loop._selector,
460 self._fileno, selectors.EVENT_WRITE)
461 if polling:
462 info.append('polling')
463 else:
464 info.append('idle')
465
466 bufsize = self.get_write_buffer_size()
467 info.append('bufsize=%s' % bufsize)
468 else:
469 info.append('closed')
470 return '<%s>' % ' '.join(info)
471
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800472 def get_write_buffer_size(self):
473 return sum(len(data) for data in self._buffer)
474
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700476 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200477 if self._loop.get_debug():
478 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100479 if self._buffer:
480 self._close(BrokenPipeError())
481 else:
482 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483
484 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800485 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
486 if isinstance(data, bytearray):
487 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700488 if not data:
489 return
490
491 if self._conn_lost or self._closing:
492 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700493 logger.warning('pipe closed by peer or '
494 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700495 self._conn_lost += 1
496 return
497
498 if not self._buffer:
499 # Attempt to send it right away first.
500 try:
501 n = os.write(self._fileno, data)
502 except (BlockingIOError, InterruptedError):
503 n = 0
504 except Exception as exc:
505 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100506 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700507 return
508 if n == len(data):
509 return
510 elif n > 0:
511 data = data[n:]
512 self._loop.add_writer(self._fileno, self._write_ready)
513
514 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800515 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700516
517 def _write_ready(self):
518 data = b''.join(self._buffer)
519 assert data, 'Data should not be empty'
520
521 self._buffer.clear()
522 try:
523 n = os.write(self._fileno, data)
524 except (BlockingIOError, InterruptedError):
525 self._buffer.append(data)
526 except Exception as exc:
527 self._conn_lost += 1
528 # Remove writer here, _fatal_error() doesn't it
529 # because _buffer is empty.
530 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100531 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700532 else:
533 if n == len(data):
534 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800535 self._maybe_resume_protocol() # May append to buffer.
536 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 self._loop.remove_reader(self._fileno)
538 self._call_connection_lost(None)
539 return
540 elif n > 0:
541 data = data[n:]
542
543 self._buffer.append(data) # Try again later.
544
545 def can_write_eof(self):
546 return True
547
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700548 def write_eof(self):
549 if self._closing:
550 return
551 assert self._pipe
552 self._closing = True
553 if not self._buffer:
554 self._loop.remove_reader(self._fileno)
555 self._loop.call_soon(self._call_connection_lost, None)
556
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500557 def is_closing(self):
558 return self._closing
559
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100561 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700562 # write_eof is all what we needed to close the write pipe
563 self.write_eof()
564
Victor Stinner978a9af2015-01-29 17:50:58 +0100565 # On Python 3.3 and older, objects with a destructor part of a reference
566 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
567 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400568 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100569 def __del__(self):
570 if self._pipe is not None:
Victor Stinnere19558a2016-03-23 00:28:08 +0100571 warnings.warn("unclosed transport %r" % self, ResourceWarning,
572 source=self)
Victor Stinner978a9af2015-01-29 17:50:58 +0100573 self._pipe.close()
574
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575 def abort(self):
576 self._close(None)
577
Victor Stinner0ee29c22014-02-19 01:40:41 +0100578 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 # should be called by exception handler only
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200580 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnerb2614752014-08-25 23:20:52 +0200581 if self._loop.get_debug():
582 logger.debug("%r: %s", self, message, exc_info=True)
583 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500584 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100585 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500586 'exception': exc,
587 'transport': self,
588 'protocol': self._protocol,
589 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590 self._close(exc)
591
592 def _close(self, exc=None):
593 self._closing = True
594 if self._buffer:
595 self._loop.remove_writer(self._fileno)
596 self._buffer.clear()
597 self._loop.remove_reader(self._fileno)
598 self._loop.call_soon(self._call_connection_lost, exc)
599
600 def _call_connection_lost(self, exc):
601 try:
602 self._protocol.connection_lost(exc)
603 finally:
604 self._pipe.close()
605 self._pipe = None
606 self._protocol = None
607 self._loop = None
608
609
Victor Stinner1e40f102014-12-11 23:30:17 +0100610if hasattr(os, 'set_inheritable'):
611 # Python 3.4 and newer
612 _set_inheritable = os.set_inheritable
613else:
614 import fcntl
615
616 def _set_inheritable(fd, inheritable):
617 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
618
619 old = fcntl.fcntl(fd, fcntl.F_GETFD)
620 if not inheritable:
621 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
622 else:
623 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
624
625
Guido van Rossum59691282013-10-30 14:52:03 -0700626class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700627
Guido van Rossum59691282013-10-30 14:52:03 -0700628 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700629 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700630 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700631 # Use a socket pair for stdin, since not all platforms
632 # support selecting read events on the write end of a
633 # socket (which we use in order to detect closing of the
634 # other end). Notably this is needed on AIX, and works
635 # just fine on other platforms.
636 stdin, stdin_w = self._loop._socketpair()
Victor Stinner1e40f102014-12-11 23:30:17 +0100637
638 # Mark the write end of the stdin pipe as non-inheritable,
639 # needed by close_fds=False on Python 3.3 and older
640 # (Python 3.4 implements the PEP 446, socketpair returns
641 # non-inheritable sockets)
642 _set_inheritable(stdin_w.fileno(), False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643 self._proc = subprocess.Popen(
644 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
645 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700646 if stdin_w is not None:
647 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200648 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800649
650
651class AbstractChildWatcher:
652 """Abstract base class for monitoring child processes.
653
654 Objects derived from this class monitor a collection of subprocesses and
655 report their termination or interruption by a signal.
656
657 New callbacks are registered with .add_child_handler(). Starting a new
658 process must be done within a 'with' block to allow the watcher to suspend
659 its activity until the new process if fully registered (this is needed to
660 prevent a race condition in some implementations).
661
662 Example:
663 with watcher:
664 proc = subprocess.Popen("sleep 1")
665 watcher.add_child_handler(proc.pid, callback)
666
667 Notes:
668 Implementations of this class must be thread-safe.
669
670 Since child watcher objects may catch the SIGCHLD signal and call
671 waitpid(-1), there should be only one active object per process.
672 """
673
674 def add_child_handler(self, pid, callback, *args):
675 """Register a new child handler.
676
677 Arrange for callback(pid, returncode, *args) to be called when
678 process 'pid' terminates. Specifying another callback for the same
679 process replaces the previous handler.
680
Victor Stinneracdb7822014-07-14 18:33:40 +0200681 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800682 """
683 raise NotImplementedError()
684
685 def remove_child_handler(self, pid):
686 """Removes the handler for process 'pid'.
687
688 The function returns True if the handler was successfully removed,
689 False if there was nothing to remove."""
690
691 raise NotImplementedError()
692
Guido van Rossum2bcae702013-11-13 15:50:08 -0800693 def attach_loop(self, loop):
694 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800695
Guido van Rossum2bcae702013-11-13 15:50:08 -0800696 If the watcher was previously attached to an event loop, then it is
697 first detached before attaching to the new loop.
698
699 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800700 """
701 raise NotImplementedError()
702
703 def close(self):
704 """Close the watcher.
705
706 This must be called to make sure that any underlying resource is freed.
707 """
708 raise NotImplementedError()
709
710 def __enter__(self):
711 """Enter the watcher's context and allow starting new processes
712
713 This function must return self"""
714 raise NotImplementedError()
715
716 def __exit__(self, a, b, c):
717 """Exit the watcher's context"""
718 raise NotImplementedError()
719
720
721class BaseChildWatcher(AbstractChildWatcher):
722
Guido van Rossum2bcae702013-11-13 15:50:08 -0800723 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800724 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800725
726 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800727 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800728
729 def _do_waitpid(self, expected_pid):
730 raise NotImplementedError()
731
732 def _do_waitpid_all(self):
733 raise NotImplementedError()
734
Guido van Rossum2bcae702013-11-13 15:50:08 -0800735 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800736 assert loop is None or isinstance(loop, events.AbstractEventLoop)
737
738 if self._loop is not None:
739 self._loop.remove_signal_handler(signal.SIGCHLD)
740
741 self._loop = loop
742 if loop is not None:
743 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
744
745 # Prevent a race condition in case a child terminated
746 # during the switch.
747 self._do_waitpid_all()
748
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800749 def _sig_chld(self):
750 try:
751 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500752 except Exception as exc:
753 # self._loop should always be available here
754 # as '_sig_chld' is added as a signal handler
755 # in 'attach_loop'
756 self._loop.call_exception_handler({
757 'message': 'Unknown exception in SIGCHLD handler',
758 'exception': exc,
759 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800760
761 def _compute_returncode(self, status):
762 if os.WIFSIGNALED(status):
763 # The child process died because of a signal.
764 return -os.WTERMSIG(status)
765 elif os.WIFEXITED(status):
766 # The child process exited (e.g sys.exit()).
767 return os.WEXITSTATUS(status)
768 else:
769 # The child exited, but we don't understand its status.
770 # This shouldn't happen, but if it does, let's just
771 # return that status; perhaps that helps debug it.
772 return status
773
774
775class SafeChildWatcher(BaseChildWatcher):
776 """'Safe' child watcher implementation.
777
778 This implementation avoids disrupting other code spawning processes by
779 polling explicitly each process in the SIGCHLD handler instead of calling
780 os.waitpid(-1).
781
782 This is a safe solution but it has a significant overhead when handling a
783 big number of children (O(n) each time SIGCHLD is raised)
784 """
785
Guido van Rossum2bcae702013-11-13 15:50:08 -0800786 def __init__(self):
787 super().__init__()
788 self._callbacks = {}
789
790 def close(self):
791 self._callbacks.clear()
792 super().close()
793
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800794 def __enter__(self):
795 return self
796
797 def __exit__(self, a, b, c):
798 pass
799
800 def add_child_handler(self, pid, callback, *args):
Victor Stinner47cd10d2015-01-30 00:05:19 +0100801 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800802
803 # Prevent a race condition in case the child is already terminated.
804 self._do_waitpid(pid)
805
Guido van Rossum2bcae702013-11-13 15:50:08 -0800806 def remove_child_handler(self, pid):
807 try:
808 del self._callbacks[pid]
809 return True
810 except KeyError:
811 return False
812
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800813 def _do_waitpid_all(self):
814
815 for pid in list(self._callbacks):
816 self._do_waitpid(pid)
817
818 def _do_waitpid(self, expected_pid):
819 assert expected_pid > 0
820
821 try:
822 pid, status = os.waitpid(expected_pid, os.WNOHANG)
823 except ChildProcessError:
824 # The child process is already reaped
825 # (may happen if waitpid() is called elsewhere).
826 pid = expected_pid
827 returncode = 255
828 logger.warning(
829 "Unknown child process pid %d, will report returncode 255",
830 pid)
831 else:
832 if pid == 0:
833 # The child process is still alive.
834 return
835
836 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200837 if self._loop.get_debug():
838 logger.debug('process %s exited with returncode %s',
839 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800840
841 try:
842 callback, args = self._callbacks.pop(pid)
843 except KeyError: # pragma: no cover
844 # May happen if .remove_child_handler() is called
845 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200846 if self._loop.get_debug():
847 logger.warning("Child watcher got an unexpected pid: %r",
848 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800849 else:
850 callback(pid, returncode, *args)
851
852
853class FastChildWatcher(BaseChildWatcher):
854 """'Fast' child watcher implementation.
855
856 This implementation reaps every terminated processes by calling
857 os.waitpid(-1) directly, possibly breaking other code spawning processes
858 and waiting for their termination.
859
860 There is no noticeable overhead when handling a big number of children
861 (O(1) each time a child terminates).
862 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800863 def __init__(self):
864 super().__init__()
865 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800866 self._lock = threading.Lock()
867 self._zombies = {}
868 self._forks = 0
869
870 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800871 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800872 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800873 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800874
875 def __enter__(self):
876 with self._lock:
877 self._forks += 1
878
879 return self
880
881 def __exit__(self, a, b, c):
882 with self._lock:
883 self._forks -= 1
884
885 if self._forks or not self._zombies:
886 return
887
888 collateral_victims = str(self._zombies)
889 self._zombies.clear()
890
891 logger.warning(
892 "Caught subprocesses termination from unknown pids: %s",
893 collateral_victims)
894
895 def add_child_handler(self, pid, callback, *args):
896 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800897 with self._lock:
898 try:
899 returncode = self._zombies.pop(pid)
900 except KeyError:
901 # The child is running.
902 self._callbacks[pid] = callback, args
903 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800904
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800905 # The child is dead already. We can fire the callback.
906 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800907
Guido van Rossum2bcae702013-11-13 15:50:08 -0800908 def remove_child_handler(self, pid):
909 try:
910 del self._callbacks[pid]
911 return True
912 except KeyError:
913 return False
914
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800915 def _do_waitpid_all(self):
916 # Because of signal coalescing, we must keep calling waitpid() as
917 # long as we're able to reap a child.
918 while True:
919 try:
920 pid, status = os.waitpid(-1, os.WNOHANG)
921 except ChildProcessError:
922 # No more child processes exist.
923 return
924 else:
925 if pid == 0:
926 # A child process is still alive.
927 return
928
929 returncode = self._compute_returncode(status)
930
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800931 with self._lock:
932 try:
933 callback, args = self._callbacks.pop(pid)
934 except KeyError:
935 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800936 if self._forks:
937 # It may not be registered yet.
938 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200939 if self._loop.get_debug():
940 logger.debug('unknown process %s exited '
941 'with returncode %s',
942 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800943 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800944 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200945 else:
946 if self._loop.get_debug():
947 logger.debug('process %s exited with returncode %s',
948 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800949
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800950 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800951 logger.warning(
952 "Caught subprocess termination from unknown pid: "
953 "%d -> %d", pid, returncode)
954 else:
955 callback(pid, returncode, *args)
956
957
958class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +0100959 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800960 _loop_factory = _UnixSelectorEventLoop
961
962 def __init__(self):
963 super().__init__()
964 self._watcher = None
965
966 def _init_watcher(self):
967 with events._lock:
968 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800969 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800970 if isinstance(threading.current_thread(),
971 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800972 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800973
974 def set_event_loop(self, loop):
975 """Set the event loop.
976
977 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800978 .set_event_loop() from the main thread will call .attach_loop(loop) on
979 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800980 """
981
982 super().set_event_loop(loop)
983
984 if self._watcher is not None and \
985 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800986 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800987
988 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200989 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800990
991 If not yet set, a SafeChildWatcher object is automatically created.
992 """
993 if self._watcher is None:
994 self._init_watcher()
995
996 return self._watcher
997
998 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200999 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08001000
1001 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1002
1003 if self._watcher is not None:
1004 self._watcher.close()
1005
1006 self._watcher = watcher
1007
1008SelectorEventLoop = _UnixSelectorEventLoop
1009DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy