blob: f75e89f3175e706853d544c855e3273331db97b9 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Victor Stinner978a9af2015-01-29 17:50:58 +010011import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Yury Selivanov2a8911c2015-08-04 15:56:33 -040016from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080018from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import events
Victor Stinner47cd10d2015-01-30 00:05:19 +010020from . import futures
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020022from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020024from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070025from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
Victor Stinner915bcb02014-02-01 22:49:59 +010028__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080029 'AbstractChildWatcher', 'SafeChildWatcher',
30 'FastChildWatcher', 'DefaultEventLoopPolicy',
31 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033if sys.platform == 'win32': # pragma: no cover
34 raise ImportError('Signals are not really supported on Windows')
35
36
Victor Stinnerfe5649c2014-07-17 22:43:40 +020037def _sighandler_noop(signum, frame):
38 """Dummy signal handler."""
39 pass
40
41
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080042class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050043 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
Yury Selivanovb057c522014-02-18 12:15:06 -050045 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046 """
47
48 def __init__(self, selector=None):
49 super().__init__(selector)
50 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
52 def _socketpair(self):
53 return socket.socketpair()
54
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020056 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080057 for sig in list(self._signal_handlers):
58 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080059
Victor Stinnerfe5649c2014-07-17 22:43:40 +020060 def _process_self_data(self, data):
61 for signum in data:
62 if not signum:
63 # ignore null bytes written by _write_to_self()
64 continue
65 self._handle_signal(signum)
66
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067 def add_signal_handler(self, sig, callback, *args):
68 """Add a handler for a signal. UNIX only.
69
70 Raise ValueError if the signal number is invalid or uncatchable.
71 Raise RuntimeError if there is a problem setting up the handler.
72 """
Victor Stinner2d99d932014-11-20 15:03:52 +010073 if (coroutines.iscoroutine(callback)
74 or coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010075 raise TypeError("coroutines cannot be used "
76 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010078 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 try:
80 # set_wakeup_fd() raises ValueError if this is not the
81 # main thread. By calling it early we ensure that an
82 # event loop running in another thread cannot add a signal
83 # handler.
84 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020085 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 raise RuntimeError(str(exc))
87
Yury Selivanov569efa22014-02-18 18:02:19 -050088 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089 self._signal_handlers[sig] = handle
90
91 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020092 # Register a dummy signal handler to ask Python to write the signal
93 # number in the wakup file descriptor. _process_self_data() will
94 # read signal numbers from this file descriptor to handle signals.
95 signal.signal(sig, _sighandler_noop)
96
Charles-François Natali74e7cf32013-12-05 22:47:19 +010097 # Set SA_RESTART to limit EINTR occurrences.
98 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070099 except OSError as exc:
100 del self._signal_handlers[sig]
101 if not self._signal_handlers:
102 try:
103 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200104 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700105 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700106
107 if exc.errno == errno.EINVAL:
108 raise RuntimeError('sig {} cannot be caught'.format(sig))
109 else:
110 raise
111
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200112 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 """Internal helper that is the actual signal handler."""
114 handle = self._signal_handlers.get(sig)
115 if handle is None:
116 return # Assume it's some race condition.
117 if handle._cancelled:
118 self.remove_signal_handler(sig) # Remove it properly.
119 else:
120 self._add_callback_signalsafe(handle)
121
122 def remove_signal_handler(self, sig):
123 """Remove a handler for a signal. UNIX only.
124
125 Return True if a signal handler was removed, False if not.
126 """
127 self._check_signal(sig)
128 try:
129 del self._signal_handlers[sig]
130 except KeyError:
131 return False
132
133 if sig == signal.SIGINT:
134 handler = signal.default_int_handler
135 else:
136 handler = signal.SIG_DFL
137
138 try:
139 signal.signal(sig, handler)
140 except OSError as exc:
141 if exc.errno == errno.EINVAL:
142 raise RuntimeError('sig {} cannot be caught'.format(sig))
143 else:
144 raise
145
146 if not self._signal_handlers:
147 try:
148 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200149 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700150 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151
152 return True
153
154 def _check_signal(self, sig):
155 """Internal helper to validate a signal.
156
157 Raise ValueError if the signal number is invalid or uncatchable.
158 Raise RuntimeError if there is a problem setting up the handler.
159 """
160 if not isinstance(sig, int):
161 raise TypeError('sig must be an int, not {!r}'.format(sig))
162
163 if not (1 <= sig < signal.NSIG):
164 raise ValueError(
165 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
166
167 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
168 extra=None):
169 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
170
171 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
172 extra=None):
173 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
174
Victor Stinnerf951d282014-06-29 00:46:45 +0200175 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176 def _make_subprocess_transport(self, protocol, args, shell,
177 stdin, stdout, stderr, bufsize,
178 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800179 with events.get_child_watcher() as watcher:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100180 waiter = futures.Future(loop=self)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800181 transp = _UnixSubprocessTransport(self, protocol, args, shell,
182 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100183 waiter=waiter, extra=extra,
184 **kwargs)
185
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 watcher.add_child_handler(transp.get_pid(),
187 self._child_watcher_callback, transp)
Victor Stinner47cd10d2015-01-30 00:05:19 +0100188 try:
189 yield from waiter
Victor Stinner5d44c082015-02-02 18:36:31 +0100190 except Exception as exc:
191 # Workaround CPython bug #23353: using yield/yield-from in an
192 # except block of a generator doesn't clear properly
193 # sys.exc_info()
194 err = exc
195 else:
196 err = None
197
198 if err is not None:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100199 transp.close()
Victor Stinner1241ecc2015-01-30 00:16:14 +0100200 yield from transp._wait()
Victor Stinner5d44c082015-02-02 18:36:31 +0100201 raise err
Guido van Rossum4835f172014-01-10 13:28:59 -0800202
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700203 return transp
204
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800205 def _child_watcher_callback(self, pid, returncode, transp):
206 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207
Victor Stinnerf951d282014-06-29 00:46:45 +0200208 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500209 def create_unix_connection(self, protocol_factory, path, *,
210 ssl=None, sock=None,
211 server_hostname=None):
212 assert server_hostname is None or isinstance(server_hostname, str)
213 if ssl:
214 if server_hostname is None:
215 raise ValueError(
216 'you have to pass server_hostname when using ssl')
217 else:
218 if server_hostname is not None:
219 raise ValueError('server_hostname is only meaningful with ssl')
220
221 if path is not None:
222 if sock is not None:
223 raise ValueError(
224 'path and sock can not be specified at the same time')
225
Victor Stinner79a29522014-02-19 01:45:59 +0100226 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500227 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500228 sock.setblocking(False)
229 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100230 except:
231 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500232 raise
233
234 else:
235 if sock is None:
236 raise ValueError('no path and sock were specified')
237 sock.setblocking(False)
238
239 transport, protocol = yield from self._create_connection_transport(
240 sock, protocol_factory, ssl, server_hostname)
241 return transport, protocol
242
Victor Stinnerf951d282014-06-29 00:46:45 +0200243 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500244 def create_unix_server(self, protocol_factory, path=None, *,
245 sock=None, backlog=100, ssl=None):
246 if isinstance(ssl, bool):
247 raise TypeError('ssl argument must be an SSLContext or None')
248
249 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200250 if sock is not None:
251 raise ValueError(
252 'path and sock can not be specified at the same time')
253
Yury Selivanovb057c522014-02-18 12:15:06 -0500254 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
255
256 try:
257 sock.bind(path)
258 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100259 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500260 if exc.errno == errno.EADDRINUSE:
261 # Let's improve the error message by adding
262 # with what exact address it occurs.
263 msg = 'Address {!r} is already in use'.format(path)
264 raise OSError(errno.EADDRINUSE, msg) from None
265 else:
266 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200267 except:
268 sock.close()
269 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500270 else:
271 if sock is None:
272 raise ValueError(
273 'path was not specified, and no sock specified')
274
275 if sock.family != socket.AF_UNIX:
276 raise ValueError(
277 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
278
279 server = base_events.Server(self, [sock])
280 sock.listen(backlog)
281 sock.setblocking(False)
282 self._start_serving(protocol_factory, sock, ssl, server)
283 return server
284
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200286if hasattr(os, 'set_blocking'):
287 def _set_nonblocking(fd):
288 os.set_blocking(fd, False)
289else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400290 import fcntl
291
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200292 def _set_nonblocking(fd):
293 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
294 flags = flags | os.O_NONBLOCK
295 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
297
298class _UnixReadPipeTransport(transports.ReadTransport):
299
Yury Selivanovdec1a452014-02-18 22:27:48 -0500300 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
303 super().__init__(extra)
304 self._extra['pipe'] = pipe
305 self._loop = loop
306 self._pipe = pipe
307 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700308 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800309 if not (stat.S_ISFIFO(mode) or
310 stat.S_ISSOCK(mode) or
311 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700312 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 _set_nonblocking(self._fileno)
314 self._protocol = protocol
315 self._closing = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100317 # only start reading when connection_made() has been called
318 self._loop.call_soon(self._loop.add_reader,
319 self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100321 # only wake up the waiter when connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200322 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323
Victor Stinnere912e652014-07-12 03:11:53 +0200324 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100325 info = [self.__class__.__name__]
326 if self._pipe is None:
327 info.append('closed')
328 elif self._closing:
329 info.append('closing')
330 info.append('fd=%s' % self._fileno)
Victor Stinnere912e652014-07-12 03:11:53 +0200331 if self._pipe is not None:
332 polling = selector_events._test_selector_event(
333 self._loop._selector,
334 self._fileno, selectors.EVENT_READ)
335 if polling:
336 info.append('polling')
337 else:
338 info.append('idle')
339 else:
340 info.append('closed')
341 return '<%s>' % ' '.join(info)
342
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 def _read_ready(self):
344 try:
345 data = os.read(self._fileno, self.max_size)
346 except (BlockingIOError, InterruptedError):
347 pass
348 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100349 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 else:
351 if data:
352 self._protocol.data_received(data)
353 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200354 if self._loop.get_debug():
355 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 self._closing = True
357 self._loop.remove_reader(self._fileno)
358 self._loop.call_soon(self._protocol.eof_received)
359 self._loop.call_soon(self._call_connection_lost, None)
360
Guido van Rossum57497ad2013-10-18 07:58:20 -0700361 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 self._loop.remove_reader(self._fileno)
363
Guido van Rossum57497ad2013-10-18 07:58:20 -0700364 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365 self._loop.add_reader(self._fileno, self._read_ready)
366
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500367 def is_closing(self):
368 return self._closing
369
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 def close(self):
371 if not self._closing:
372 self._close(None)
373
Victor Stinner978a9af2015-01-29 17:50:58 +0100374 # On Python 3.3 and older, objects with a destructor part of a reference
375 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
376 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400377 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100378 def __del__(self):
379 if self._pipe is not None:
380 warnings.warn("unclosed transport %r" % self, ResourceWarning)
381 self._pipe.close()
382
Victor Stinner0ee29c22014-02-19 01:40:41 +0100383 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200385 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
386 if self._loop.get_debug():
387 logger.debug("%r: %s", self, message, exc_info=True)
388 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500389 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100390 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500391 'exception': exc,
392 'transport': self,
393 'protocol': self._protocol,
394 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 self._close(exc)
396
397 def _close(self, exc):
398 self._closing = True
399 self._loop.remove_reader(self._fileno)
400 self._loop.call_soon(self._call_connection_lost, exc)
401
402 def _call_connection_lost(self, exc):
403 try:
404 self._protocol.connection_lost(exc)
405 finally:
406 self._pipe.close()
407 self._pipe = None
408 self._protocol = None
409 self._loop = None
410
411
Yury Selivanov3cb99142014-02-18 18:41:13 -0500412class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800413 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414
415 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100416 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700418 self._pipe = pipe
419 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700420 mode = os.fstat(self._fileno).st_mode
421 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100422 if not (is_socket or
423 stat.S_ISFIFO(mode) or
424 stat.S_ISCHR(mode)):
425 raise ValueError("Pipe transport is only for "
426 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 _set_nonblocking(self._fileno)
428 self._protocol = protocol
429 self._buffer = []
430 self._conn_lost = 0
431 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700432
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner29342622015-01-29 14:15:19 +0100434
435 # On AIX, the reader trick (to be notified when the read end of the
436 # socket is closed) only works for sockets. On other platforms it
437 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
438 if is_socket or not sys.platform.startswith("aix"):
439 # only start reading when connection_made() has been called
440 self._loop.call_soon(self._loop.add_reader,
441 self._fileno, self._read_ready)
442
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100444 # only wake up the waiter when connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200445 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446
Victor Stinnere912e652014-07-12 03:11:53 +0200447 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100448 info = [self.__class__.__name__]
449 if self._pipe is None:
450 info.append('closed')
451 elif self._closing:
452 info.append('closing')
453 info.append('fd=%s' % self._fileno)
Victor Stinnere912e652014-07-12 03:11:53 +0200454 if self._pipe is not None:
455 polling = selector_events._test_selector_event(
456 self._loop._selector,
457 self._fileno, selectors.EVENT_WRITE)
458 if polling:
459 info.append('polling')
460 else:
461 info.append('idle')
462
463 bufsize = self.get_write_buffer_size()
464 info.append('bufsize=%s' % bufsize)
465 else:
466 info.append('closed')
467 return '<%s>' % ' '.join(info)
468
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800469 def get_write_buffer_size(self):
470 return sum(len(data) for data in self._buffer)
471
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700473 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200474 if self._loop.get_debug():
475 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100476 if self._buffer:
477 self._close(BrokenPipeError())
478 else:
479 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480
481 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800482 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
483 if isinstance(data, bytearray):
484 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 if not data:
486 return
487
488 if self._conn_lost or self._closing:
489 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700490 logger.warning('pipe closed by peer or '
491 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492 self._conn_lost += 1
493 return
494
495 if not self._buffer:
496 # Attempt to send it right away first.
497 try:
498 n = os.write(self._fileno, data)
499 except (BlockingIOError, InterruptedError):
500 n = 0
501 except Exception as exc:
502 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100503 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504 return
505 if n == len(data):
506 return
507 elif n > 0:
508 data = data[n:]
509 self._loop.add_writer(self._fileno, self._write_ready)
510
511 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800512 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700513
514 def _write_ready(self):
515 data = b''.join(self._buffer)
516 assert data, 'Data should not be empty'
517
518 self._buffer.clear()
519 try:
520 n = os.write(self._fileno, data)
521 except (BlockingIOError, InterruptedError):
522 self._buffer.append(data)
523 except Exception as exc:
524 self._conn_lost += 1
525 # Remove writer here, _fatal_error() doesn't it
526 # because _buffer is empty.
527 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100528 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 else:
530 if n == len(data):
531 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800532 self._maybe_resume_protocol() # May append to buffer.
533 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 self._loop.remove_reader(self._fileno)
535 self._call_connection_lost(None)
536 return
537 elif n > 0:
538 data = data[n:]
539
540 self._buffer.append(data) # Try again later.
541
542 def can_write_eof(self):
543 return True
544
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700545 def write_eof(self):
546 if self._closing:
547 return
548 assert self._pipe
549 self._closing = True
550 if not self._buffer:
551 self._loop.remove_reader(self._fileno)
552 self._loop.call_soon(self._call_connection_lost, None)
553
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500554 def is_closing(self):
555 return self._closing
556
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700557 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100558 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559 # write_eof is all what we needed to close the write pipe
560 self.write_eof()
561
Victor Stinner978a9af2015-01-29 17:50:58 +0100562 # On Python 3.3 and older, objects with a destructor part of a reference
563 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
564 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400565 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100566 def __del__(self):
567 if self._pipe is not None:
568 warnings.warn("unclosed transport %r" % self, ResourceWarning)
569 self._pipe.close()
570
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 def abort(self):
572 self._close(None)
573
Victor Stinner0ee29c22014-02-19 01:40:41 +0100574 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200576 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
577 if self._loop.get_debug():
578 logger.debug("%r: %s", self, message, exc_info=True)
579 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500580 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100581 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500582 'exception': exc,
583 'transport': self,
584 'protocol': self._protocol,
585 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586 self._close(exc)
587
588 def _close(self, exc=None):
589 self._closing = True
590 if self._buffer:
591 self._loop.remove_writer(self._fileno)
592 self._buffer.clear()
593 self._loop.remove_reader(self._fileno)
594 self._loop.call_soon(self._call_connection_lost, exc)
595
596 def _call_connection_lost(self, exc):
597 try:
598 self._protocol.connection_lost(exc)
599 finally:
600 self._pipe.close()
601 self._pipe = None
602 self._protocol = None
603 self._loop = None
604
605
Victor Stinner1e40f102014-12-11 23:30:17 +0100606if hasattr(os, 'set_inheritable'):
607 # Python 3.4 and newer
608 _set_inheritable = os.set_inheritable
609else:
610 import fcntl
611
612 def _set_inheritable(fd, inheritable):
613 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
614
615 old = fcntl.fcntl(fd, fcntl.F_GETFD)
616 if not inheritable:
617 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
618 else:
619 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
620
621
Guido van Rossum59691282013-10-30 14:52:03 -0700622class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623
Guido van Rossum59691282013-10-30 14:52:03 -0700624 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700625 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700626 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700627 # Use a socket pair for stdin, since not all platforms
628 # support selecting read events on the write end of a
629 # socket (which we use in order to detect closing of the
630 # other end). Notably this is needed on AIX, and works
631 # just fine on other platforms.
632 stdin, stdin_w = self._loop._socketpair()
Victor Stinner1e40f102014-12-11 23:30:17 +0100633
634 # Mark the write end of the stdin pipe as non-inheritable,
635 # needed by close_fds=False on Python 3.3 and older
636 # (Python 3.4 implements the PEP 446, socketpair returns
637 # non-inheritable sockets)
638 _set_inheritable(stdin_w.fileno(), False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639 self._proc = subprocess.Popen(
640 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
641 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700642 if stdin_w is not None:
643 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200644 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800645
646
647class AbstractChildWatcher:
648 """Abstract base class for monitoring child processes.
649
650 Objects derived from this class monitor a collection of subprocesses and
651 report their termination or interruption by a signal.
652
653 New callbacks are registered with .add_child_handler(). Starting a new
654 process must be done within a 'with' block to allow the watcher to suspend
655 its activity until the new process if fully registered (this is needed to
656 prevent a race condition in some implementations).
657
658 Example:
659 with watcher:
660 proc = subprocess.Popen("sleep 1")
661 watcher.add_child_handler(proc.pid, callback)
662
663 Notes:
664 Implementations of this class must be thread-safe.
665
666 Since child watcher objects may catch the SIGCHLD signal and call
667 waitpid(-1), there should be only one active object per process.
668 """
669
670 def add_child_handler(self, pid, callback, *args):
671 """Register a new child handler.
672
673 Arrange for callback(pid, returncode, *args) to be called when
674 process 'pid' terminates. Specifying another callback for the same
675 process replaces the previous handler.
676
Victor Stinneracdb7822014-07-14 18:33:40 +0200677 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800678 """
679 raise NotImplementedError()
680
681 def remove_child_handler(self, pid):
682 """Removes the handler for process 'pid'.
683
684 The function returns True if the handler was successfully removed,
685 False if there was nothing to remove."""
686
687 raise NotImplementedError()
688
Guido van Rossum2bcae702013-11-13 15:50:08 -0800689 def attach_loop(self, loop):
690 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800691
Guido van Rossum2bcae702013-11-13 15:50:08 -0800692 If the watcher was previously attached to an event loop, then it is
693 first detached before attaching to the new loop.
694
695 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800696 """
697 raise NotImplementedError()
698
699 def close(self):
700 """Close the watcher.
701
702 This must be called to make sure that any underlying resource is freed.
703 """
704 raise NotImplementedError()
705
706 def __enter__(self):
707 """Enter the watcher's context and allow starting new processes
708
709 This function must return self"""
710 raise NotImplementedError()
711
712 def __exit__(self, a, b, c):
713 """Exit the watcher's context"""
714 raise NotImplementedError()
715
716
717class BaseChildWatcher(AbstractChildWatcher):
718
Guido van Rossum2bcae702013-11-13 15:50:08 -0800719 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800720 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800721
722 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800723 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800724
725 def _do_waitpid(self, expected_pid):
726 raise NotImplementedError()
727
728 def _do_waitpid_all(self):
729 raise NotImplementedError()
730
Guido van Rossum2bcae702013-11-13 15:50:08 -0800731 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800732 assert loop is None or isinstance(loop, events.AbstractEventLoop)
733
734 if self._loop is not None:
735 self._loop.remove_signal_handler(signal.SIGCHLD)
736
737 self._loop = loop
738 if loop is not None:
739 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
740
741 # Prevent a race condition in case a child terminated
742 # during the switch.
743 self._do_waitpid_all()
744
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800745 def _sig_chld(self):
746 try:
747 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500748 except Exception as exc:
749 # self._loop should always be available here
750 # as '_sig_chld' is added as a signal handler
751 # in 'attach_loop'
752 self._loop.call_exception_handler({
753 'message': 'Unknown exception in SIGCHLD handler',
754 'exception': exc,
755 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800756
757 def _compute_returncode(self, status):
758 if os.WIFSIGNALED(status):
759 # The child process died because of a signal.
760 return -os.WTERMSIG(status)
761 elif os.WIFEXITED(status):
762 # The child process exited (e.g sys.exit()).
763 return os.WEXITSTATUS(status)
764 else:
765 # The child exited, but we don't understand its status.
766 # This shouldn't happen, but if it does, let's just
767 # return that status; perhaps that helps debug it.
768 return status
769
770
771class SafeChildWatcher(BaseChildWatcher):
772 """'Safe' child watcher implementation.
773
774 This implementation avoids disrupting other code spawning processes by
775 polling explicitly each process in the SIGCHLD handler instead of calling
776 os.waitpid(-1).
777
778 This is a safe solution but it has a significant overhead when handling a
779 big number of children (O(n) each time SIGCHLD is raised)
780 """
781
Guido van Rossum2bcae702013-11-13 15:50:08 -0800782 def __init__(self):
783 super().__init__()
784 self._callbacks = {}
785
786 def close(self):
787 self._callbacks.clear()
788 super().close()
789
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800790 def __enter__(self):
791 return self
792
793 def __exit__(self, a, b, c):
794 pass
795
796 def add_child_handler(self, pid, callback, *args):
Victor Stinner47cd10d2015-01-30 00:05:19 +0100797 self._callbacks[pid] = (callback, args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800798
799 # Prevent a race condition in case the child is already terminated.
800 self._do_waitpid(pid)
801
Guido van Rossum2bcae702013-11-13 15:50:08 -0800802 def remove_child_handler(self, pid):
803 try:
804 del self._callbacks[pid]
805 return True
806 except KeyError:
807 return False
808
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800809 def _do_waitpid_all(self):
810
811 for pid in list(self._callbacks):
812 self._do_waitpid(pid)
813
814 def _do_waitpid(self, expected_pid):
815 assert expected_pid > 0
816
817 try:
818 pid, status = os.waitpid(expected_pid, os.WNOHANG)
819 except ChildProcessError:
820 # The child process is already reaped
821 # (may happen if waitpid() is called elsewhere).
822 pid = expected_pid
823 returncode = 255
824 logger.warning(
825 "Unknown child process pid %d, will report returncode 255",
826 pid)
827 else:
828 if pid == 0:
829 # The child process is still alive.
830 return
831
832 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200833 if self._loop.get_debug():
834 logger.debug('process %s exited with returncode %s',
835 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800836
837 try:
838 callback, args = self._callbacks.pop(pid)
839 except KeyError: # pragma: no cover
840 # May happen if .remove_child_handler() is called
841 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200842 if self._loop.get_debug():
843 logger.warning("Child watcher got an unexpected pid: %r",
844 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800845 else:
846 callback(pid, returncode, *args)
847
848
849class FastChildWatcher(BaseChildWatcher):
850 """'Fast' child watcher implementation.
851
852 This implementation reaps every terminated processes by calling
853 os.waitpid(-1) directly, possibly breaking other code spawning processes
854 and waiting for their termination.
855
856 There is no noticeable overhead when handling a big number of children
857 (O(1) each time a child terminates).
858 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800859 def __init__(self):
860 super().__init__()
861 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800862 self._lock = threading.Lock()
863 self._zombies = {}
864 self._forks = 0
865
866 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800867 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800868 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800869 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800870
871 def __enter__(self):
872 with self._lock:
873 self._forks += 1
874
875 return self
876
877 def __exit__(self, a, b, c):
878 with self._lock:
879 self._forks -= 1
880
881 if self._forks or not self._zombies:
882 return
883
884 collateral_victims = str(self._zombies)
885 self._zombies.clear()
886
887 logger.warning(
888 "Caught subprocesses termination from unknown pids: %s",
889 collateral_victims)
890
891 def add_child_handler(self, pid, callback, *args):
892 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800893 with self._lock:
894 try:
895 returncode = self._zombies.pop(pid)
896 except KeyError:
897 # The child is running.
898 self._callbacks[pid] = callback, args
899 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800901 # The child is dead already. We can fire the callback.
902 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800903
Guido van Rossum2bcae702013-11-13 15:50:08 -0800904 def remove_child_handler(self, pid):
905 try:
906 del self._callbacks[pid]
907 return True
908 except KeyError:
909 return False
910
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800911 def _do_waitpid_all(self):
912 # Because of signal coalescing, we must keep calling waitpid() as
913 # long as we're able to reap a child.
914 while True:
915 try:
916 pid, status = os.waitpid(-1, os.WNOHANG)
917 except ChildProcessError:
918 # No more child processes exist.
919 return
920 else:
921 if pid == 0:
922 # A child process is still alive.
923 return
924
925 returncode = self._compute_returncode(status)
926
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800927 with self._lock:
928 try:
929 callback, args = self._callbacks.pop(pid)
930 except KeyError:
931 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800932 if self._forks:
933 # It may not be registered yet.
934 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200935 if self._loop.get_debug():
936 logger.debug('unknown process %s exited '
937 'with returncode %s',
938 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800939 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800940 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200941 else:
942 if self._loop.get_debug():
943 logger.debug('process %s exited with returncode %s',
944 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800945
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800946 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800947 logger.warning(
948 "Caught subprocess termination from unknown pid: "
949 "%d -> %d", pid, returncode)
950 else:
951 callback(pid, returncode, *args)
952
953
954class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +0100955 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800956 _loop_factory = _UnixSelectorEventLoop
957
958 def __init__(self):
959 super().__init__()
960 self._watcher = None
961
962 def _init_watcher(self):
963 with events._lock:
964 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800965 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800966 if isinstance(threading.current_thread(),
967 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800968 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800969
970 def set_event_loop(self, loop):
971 """Set the event loop.
972
973 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800974 .set_event_loop() from the main thread will call .attach_loop(loop) on
975 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800976 """
977
978 super().set_event_loop(loop)
979
980 if self._watcher is not None and \
981 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800982 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800983
984 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200985 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800986
987 If not yet set, a SafeChildWatcher object is automatically created.
988 """
989 if self._watcher is None:
990 self._init_watcher()
991
992 return self._watcher
993
994 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200995 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800996
997 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
998
999 if self._watcher is not None:
1000 self._watcher.close()
1001
1002 self._watcher = watcher
1003
1004SelectorEventLoop = _UnixSelectorEventLoop
1005DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy