blob: 91e43cfc97c22b75004c445524b4f9f38da88c76 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
# Public names exported by this module.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Refuse to import on Windows: this module is built around signal handling.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
32
33
def _sighandler_noop(signum, frame):
    """Dummy signal handler.

    It is installed with signal.signal() only so that Python writes the
    signal number to the wakeup file descriptor; the handler itself
    deliberately does nothing.
    """
    pass
37
38
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a new pair of connected sockets."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a signal number to handle."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function,
        or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler
        (including a signal that cannot be caught).
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: stop routing signals to our socket.
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal number.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to Python's default handler; every other signal
        # goes back to the OS default disposition.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: detach the wakeup file descriptor.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and register it with the child watcher.

        The transport is created and registered inside the watcher's
        'with' block, as the watcher API requires.
        """
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit to its transport via call_soon_threadsafe()."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain socket and return (transport, protocol).

        Exactly one of path and sock must be given.  server_hostname is
        required when ssl is used and rejected otherwise.

        Raise ValueError on an inconsistent combination of arguments.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Cleanup only: the socket was created here, so close it
                # on any failure and re-raise.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain socket.

        Exactly one of path and sock must be given.  Return the
        base_events.Server object.

        Raise TypeError if ssl is a bool.
        Raise ValueError on an inconsistent combination of arguments, or
        if sock is not an AF_UNIX socket.
        Raise OSError if binding to path fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                # Cleanup only: close the socket created above, re-raise.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
265
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266
if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode.

        Fallback for when os.set_blocking() is unavailable: set the
        O_NONBLOCK status flag through fcntl(), keeping the other flags.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277
278
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Register *pipe* for reading on *loop* and schedule connection_made().

        Raise ValueError if pipe is not a FIFO, a socket or a character
        device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        else:
            # _call_connection_lost() resets _pipe to None.
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Reader callback: deliver available data, or handle EOF / errors."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the file descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc*, then close the transport with it."""
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            # EIO is only logged, and only when the loop is in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
372
373
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Set up writing to *pipe* and schedule connection_made().

        Raise ValueError if pipe is not a socket, a FIFO or a character
        device.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []  # Chunks accepted by write() but not yet written.
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        else:
            # _call_connection_lost() resets _pipe to None.
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the summed length of all chunks waiting in the buffer."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        """Reader callback: a read event here means the peer closed the pipe."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be written immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # Drop the data; warn once the number of such writes reaches
            # the threshold.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = data[n:]
            # Some data is left over: finish from the writer callback.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)
        # Otherwise _write_ready() completes the close once the buffer
        # has been flushed.

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc*, then close the transport with it."""
        # should be called by exception handler only
        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            # These are only logged, and only when the loop is in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule connection_lost()."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
549
550
if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the inheritable state of file descriptor *fd*.

        Fallback for when os.set_inheritable() is unavailable: toggle the
        close-on-exec flag through fcntl(), keeping the other flags.
        """
        # Use 1 if the fcntl module does not expose FD_CLOEXEC.
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        old = fcntl.fcntl(fd, fcntl.F_GETFD)
        if not inheritable:
            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
        else:
            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
565
566
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair is used in place of a
        pipe: the child gets one end, and the other end is wrapped in a
        binary file object installed as self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Startup failed before the write end was handed over:
                # close both ends of the socket pair so they do not leak.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800590
591
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Removes the handler for process 'pid'.

        The function returns True if the handler was successfully removed,
        False if there was nothing to remove."""

        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes

        This function must return self"""
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context"""
        raise NotImplementedError()
660
661
class BaseChildWatcher(AbstractChildWatcher):
    """Shared plumbing for child watchers driven by SIGCHLD.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop currently attached, or None.
        self._loop = None

    def close(self):
        """Detach from the current event loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        loop may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code.

        Return the negated signal number if the child was killed by a
        signal, the exit status if it exited, and the raw status otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        else:
            # The child exited, but we don't understand its status.
            # This shouldn't happen, but if it does, let's just
            # return that status; perhaps that helps debug it.
            return status
714
715
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        """Forget all registered handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*."""
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return True if one was registered."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every registered pid."""
        # Iterate over a copy: _do_waitpid() may pop entries.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode."""
        # No loop may be attached when a handler is added, so guard
        # against None instead of failing after the child has been reaped.
        return self._loop is not None and self._loop.get_debug()

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; run its callback if it ended."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
792
793
class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps any terminated child in a single call.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code that spawns
    processes and waits for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for every child that is still watched.
        self._callbacks = {}
        # Guards the bookkeeping shared by the methods below.
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before a handler was added.
        self._zombies = {}
        # Number of "with watcher:" blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and recorded zombies, then close the base."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            # Only report once the last "with" block has exited, and only
            # if some reaped children were never claimed by a handler.
            nothing_to_report = bool(self._forks) or not self._zombies
            if nothing_to_report:
                return None

            unclaimed = str(self._zombies)
            self._zombies.clear()

        # Logging happens after the lock has been released.
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Must be called inside a "with watcher:" block.  If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"

        not_reaped = object()
        with self._lock:
            exit_code = self._zombies.pop(pid, not_reaped)
            if exit_code is not_reaped:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already. We can fire the callback
        # (outside the lock).
        callback(pid, exit_code, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        unregistered = object()

        while True:
            try:
                child_pid, raw_status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if child_pid == 0:
                # A child process is still alive.
                return

            exit_code = self._compute_returncode(raw_status)

            with self._lock:
                entry = self._callbacks.pop(child_pid, unregistered)
                if entry is not unregistered:
                    handler, handler_args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     child_pid, exit_code)
                elif self._forks:
                    # Unknown child, but a "with" block is active:
                    # it may not be registered yet, so remember its status.
                    self._zombies[child_pid] = exit_code
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     child_pid, exit_code)
                    continue
                else:
                    # Unknown child and nobody can still claim it.
                    handler = None

            # Warnings and callbacks run after the lock has been released.
            if handler is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", child_pid, exit_code)
                continue

            handler(child_pid, exit_code, *handler_args)
897
898
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy that also owns a child process watcher."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set explicitly.
        self._watcher = None

    @staticmethod
    def _in_main_thread():
        """Return True when the calling thread is the main thread."""
        return isinstance(threading.current_thread(), threading._MainThread)

    def _init_watcher(self):
        """Create the default SafeChildWatcher if no watcher exists yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            # The loop is attached only when called from the main thread.
            if self._in_main_thread():
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """
        super().set_event_loop(loop)

        current_watcher = self._watcher
        if current_watcher is None:
            return

        if self._in_main_thread():
            current_watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        *watcher* must be None or an AbstractChildWatcher instance.  A
        previously installed watcher is closed before being replaced.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
948
# Public names (listed in __all__) bound to the Unix implementations
# defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy