blob: 37310cfde80ec314fc368022a43bdac87fb62981 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Victor Stinnerfe5649c2014-07-17 22:43:40 +020034def _sighandler_noop(signum, frame):
35 """Dummy signal handler."""
36 pass
37
38
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080039class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050040 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
Yury Selivanovb057c522014-02-18 12:15:06 -050042 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """
44
45 def __init__(self, selector=None):
46 super().__init__(selector)
47 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 def _socketpair(self):
50 return socket.socketpair()
51
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
70 self._check_signal(sig)
71 try:
72 # set_wakeup_fd() raises ValueError if this is not the
73 # main thread. By calling it early we ensure that an
74 # event loop running in another thread cannot add a signal
75 # handler.
76 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020077 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078 raise RuntimeError(str(exc))
79
Yury Selivanov569efa22014-02-18 18:02:19 -050080 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081 self._signal_handlers[sig] = handle
82
83 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020084 # Register a dummy signal handler to ask Python to write the signal
85 # number in the wakup file descriptor. _process_self_data() will
86 # read signal numbers from this file descriptor to handle signals.
87 signal.signal(sig, _sighandler_noop)
88
Charles-François Natali74e7cf32013-12-05 22:47:19 +010089 # Set SA_RESTART to limit EINTR occurrences.
90 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091 except OSError as exc:
92 del self._signal_handlers[sig]
93 if not self._signal_handlers:
94 try:
95 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +020096 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070097 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098
99 if exc.errno == errno.EINVAL:
100 raise RuntimeError('sig {} cannot be caught'.format(sig))
101 else:
102 raise
103
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200104 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700105 """Internal helper that is the actual signal handler."""
106 handle = self._signal_handlers.get(sig)
107 if handle is None:
108 return # Assume it's some race condition.
109 if handle._cancelled:
110 self.remove_signal_handler(sig) # Remove it properly.
111 else:
112 self._add_callback_signalsafe(handle)
113
114 def remove_signal_handler(self, sig):
115 """Remove a handler for a signal. UNIX only.
116
117 Return True if a signal handler was removed, False if not.
118 """
119 self._check_signal(sig)
120 try:
121 del self._signal_handlers[sig]
122 except KeyError:
123 return False
124
125 if sig == signal.SIGINT:
126 handler = signal.default_int_handler
127 else:
128 handler = signal.SIG_DFL
129
130 try:
131 signal.signal(sig, handler)
132 except OSError as exc:
133 if exc.errno == errno.EINVAL:
134 raise RuntimeError('sig {} cannot be caught'.format(sig))
135 else:
136 raise
137
138 if not self._signal_handlers:
139 try:
140 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200141 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700142 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700143
144 return True
145
146 def _check_signal(self, sig):
147 """Internal helper to validate a signal.
148
149 Raise ValueError if the signal number is invalid or uncatchable.
150 Raise RuntimeError if there is a problem setting up the handler.
151 """
152 if not isinstance(sig, int):
153 raise TypeError('sig must be an int, not {!r}'.format(sig))
154
155 if not (1 <= sig < signal.NSIG):
156 raise ValueError(
157 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
158
159 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
160 extra=None):
161 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
162
163 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
164 extra=None):
165 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
166
Victor Stinnerf951d282014-06-29 00:46:45 +0200167 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168 def _make_subprocess_transport(self, protocol, args, shell,
169 stdin, stdout, stderr, bufsize,
170 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800171 with events.get_child_watcher() as watcher:
172 transp = _UnixSubprocessTransport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800174 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800175 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800176 watcher.add_child_handler(transp.get_pid(),
177 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800178
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 return transp
180
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800181 def _child_watcher_callback(self, pid, returncode, transp):
182 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183
Victor Stinnerf951d282014-06-29 00:46:45 +0200184 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500185 def create_unix_connection(self, protocol_factory, path, *,
186 ssl=None, sock=None,
187 server_hostname=None):
188 assert server_hostname is None or isinstance(server_hostname, str)
189 if ssl:
190 if server_hostname is None:
191 raise ValueError(
192 'you have to pass server_hostname when using ssl')
193 else:
194 if server_hostname is not None:
195 raise ValueError('server_hostname is only meaningful with ssl')
196
197 if path is not None:
198 if sock is not None:
199 raise ValueError(
200 'path and sock can not be specified at the same time')
201
Victor Stinner79a29522014-02-19 01:45:59 +0100202 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500203 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500204 sock.setblocking(False)
205 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100206 except:
207 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500208 raise
209
210 else:
211 if sock is None:
212 raise ValueError('no path and sock were specified')
213 sock.setblocking(False)
214
215 transport, protocol = yield from self._create_connection_transport(
216 sock, protocol_factory, ssl, server_hostname)
217 return transport, protocol
218
Victor Stinnerf951d282014-06-29 00:46:45 +0200219 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500220 def create_unix_server(self, protocol_factory, path=None, *,
221 sock=None, backlog=100, ssl=None):
222 if isinstance(ssl, bool):
223 raise TypeError('ssl argument must be an SSLContext or None')
224
225 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200226 if sock is not None:
227 raise ValueError(
228 'path and sock can not be specified at the same time')
229
Yury Selivanovb057c522014-02-18 12:15:06 -0500230 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
231
232 try:
233 sock.bind(path)
234 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100235 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500236 if exc.errno == errno.EADDRINUSE:
237 # Let's improve the error message by adding
238 # with what exact address it occurs.
239 msg = 'Address {!r} is already in use'.format(path)
240 raise OSError(errno.EADDRINUSE, msg) from None
241 else:
242 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200243 except:
244 sock.close()
245 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500246 else:
247 if sock is None:
248 raise ValueError(
249 'path was not specified, and no sock specified')
250
251 if sock.family != socket.AF_UNIX:
252 raise ValueError(
253 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
254
255 server = base_events.Server(self, [sock])
256 sock.listen(backlog)
257 sock.setblocking(False)
258 self._start_serving(protocol_factory, sock, ssl, server)
259 return server
260
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200262if hasattr(os, 'set_blocking'):
263 def _set_nonblocking(fd):
264 os.set_blocking(fd, False)
265else:
266 def _set_nonblocking(fd):
267 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
268 flags = flags | os.O_NONBLOCK
269 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700270
271
272class _UnixReadPipeTransport(transports.ReadTransport):
273
Yury Selivanovdec1a452014-02-18 22:27:48 -0500274 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275
276 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
277 super().__init__(extra)
278 self._extra['pipe'] = pipe
279 self._loop = loop
280 self._pipe = pipe
281 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700282 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800283 if not (stat.S_ISFIFO(mode) or
284 stat.S_ISSOCK(mode) or
285 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700286 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 _set_nonblocking(self._fileno)
288 self._protocol = protocol
289 self._closing = False
290 self._loop.add_reader(self._fileno, self._read_ready)
291 self._loop.call_soon(self._protocol.connection_made, self)
292 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200293 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200294 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295
Victor Stinnere912e652014-07-12 03:11:53 +0200296 def __repr__(self):
297 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
298 if self._pipe is not None:
299 polling = selector_events._test_selector_event(
300 self._loop._selector,
301 self._fileno, selectors.EVENT_READ)
302 if polling:
303 info.append('polling')
304 else:
305 info.append('idle')
306 else:
307 info.append('closed')
308 return '<%s>' % ' '.join(info)
309
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700310 def _read_ready(self):
311 try:
312 data = os.read(self._fileno, self.max_size)
313 except (BlockingIOError, InterruptedError):
314 pass
315 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100316 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 else:
318 if data:
319 self._protocol.data_received(data)
320 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200321 if self._loop.get_debug():
322 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 self._closing = True
324 self._loop.remove_reader(self._fileno)
325 self._loop.call_soon(self._protocol.eof_received)
326 self._loop.call_soon(self._call_connection_lost, None)
327
Guido van Rossum57497ad2013-10-18 07:58:20 -0700328 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 self._loop.remove_reader(self._fileno)
330
Guido van Rossum57497ad2013-10-18 07:58:20 -0700331 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 self._loop.add_reader(self._fileno, self._read_ready)
333
334 def close(self):
335 if not self._closing:
336 self._close(None)
337
Victor Stinner0ee29c22014-02-19 01:40:41 +0100338 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200340 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
341 if self._loop.get_debug():
342 logger.debug("%r: %s", self, message, exc_info=True)
343 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500344 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100345 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500346 'exception': exc,
347 'transport': self,
348 'protocol': self._protocol,
349 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 self._close(exc)
351
352 def _close(self, exc):
353 self._closing = True
354 self._loop.remove_reader(self._fileno)
355 self._loop.call_soon(self._call_connection_lost, exc)
356
357 def _call_connection_lost(self, exc):
358 try:
359 self._protocol.connection_lost(exc)
360 finally:
361 self._pipe.close()
362 self._pipe = None
363 self._protocol = None
364 self._loop = None
365
366
Yury Selivanov3cb99142014-02-18 18:41:13 -0500367class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800368 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369
370 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
371 super().__init__(extra)
372 self._extra['pipe'] = pipe
373 self._loop = loop
374 self._pipe = pipe
375 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700376 mode = os.fstat(self._fileno).st_mode
377 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100378 if not (is_socket or
379 stat.S_ISFIFO(mode) or
380 stat.S_ISCHR(mode)):
381 raise ValueError("Pipe transport is only for "
382 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383 _set_nonblocking(self._fileno)
384 self._protocol = protocol
385 self._buffer = []
386 self._conn_lost = 0
387 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700388
389 # On AIX, the reader trick only works for sockets.
390 # On other platforms it works for pipes and sockets.
391 # (Exception: OS X 10.4? Issue #19294.)
392 if is_socket or not sys.platform.startswith("aix"):
393 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
395 self._loop.call_soon(self._protocol.connection_made, self)
396 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200397 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200398 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
Victor Stinnere912e652014-07-12 03:11:53 +0200400 def __repr__(self):
401 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
402 if self._pipe is not None:
403 polling = selector_events._test_selector_event(
404 self._loop._selector,
405 self._fileno, selectors.EVENT_WRITE)
406 if polling:
407 info.append('polling')
408 else:
409 info.append('idle')
410
411 bufsize = self.get_write_buffer_size()
412 info.append('bufsize=%s' % bufsize)
413 else:
414 info.append('closed')
415 return '<%s>' % ' '.join(info)
416
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800417 def get_write_buffer_size(self):
418 return sum(len(data) for data in self._buffer)
419
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700421 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200422 if self._loop.get_debug():
423 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100424 if self._buffer:
425 self._close(BrokenPipeError())
426 else:
427 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428
429 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800430 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
431 if isinstance(data, bytearray):
432 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 if not data:
434 return
435
436 if self._conn_lost or self._closing:
437 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700438 logger.warning('pipe closed by peer or '
439 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440 self._conn_lost += 1
441 return
442
443 if not self._buffer:
444 # Attempt to send it right away first.
445 try:
446 n = os.write(self._fileno, data)
447 except (BlockingIOError, InterruptedError):
448 n = 0
449 except Exception as exc:
450 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100451 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700452 return
453 if n == len(data):
454 return
455 elif n > 0:
456 data = data[n:]
457 self._loop.add_writer(self._fileno, self._write_ready)
458
459 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800460 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461
462 def _write_ready(self):
463 data = b''.join(self._buffer)
464 assert data, 'Data should not be empty'
465
466 self._buffer.clear()
467 try:
468 n = os.write(self._fileno, data)
469 except (BlockingIOError, InterruptedError):
470 self._buffer.append(data)
471 except Exception as exc:
472 self._conn_lost += 1
473 # Remove writer here, _fatal_error() doesn't it
474 # because _buffer is empty.
475 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100476 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 else:
478 if n == len(data):
479 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800480 self._maybe_resume_protocol() # May append to buffer.
481 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 self._loop.remove_reader(self._fileno)
483 self._call_connection_lost(None)
484 return
485 elif n > 0:
486 data = data[n:]
487
488 self._buffer.append(data) # Try again later.
489
490 def can_write_eof(self):
491 return True
492
493 # TODO: Make the relationships between write_eof(), close(),
494 # abort(), _fatal_error() and _close() more straightforward.
495
496 def write_eof(self):
497 if self._closing:
498 return
499 assert self._pipe
500 self._closing = True
501 if not self._buffer:
502 self._loop.remove_reader(self._fileno)
503 self._loop.call_soon(self._call_connection_lost, None)
504
505 def close(self):
506 if not self._closing:
507 # write_eof is all what we needed to close the write pipe
508 self.write_eof()
509
510 def abort(self):
511 self._close(None)
512
Victor Stinner0ee29c22014-02-19 01:40:41 +0100513 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200515 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
516 if self._loop.get_debug():
517 logger.debug("%r: %s", self, message, exc_info=True)
518 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500519 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100520 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500521 'exception': exc,
522 'transport': self,
523 'protocol': self._protocol,
524 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 self._close(exc)
526
527 def _close(self, exc=None):
528 self._closing = True
529 if self._buffer:
530 self._loop.remove_writer(self._fileno)
531 self._buffer.clear()
532 self._loop.remove_reader(self._fileno)
533 self._loop.call_soon(self._call_connection_lost, exc)
534
535 def _call_connection_lost(self, exc):
536 try:
537 self._protocol.connection_lost(exc)
538 finally:
539 self._pipe.close()
540 self._pipe = None
541 self._protocol = None
542 self._loop = None
543
544
Guido van Rossum59691282013-10-30 14:52:03 -0700545class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
Guido van Rossum59691282013-10-30 14:52:03 -0700547 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700548 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700550 # Use a socket pair for stdin, since not all platforms
551 # support selecting read events on the write end of a
552 # socket (which we use in order to detect closing of the
553 # other end). Notably this is needed on AIX, and works
554 # just fine on other platforms.
555 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 self._proc = subprocess.Popen(
557 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
558 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700559 if stdin_w is not None:
560 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200561 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800562
563
564class AbstractChildWatcher:
565 """Abstract base class for monitoring child processes.
566
567 Objects derived from this class monitor a collection of subprocesses and
568 report their termination or interruption by a signal.
569
570 New callbacks are registered with .add_child_handler(). Starting a new
571 process must be done within a 'with' block to allow the watcher to suspend
572 its activity until the new process if fully registered (this is needed to
573 prevent a race condition in some implementations).
574
575 Example:
576 with watcher:
577 proc = subprocess.Popen("sleep 1")
578 watcher.add_child_handler(proc.pid, callback)
579
580 Notes:
581 Implementations of this class must be thread-safe.
582
583 Since child watcher objects may catch the SIGCHLD signal and call
584 waitpid(-1), there should be only one active object per process.
585 """
586
587 def add_child_handler(self, pid, callback, *args):
588 """Register a new child handler.
589
590 Arrange for callback(pid, returncode, *args) to be called when
591 process 'pid' terminates. Specifying another callback for the same
592 process replaces the previous handler.
593
Victor Stinneracdb7822014-07-14 18:33:40 +0200594 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800595 """
596 raise NotImplementedError()
597
598 def remove_child_handler(self, pid):
599 """Removes the handler for process 'pid'.
600
601 The function returns True if the handler was successfully removed,
602 False if there was nothing to remove."""
603
604 raise NotImplementedError()
605
Guido van Rossum2bcae702013-11-13 15:50:08 -0800606 def attach_loop(self, loop):
607 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800608
Guido van Rossum2bcae702013-11-13 15:50:08 -0800609 If the watcher was previously attached to an event loop, then it is
610 first detached before attaching to the new loop.
611
612 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800613 """
614 raise NotImplementedError()
615
616 def close(self):
617 """Close the watcher.
618
619 This must be called to make sure that any underlying resource is freed.
620 """
621 raise NotImplementedError()
622
623 def __enter__(self):
624 """Enter the watcher's context and allow starting new processes
625
626 This function must return self"""
627 raise NotImplementedError()
628
629 def __exit__(self, a, b, c):
630 """Exit the watcher's context"""
631 raise NotImplementedError()
632
633
634class BaseChildWatcher(AbstractChildWatcher):
635
Guido van Rossum2bcae702013-11-13 15:50:08 -0800636 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800637 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800638
639 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800640 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800641
642 def _do_waitpid(self, expected_pid):
643 raise NotImplementedError()
644
645 def _do_waitpid_all(self):
646 raise NotImplementedError()
647
Guido van Rossum2bcae702013-11-13 15:50:08 -0800648 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800649 assert loop is None or isinstance(loop, events.AbstractEventLoop)
650
651 if self._loop is not None:
652 self._loop.remove_signal_handler(signal.SIGCHLD)
653
654 self._loop = loop
655 if loop is not None:
656 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
657
658 # Prevent a race condition in case a child terminated
659 # during the switch.
660 self._do_waitpid_all()
661
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800662 def _sig_chld(self):
663 try:
664 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500665 except Exception as exc:
666 # self._loop should always be available here
667 # as '_sig_chld' is added as a signal handler
668 # in 'attach_loop'
669 self._loop.call_exception_handler({
670 'message': 'Unknown exception in SIGCHLD handler',
671 'exception': exc,
672 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800673
674 def _compute_returncode(self, status):
675 if os.WIFSIGNALED(status):
676 # The child process died because of a signal.
677 return -os.WTERMSIG(status)
678 elif os.WIFEXITED(status):
679 # The child process exited (e.g sys.exit()).
680 return os.WEXITSTATUS(status)
681 else:
682 # The child exited, but we don't understand its status.
683 # This shouldn't happen, but if it does, let's just
684 # return that status; perhaps that helps debug it.
685 return status
686
687
688class SafeChildWatcher(BaseChildWatcher):
689 """'Safe' child watcher implementation.
690
691 This implementation avoids disrupting other code spawning processes by
692 polling explicitly each process in the SIGCHLD handler instead of calling
693 os.waitpid(-1).
694
695 This is a safe solution but it has a significant overhead when handling a
696 big number of children (O(n) each time SIGCHLD is raised)
697 """
698
Guido van Rossum2bcae702013-11-13 15:50:08 -0800699 def __init__(self):
700 super().__init__()
701 self._callbacks = {}
702
703 def close(self):
704 self._callbacks.clear()
705 super().close()
706
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800707 def __enter__(self):
708 return self
709
710 def __exit__(self, a, b, c):
711 pass
712
713 def add_child_handler(self, pid, callback, *args):
714 self._callbacks[pid] = callback, args
715
716 # Prevent a race condition in case the child is already terminated.
717 self._do_waitpid(pid)
718
Guido van Rossum2bcae702013-11-13 15:50:08 -0800719 def remove_child_handler(self, pid):
720 try:
721 del self._callbacks[pid]
722 return True
723 except KeyError:
724 return False
725
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800726 def _do_waitpid_all(self):
727
728 for pid in list(self._callbacks):
729 self._do_waitpid(pid)
730
731 def _do_waitpid(self, expected_pid):
732 assert expected_pid > 0
733
734 try:
735 pid, status = os.waitpid(expected_pid, os.WNOHANG)
736 except ChildProcessError:
737 # The child process is already reaped
738 # (may happen if waitpid() is called elsewhere).
739 pid = expected_pid
740 returncode = 255
741 logger.warning(
742 "Unknown child process pid %d, will report returncode 255",
743 pid)
744 else:
745 if pid == 0:
746 # The child process is still alive.
747 return
748
749 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200750 if self._loop.get_debug():
751 logger.debug('process %s exited with returncode %s',
752 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800753
754 try:
755 callback, args = self._callbacks.pop(pid)
756 except KeyError: # pragma: no cover
757 # May happen if .remove_child_handler() is called
758 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200759 if self._loop.get_debug():
760 logger.warning("Child watcher got an unexpected pid: %r",
761 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800762 else:
763 callback(pid, returncode, *args)
764
765
766class FastChildWatcher(BaseChildWatcher):
767 """'Fast' child watcher implementation.
768
769 This implementation reaps every terminated processes by calling
770 os.waitpid(-1) directly, possibly breaking other code spawning processes
771 and waiting for their termination.
772
773 There is no noticeable overhead when handling a big number of children
774 (O(1) each time a child terminates).
775 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800776 def __init__(self):
777 super().__init__()
778 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800779 self._lock = threading.Lock()
780 self._zombies = {}
781 self._forks = 0
782
783 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800784 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800785 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800786 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800787
788 def __enter__(self):
789 with self._lock:
790 self._forks += 1
791
792 return self
793
794 def __exit__(self, a, b, c):
795 with self._lock:
796 self._forks -= 1
797
798 if self._forks or not self._zombies:
799 return
800
801 collateral_victims = str(self._zombies)
802 self._zombies.clear()
803
804 logger.warning(
805 "Caught subprocesses termination from unknown pids: %s",
806 collateral_victims)
807
808 def add_child_handler(self, pid, callback, *args):
809 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800810 with self._lock:
811 try:
812 returncode = self._zombies.pop(pid)
813 except KeyError:
814 # The child is running.
815 self._callbacks[pid] = callback, args
816 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800817
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800818 # The child is dead already. We can fire the callback.
819 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800820
Guido van Rossum2bcae702013-11-13 15:50:08 -0800821 def remove_child_handler(self, pid):
822 try:
823 del self._callbacks[pid]
824 return True
825 except KeyError:
826 return False
827
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800828 def _do_waitpid_all(self):
829 # Because of signal coalescing, we must keep calling waitpid() as
830 # long as we're able to reap a child.
831 while True:
832 try:
833 pid, status = os.waitpid(-1, os.WNOHANG)
834 except ChildProcessError:
835 # No more child processes exist.
836 return
837 else:
838 if pid == 0:
839 # A child process is still alive.
840 return
841
842 returncode = self._compute_returncode(status)
843
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800844 with self._lock:
845 try:
846 callback, args = self._callbacks.pop(pid)
847 except KeyError:
848 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800849 if self._forks:
850 # It may not be registered yet.
851 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200852 if self._loop.get_debug():
853 logger.debug('unknown process %s exited '
854 'with returncode %s',
855 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800856 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800857 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200858 else:
859 if self._loop.get_debug():
860 logger.debug('process %s exited with returncode %s',
861 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800862
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800863 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800864 logger.warning(
865 "Caught subprocess termination from unknown pid: "
866 "%d -> %d", pid, returncode)
867 else:
868 callback(pid, returncode, *args)
869
870
871class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
872 """XXX"""
873 _loop_factory = _UnixSelectorEventLoop
874
875 def __init__(self):
876 super().__init__()
877 self._watcher = None
878
879 def _init_watcher(self):
880 with events._lock:
881 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800882 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800883 if isinstance(threading.current_thread(),
884 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800885 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800886
887 def set_event_loop(self, loop):
888 """Set the event loop.
889
890 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800891 .set_event_loop() from the main thread will call .attach_loop(loop) on
892 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800893 """
894
895 super().set_event_loop(loop)
896
897 if self._watcher is not None and \
898 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800899 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900
901 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200902 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800903
904 If not yet set, a SafeChildWatcher object is automatically created.
905 """
906 if self._watcher is None:
907 self._init_watcher()
908
909 return self._watcher
910
911 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200912 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800913
914 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
915
916 if self._watcher is not None:
917 self._watcher.close()
918
919 self._watcher = watcher
920
921SelectorEventLoop = _UnixSelectorEventLoop
922DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy