blob: 5020cc5db58e2441c369dbeba8418dfa506aee45 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Victor Stinnerfe5649c2014-07-17 22:43:40 +020034def _sighandler_noop(signum, frame):
35 """Dummy signal handler."""
36 pass
37
38
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080039class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050040 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
Yury Selivanovb057c522014-02-18 12:15:06 -050042 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """
44
45 def __init__(self, selector=None):
46 super().__init__(selector)
47 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 def _socketpair(self):
50 return socket.socketpair()
51
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
70 self._check_signal(sig)
71 try:
72 # set_wakeup_fd() raises ValueError if this is not the
73 # main thread. By calling it early we ensure that an
74 # event loop running in another thread cannot add a signal
75 # handler.
76 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020077 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078 raise RuntimeError(str(exc))
79
Yury Selivanov569efa22014-02-18 18:02:19 -050080 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081 self._signal_handlers[sig] = handle
82
83 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020084 # Register a dummy signal handler to ask Python to write the signal
85 # number in the wakup file descriptor. _process_self_data() will
86 # read signal numbers from this file descriptor to handle signals.
87 signal.signal(sig, _sighandler_noop)
88
Charles-François Natali74e7cf32013-12-05 22:47:19 +010089 # Set SA_RESTART to limit EINTR occurrences.
90 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091 except OSError as exc:
92 del self._signal_handlers[sig]
93 if not self._signal_handlers:
94 try:
95 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +020096 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070097 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070098
99 if exc.errno == errno.EINVAL:
100 raise RuntimeError('sig {} cannot be caught'.format(sig))
101 else:
102 raise
103
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200104 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700105 """Internal helper that is the actual signal handler."""
106 handle = self._signal_handlers.get(sig)
107 if handle is None:
108 return # Assume it's some race condition.
109 if handle._cancelled:
110 self.remove_signal_handler(sig) # Remove it properly.
111 else:
112 self._add_callback_signalsafe(handle)
113
114 def remove_signal_handler(self, sig):
115 """Remove a handler for a signal. UNIX only.
116
117 Return True if a signal handler was removed, False if not.
118 """
119 self._check_signal(sig)
120 try:
121 del self._signal_handlers[sig]
122 except KeyError:
123 return False
124
125 if sig == signal.SIGINT:
126 handler = signal.default_int_handler
127 else:
128 handler = signal.SIG_DFL
129
130 try:
131 signal.signal(sig, handler)
132 except OSError as exc:
133 if exc.errno == errno.EINVAL:
134 raise RuntimeError('sig {} cannot be caught'.format(sig))
135 else:
136 raise
137
138 if not self._signal_handlers:
139 try:
140 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200141 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700142 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700143
144 return True
145
146 def _check_signal(self, sig):
147 """Internal helper to validate a signal.
148
149 Raise ValueError if the signal number is invalid or uncatchable.
150 Raise RuntimeError if there is a problem setting up the handler.
151 """
152 if not isinstance(sig, int):
153 raise TypeError('sig must be an int, not {!r}'.format(sig))
154
155 if not (1 <= sig < signal.NSIG):
156 raise ValueError(
157 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
158
159 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
160 extra=None):
161 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
162
163 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
164 extra=None):
165 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
166
Victor Stinnerf951d282014-06-29 00:46:45 +0200167 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168 def _make_subprocess_transport(self, protocol, args, shell,
169 stdin, stdout, stderr, bufsize,
170 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800171 with events.get_child_watcher() as watcher:
172 transp = _UnixSubprocessTransport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800174 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800175 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800176 watcher.add_child_handler(transp.get_pid(),
177 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800178
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700179 return transp
180
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800181 def _child_watcher_callback(self, pid, returncode, transp):
182 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183
Victor Stinnerf951d282014-06-29 00:46:45 +0200184 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500185 def create_unix_connection(self, protocol_factory, path, *,
186 ssl=None, sock=None,
187 server_hostname=None):
188 assert server_hostname is None or isinstance(server_hostname, str)
189 if ssl:
190 if server_hostname is None:
191 raise ValueError(
192 'you have to pass server_hostname when using ssl')
193 else:
194 if server_hostname is not None:
195 raise ValueError('server_hostname is only meaningful with ssl')
196
197 if path is not None:
198 if sock is not None:
199 raise ValueError(
200 'path and sock can not be specified at the same time')
201
Victor Stinner79a29522014-02-19 01:45:59 +0100202 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500203 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500204 sock.setblocking(False)
205 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100206 except:
207 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500208 raise
209
210 else:
211 if sock is None:
212 raise ValueError('no path and sock were specified')
213 sock.setblocking(False)
214
215 transport, protocol = yield from self._create_connection_transport(
216 sock, protocol_factory, ssl, server_hostname)
217 return transport, protocol
218
Victor Stinnerf951d282014-06-29 00:46:45 +0200219 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500220 def create_unix_server(self, protocol_factory, path=None, *,
221 sock=None, backlog=100, ssl=None):
222 if isinstance(ssl, bool):
223 raise TypeError('ssl argument must be an SSLContext or None')
224
225 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200226 if sock is not None:
227 raise ValueError(
228 'path and sock can not be specified at the same time')
229
Yury Selivanovb057c522014-02-18 12:15:06 -0500230 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
231
232 try:
233 sock.bind(path)
234 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100235 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500236 if exc.errno == errno.EADDRINUSE:
237 # Let's improve the error message by adding
238 # with what exact address it occurs.
239 msg = 'Address {!r} is already in use'.format(path)
240 raise OSError(errno.EADDRINUSE, msg) from None
241 else:
242 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200243 except:
244 sock.close()
245 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500246 else:
247 if sock is None:
248 raise ValueError(
249 'path was not specified, and no sock specified')
250
251 if sock.family != socket.AF_UNIX:
252 raise ValueError(
253 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
254
255 server = base_events.Server(self, [sock])
256 sock.listen(backlog)
257 sock.setblocking(False)
258 self._start_serving(protocol_factory, sock, ssl, server)
259 return server
260
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261
262def _set_nonblocking(fd):
263 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
264 flags = flags | os.O_NONBLOCK
265 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
266
267
268class _UnixReadPipeTransport(transports.ReadTransport):
269
Yury Selivanovdec1a452014-02-18 22:27:48 -0500270 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271
272 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
273 super().__init__(extra)
274 self._extra['pipe'] = pipe
275 self._loop = loop
276 self._pipe = pipe
277 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700278 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800279 if not (stat.S_ISFIFO(mode) or
280 stat.S_ISSOCK(mode) or
281 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700282 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700283 _set_nonblocking(self._fileno)
284 self._protocol = protocol
285 self._closing = False
286 self._loop.add_reader(self._fileno, self._read_ready)
287 self._loop.call_soon(self._protocol.connection_made, self)
288 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200289 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200290 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291
Victor Stinnere912e652014-07-12 03:11:53 +0200292 def __repr__(self):
293 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
294 if self._pipe is not None:
295 polling = selector_events._test_selector_event(
296 self._loop._selector,
297 self._fileno, selectors.EVENT_READ)
298 if polling:
299 info.append('polling')
300 else:
301 info.append('idle')
302 else:
303 info.append('closed')
304 return '<%s>' % ' '.join(info)
305
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 def _read_ready(self):
307 try:
308 data = os.read(self._fileno, self.max_size)
309 except (BlockingIOError, InterruptedError):
310 pass
311 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100312 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 else:
314 if data:
315 self._protocol.data_received(data)
316 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200317 if self._loop.get_debug():
318 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 self._closing = True
320 self._loop.remove_reader(self._fileno)
321 self._loop.call_soon(self._protocol.eof_received)
322 self._loop.call_soon(self._call_connection_lost, None)
323
Guido van Rossum57497ad2013-10-18 07:58:20 -0700324 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 self._loop.remove_reader(self._fileno)
326
Guido van Rossum57497ad2013-10-18 07:58:20 -0700327 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 self._loop.add_reader(self._fileno, self._read_ready)
329
330 def close(self):
331 if not self._closing:
332 self._close(None)
333
Victor Stinner0ee29c22014-02-19 01:40:41 +0100334 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800336 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
Yury Selivanov569efa22014-02-18 18:02:19 -0500337 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100338 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500339 'exception': exc,
340 'transport': self,
341 'protocol': self._protocol,
342 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 self._close(exc)
344
345 def _close(self, exc):
346 self._closing = True
347 self._loop.remove_reader(self._fileno)
348 self._loop.call_soon(self._call_connection_lost, exc)
349
350 def _call_connection_lost(self, exc):
351 try:
352 self._protocol.connection_lost(exc)
353 finally:
354 self._pipe.close()
355 self._pipe = None
356 self._protocol = None
357 self._loop = None
358
359
Yury Selivanov3cb99142014-02-18 18:41:13 -0500360class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800361 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362
363 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
364 super().__init__(extra)
365 self._extra['pipe'] = pipe
366 self._loop = loop
367 self._pipe = pipe
368 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700369 mode = os.fstat(self._fileno).st_mode
370 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100371 if not (is_socket or
372 stat.S_ISFIFO(mode) or
373 stat.S_ISCHR(mode)):
374 raise ValueError("Pipe transport is only for "
375 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 _set_nonblocking(self._fileno)
377 self._protocol = protocol
378 self._buffer = []
379 self._conn_lost = 0
380 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700381
382 # On AIX, the reader trick only works for sockets.
383 # On other platforms it works for pipes and sockets.
384 # (Exception: OS X 10.4? Issue #19294.)
385 if is_socket or not sys.platform.startswith("aix"):
386 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387
388 self._loop.call_soon(self._protocol.connection_made, self)
389 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200390 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200391 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392
Victor Stinnere912e652014-07-12 03:11:53 +0200393 def __repr__(self):
394 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
395 if self._pipe is not None:
396 polling = selector_events._test_selector_event(
397 self._loop._selector,
398 self._fileno, selectors.EVENT_WRITE)
399 if polling:
400 info.append('polling')
401 else:
402 info.append('idle')
403
404 bufsize = self.get_write_buffer_size()
405 info.append('bufsize=%s' % bufsize)
406 else:
407 info.append('closed')
408 return '<%s>' % ' '.join(info)
409
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800410 def get_write_buffer_size(self):
411 return sum(len(data) for data in self._buffer)
412
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700414 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200415 if self._loop.get_debug():
416 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100417 if self._buffer:
418 self._close(BrokenPipeError())
419 else:
420 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421
422 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800423 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
424 if isinstance(data, bytearray):
425 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 if not data:
427 return
428
429 if self._conn_lost or self._closing:
430 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700431 logger.warning('pipe closed by peer or '
432 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 self._conn_lost += 1
434 return
435
436 if not self._buffer:
437 # Attempt to send it right away first.
438 try:
439 n = os.write(self._fileno, data)
440 except (BlockingIOError, InterruptedError):
441 n = 0
442 except Exception as exc:
443 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100444 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700445 return
446 if n == len(data):
447 return
448 elif n > 0:
449 data = data[n:]
450 self._loop.add_writer(self._fileno, self._write_ready)
451
452 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800453 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454
455 def _write_ready(self):
456 data = b''.join(self._buffer)
457 assert data, 'Data should not be empty'
458
459 self._buffer.clear()
460 try:
461 n = os.write(self._fileno, data)
462 except (BlockingIOError, InterruptedError):
463 self._buffer.append(data)
464 except Exception as exc:
465 self._conn_lost += 1
466 # Remove writer here, _fatal_error() doesn't it
467 # because _buffer is empty.
468 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100469 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700470 else:
471 if n == len(data):
472 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800473 self._maybe_resume_protocol() # May append to buffer.
474 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 self._loop.remove_reader(self._fileno)
476 self._call_connection_lost(None)
477 return
478 elif n > 0:
479 data = data[n:]
480
481 self._buffer.append(data) # Try again later.
482
483 def can_write_eof(self):
484 return True
485
486 # TODO: Make the relationships between write_eof(), close(),
487 # abort(), _fatal_error() and _close() more straightforward.
488
489 def write_eof(self):
490 if self._closing:
491 return
492 assert self._pipe
493 self._closing = True
494 if not self._buffer:
495 self._loop.remove_reader(self._fileno)
496 self._loop.call_soon(self._call_connection_lost, None)
497
498 def close(self):
499 if not self._closing:
500 # write_eof is all what we needed to close the write pipe
501 self.write_eof()
502
503 def abort(self):
504 self._close(None)
505
Victor Stinner0ee29c22014-02-19 01:40:41 +0100506 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700507 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800508 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
Yury Selivanov569efa22014-02-18 18:02:19 -0500509 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100510 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500511 'exception': exc,
512 'transport': self,
513 'protocol': self._protocol,
514 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515 self._close(exc)
516
517 def _close(self, exc=None):
518 self._closing = True
519 if self._buffer:
520 self._loop.remove_writer(self._fileno)
521 self._buffer.clear()
522 self._loop.remove_reader(self._fileno)
523 self._loop.call_soon(self._call_connection_lost, exc)
524
525 def _call_connection_lost(self, exc):
526 try:
527 self._protocol.connection_lost(exc)
528 finally:
529 self._pipe.close()
530 self._pipe = None
531 self._protocol = None
532 self._loop = None
533
534
Guido van Rossum59691282013-10-30 14:52:03 -0700535class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536
Guido van Rossum59691282013-10-30 14:52:03 -0700537 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700538 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700540 # Use a socket pair for stdin, since not all platforms
541 # support selecting read events on the write end of a
542 # socket (which we use in order to detect closing of the
543 # other end). Notably this is needed on AIX, and works
544 # just fine on other platforms.
545 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546 self._proc = subprocess.Popen(
547 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
548 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700549 if stdin_w is not None:
550 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200551 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800552
553
554class AbstractChildWatcher:
555 """Abstract base class for monitoring child processes.
556
557 Objects derived from this class monitor a collection of subprocesses and
558 report their termination or interruption by a signal.
559
560 New callbacks are registered with .add_child_handler(). Starting a new
561 process must be done within a 'with' block to allow the watcher to suspend
562 its activity until the new process if fully registered (this is needed to
563 prevent a race condition in some implementations).
564
565 Example:
566 with watcher:
567 proc = subprocess.Popen("sleep 1")
568 watcher.add_child_handler(proc.pid, callback)
569
570 Notes:
571 Implementations of this class must be thread-safe.
572
573 Since child watcher objects may catch the SIGCHLD signal and call
574 waitpid(-1), there should be only one active object per process.
575 """
576
577 def add_child_handler(self, pid, callback, *args):
578 """Register a new child handler.
579
580 Arrange for callback(pid, returncode, *args) to be called when
581 process 'pid' terminates. Specifying another callback for the same
582 process replaces the previous handler.
583
Victor Stinneracdb7822014-07-14 18:33:40 +0200584 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800585 """
586 raise NotImplementedError()
587
588 def remove_child_handler(self, pid):
589 """Removes the handler for process 'pid'.
590
591 The function returns True if the handler was successfully removed,
592 False if there was nothing to remove."""
593
594 raise NotImplementedError()
595
Guido van Rossum2bcae702013-11-13 15:50:08 -0800596 def attach_loop(self, loop):
597 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800598
Guido van Rossum2bcae702013-11-13 15:50:08 -0800599 If the watcher was previously attached to an event loop, then it is
600 first detached before attaching to the new loop.
601
602 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800603 """
604 raise NotImplementedError()
605
606 def close(self):
607 """Close the watcher.
608
609 This must be called to make sure that any underlying resource is freed.
610 """
611 raise NotImplementedError()
612
613 def __enter__(self):
614 """Enter the watcher's context and allow starting new processes
615
616 This function must return self"""
617 raise NotImplementedError()
618
619 def __exit__(self, a, b, c):
620 """Exit the watcher's context"""
621 raise NotImplementedError()
622
623
624class BaseChildWatcher(AbstractChildWatcher):
625
Guido van Rossum2bcae702013-11-13 15:50:08 -0800626 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800627 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800628
629 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800630 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800631
632 def _do_waitpid(self, expected_pid):
633 raise NotImplementedError()
634
635 def _do_waitpid_all(self):
636 raise NotImplementedError()
637
Guido van Rossum2bcae702013-11-13 15:50:08 -0800638 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800639 assert loop is None or isinstance(loop, events.AbstractEventLoop)
640
641 if self._loop is not None:
642 self._loop.remove_signal_handler(signal.SIGCHLD)
643
644 self._loop = loop
645 if loop is not None:
646 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
647
648 # Prevent a race condition in case a child terminated
649 # during the switch.
650 self._do_waitpid_all()
651
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800652 def _sig_chld(self):
653 try:
654 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500655 except Exception as exc:
656 # self._loop should always be available here
657 # as '_sig_chld' is added as a signal handler
658 # in 'attach_loop'
659 self._loop.call_exception_handler({
660 'message': 'Unknown exception in SIGCHLD handler',
661 'exception': exc,
662 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800663
664 def _compute_returncode(self, status):
665 if os.WIFSIGNALED(status):
666 # The child process died because of a signal.
667 return -os.WTERMSIG(status)
668 elif os.WIFEXITED(status):
669 # The child process exited (e.g sys.exit()).
670 return os.WEXITSTATUS(status)
671 else:
672 # The child exited, but we don't understand its status.
673 # This shouldn't happen, but if it does, let's just
674 # return that status; perhaps that helps debug it.
675 return status
676
677
678class SafeChildWatcher(BaseChildWatcher):
679 """'Safe' child watcher implementation.
680
681 This implementation avoids disrupting other code spawning processes by
682 polling explicitly each process in the SIGCHLD handler instead of calling
683 os.waitpid(-1).
684
685 This is a safe solution but it has a significant overhead when handling a
686 big number of children (O(n) each time SIGCHLD is raised)
687 """
688
Guido van Rossum2bcae702013-11-13 15:50:08 -0800689 def __init__(self):
690 super().__init__()
691 self._callbacks = {}
692
693 def close(self):
694 self._callbacks.clear()
695 super().close()
696
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800697 def __enter__(self):
698 return self
699
700 def __exit__(self, a, b, c):
701 pass
702
703 def add_child_handler(self, pid, callback, *args):
704 self._callbacks[pid] = callback, args
705
706 # Prevent a race condition in case the child is already terminated.
707 self._do_waitpid(pid)
708
Guido van Rossum2bcae702013-11-13 15:50:08 -0800709 def remove_child_handler(self, pid):
710 try:
711 del self._callbacks[pid]
712 return True
713 except KeyError:
714 return False
715
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800716 def _do_waitpid_all(self):
717
718 for pid in list(self._callbacks):
719 self._do_waitpid(pid)
720
721 def _do_waitpid(self, expected_pid):
722 assert expected_pid > 0
723
724 try:
725 pid, status = os.waitpid(expected_pid, os.WNOHANG)
726 except ChildProcessError:
727 # The child process is already reaped
728 # (may happen if waitpid() is called elsewhere).
729 pid = expected_pid
730 returncode = 255
731 logger.warning(
732 "Unknown child process pid %d, will report returncode 255",
733 pid)
734 else:
735 if pid == 0:
736 # The child process is still alive.
737 return
738
739 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200740 if self._loop.get_debug():
741 logger.debug('process %s exited with returncode %s',
742 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800743
744 try:
745 callback, args = self._callbacks.pop(pid)
746 except KeyError: # pragma: no cover
747 # May happen if .remove_child_handler() is called
748 # after os.waitpid() returns.
749 pass
750 else:
751 callback(pid, returncode, *args)
752
753
754class FastChildWatcher(BaseChildWatcher):
755 """'Fast' child watcher implementation.
756
757 This implementation reaps every terminated processes by calling
758 os.waitpid(-1) directly, possibly breaking other code spawning processes
759 and waiting for their termination.
760
761 There is no noticeable overhead when handling a big number of children
762 (O(1) each time a child terminates).
763 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800764 def __init__(self):
765 super().__init__()
766 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800767 self._lock = threading.Lock()
768 self._zombies = {}
769 self._forks = 0
770
771 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800772 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800773 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800774 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800775
776 def __enter__(self):
777 with self._lock:
778 self._forks += 1
779
780 return self
781
782 def __exit__(self, a, b, c):
783 with self._lock:
784 self._forks -= 1
785
786 if self._forks or not self._zombies:
787 return
788
789 collateral_victims = str(self._zombies)
790 self._zombies.clear()
791
792 logger.warning(
793 "Caught subprocesses termination from unknown pids: %s",
794 collateral_victims)
795
796 def add_child_handler(self, pid, callback, *args):
797 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800798 with self._lock:
799 try:
800 returncode = self._zombies.pop(pid)
801 except KeyError:
802 # The child is running.
803 self._callbacks[pid] = callback, args
804 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800805
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800806 # The child is dead already. We can fire the callback.
807 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800808
Guido van Rossum2bcae702013-11-13 15:50:08 -0800809 def remove_child_handler(self, pid):
810 try:
811 del self._callbacks[pid]
812 return True
813 except KeyError:
814 return False
815
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800816 def _do_waitpid_all(self):
817 # Because of signal coalescing, we must keep calling waitpid() as
818 # long as we're able to reap a child.
819 while True:
820 try:
821 pid, status = os.waitpid(-1, os.WNOHANG)
822 except ChildProcessError:
823 # No more child processes exist.
824 return
825 else:
826 if pid == 0:
827 # A child process is still alive.
828 return
829
830 returncode = self._compute_returncode(status)
831
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800832 with self._lock:
833 try:
834 callback, args = self._callbacks.pop(pid)
835 except KeyError:
836 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800837 if self._forks:
838 # It may not be registered yet.
839 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200840 if self._loop.get_debug():
841 logger.debug('unknown process %s exited '
842 'with returncode %s',
843 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800844 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800845 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200846 else:
847 if self._loop.get_debug():
848 logger.debug('process %s exited with returncode %s',
849 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800850
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800851 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800852 logger.warning(
853 "Caught subprocess termination from unknown pid: "
854 "%d -> %d", pid, returncode)
855 else:
856 callback(pid, returncode, *args)
857
858
859class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
860 """XXX"""
861 _loop_factory = _UnixSelectorEventLoop
862
863 def __init__(self):
864 super().__init__()
865 self._watcher = None
866
867 def _init_watcher(self):
868 with events._lock:
869 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800870 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800871 if isinstance(threading.current_thread(),
872 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800873 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800874
875 def set_event_loop(self, loop):
876 """Set the event loop.
877
878 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800879 .set_event_loop() from the main thread will call .attach_loop(loop) on
880 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800881 """
882
883 super().set_event_loop(loop)
884
885 if self._watcher is not None and \
886 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800887 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800888
889 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200890 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800891
892 If not yet set, a SafeChildWatcher object is automatically created.
893 """
894 if self._watcher is None:
895 self._init_watcher()
896
897 return self._watcher
898
899 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200900 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800901
902 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
903
904 if self._watcher is not None:
905 self._watcher.close()
906
907 self._watcher = watcher
908
909SelectorEventLoop = _UnixSelectorEventLoop
910DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy