blob: 8d3e25eb0d1f75f2ab47b58c621ee2d890ed3840 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
# Fail the import outright on Windows: everything below relies on
# Unix-style signal handling.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
32
33
def _sighandler_noop(signum, frame):
    """Signal handler that intentionally does nothing.

    It takes the (signum, frame) pair passed to Python-level signal
    handlers and ignores both.  add_signal_handler() installs it so that
    the signal number gets written to the wakeup file descriptor.
    """
    return None
37
38
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop specialised for Unix.

    On top of the base selector loop this class offers signal handler
    registration, pipe and subprocess transports, and UNIX Domain Socket
    connections and servers.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # signal number -> events.Handle wrapping the registered callback
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected socket pair from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then drop every signal handler still registered."""
        super().close()
        # Walk a snapshot: remove_signal_handler() deletes entries.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a received signal number."""
        for signum in data:
            # Zero items are the null bytes written by _write_to_self();
            # they do not stand for a signal.
            if signum:
                self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Register callback(*args) as the handler for signal *sig*.

        UNIX only.

        Raise TypeError or ValueError if *sig* fails _check_signal().
        Raise RuntimeError if the wakeup file descriptor cannot be set or
        if the signal cannot be caught (EINVAL); any other OSError raised
        while installing the handler is propagated unchanged.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError outside of the main
            # thread.  Doing this first guarantees that a loop running in
            # another thread never gets a signal handler registered.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as wakeup_error:
            raise RuntimeError(str(wakeup_error))

        self._signal_handlers[sig] = events.Handle(callback, args, self)

        try:
            # The dummy handler only exists so that Python writes the
            # signal number to the wakeup file descriptor;
            # _process_self_data() reads the numbers back from there.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as install_error:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as reset_error:
                    logger.info('set_wakeup_fd(-1) failed: %s', reset_error)

            if install_error.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

    def _handle_signal(self, sig):
        """Schedule (or clean up) the handle registered for signal *sig*."""
        registered = self._signal_handlers.get(sig)
        if registered is None:
            # Nothing registered any more; assume some race condition.
            return
        if not registered._cancelled:
            self._add_callback_signalsafe(registered)
            return
        # A cancelled handle is removed through the regular path.
        self.remove_signal_handler(sig)

    def remove_signal_handler(self, sig):
        """Unregister the handler for signal *sig*.  UNIX only.

        Return False when no handler was registered for *sig*, True once
        one has been removed.  SIGINT gets signal.default_int_handler
        back; every other signal is reset to signal.SIG_DFL.
        """
        self._check_signal(sig)
        if sig not in self._signal_handlers:
            return False
        del self._signal_handlers[sig]

        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        # The last handler is gone: stop routing signals to the wakeup fd.
        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Validate a signal number.

        Raise TypeError if *sig* is not an int.
        Raise ValueError if *sig* lies outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Build a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Build a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and register it with the child watcher.

        The transport is created, initialised and registered inside the
        child watcher's context; the transport is returned afterwards.
        """
        with events.get_child_watcher() as watcher:
            transport = _UnixSubprocessTransport(
                self, protocol, args, shell,
                stdin, stdout, stderr, bufsize,
                extra=extra, **kwargs)
            yield from transport._post_init()
            watcher.add_child_handler(transport.get_pid(),
                                      self._child_watcher_callback,
                                      transport)

        return transport

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's return code to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket.

        Exactly one of *path* and *sock* must be given.  With *path* a
        new AF_UNIX stream socket is created and connected; it is closed
        again if connecting fails.  Return a (transport, protocol) pair.

        Raise ValueError for inconsistent ssl/server_hostname or
        path/sock combinations.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        elif server_hostname is not None:
            raise ValueError('server_hostname is only meaningful with ssl')

        if path is None:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # We own this socket; release it on any failure.
                sock.close()
                raise

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Start serving on a UNIX Domain Socket and return the Server.

        Exactly one of *path* and *sock* must be given.  With *path* a
        new AF_UNIX stream socket is created and bound to it; a supplied
        *sock* must already be of family AF_UNIX.

        Raise TypeError if *ssl* is a bool, ValueError for a bad
        path/sock combination or a non-AF_UNIX *sock*, and OSError if
        binding fails (with the address added to the message for
        EADDRINUSE).
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is None:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno != errno.EADDRINUSE:
                    raise
                # Improve the error message by naming the exact address
                # that is already taken.
                msg = 'Address {!r} is already in use'.format(path)
                raise OSError(errno.EADDRINUSE, msg) from None
            except:
                sock.close()
                raise

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
260
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261
if not hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode using fcntl()."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
else:
    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode."""
        os.set_blocking(fd, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700270
271
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device.

    The file descriptor of *pipe* is switched to non-blocking mode and
    registered as a reader on the loop; incoming data is handed to
    protocol.data_received().
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and start reading from it.

        Raise ValueError if the descriptor is neither a FIFO, a socket
        nor a character device.  If *waiter* is given, its result is set
        (unless cancelled) after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        """Return '<ClassName fd=N state>'.

        state is 'polling' or 'idle' while the pipe is open and the loop
        has a selector, 'open' when the pipe is open but no selector is
        available, and 'closed' once the pipe has been released.
        """
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        # The loop may not expose a selector any more (presumably after
        # the loop has been closed); repr() must not raise in that case.
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            info.append('closed')
        elif selector is None:
            info.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Reader callback: read up to max_size bytes and dispatch them."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* to the loop's exception handler and close.

        OSError with errno EIO is not reported, but still closes the
        transport.
        """
        # should be called by exception handler only
        if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release resources, even if the protocol raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
362
363
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character device.

    Data that cannot be written immediately is kept in an internal
    buffer and flushed from a writer callback registered on the loop.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for writing.

        Raise ValueError if the descriptor is neither a socket, a FIFO
        nor a character device.  If *waiter* is given, its result is set
        (unless cancelled) after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []  # chunks accepted by write() but not yet written
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # The descriptor is also watched for readability so that the peer
        # closing its end can be detected (see _read_ready()).
        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        """Return '<ClassName fd=N state [bufsize=M]>'.

        state is 'polling' or 'idle' (followed by the buffer size) while
        the pipe is open and the loop has a selector, 'open' when the
        pipe is open but no selector is available, and 'closed' once the
        pipe has been released.
        """
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        # The loop may not expose a selector any more (presumably after
        # the loop has been closed); repr() must not raise in that case.
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            info.append('closed')
        elif selector is None:
            info.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the total length of all buffered chunks."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        """Reader callback: the pipe was closed by the peer."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be written right away.

        Empty data is ignored.  Once the transport is closing or the
        connection is lost the data is dropped and a warning is logged
        after enough such writes.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = data[n:]
            # Partial (or no) write: flush the rest from _write_ready().
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is buffered.

        With data still buffered, the shutdown is completed by
        _write_ready() once the buffer has been flushed.
        """
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport after buffered data has been written."""
        if not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* to the loop's exception handler and close.

        BrokenPipeError and ConnectionResetError are not reported, but
        still close the transport.
        """
        # should be called by exception handler only
        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, unregister callbacks, schedule shutdown."""
        self._closing = True
        if self._buffer:
            # A writer is only registered while data is buffered.
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release resources, even if the protocol raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
537
538
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When *stdin* is subprocess.PIPE, a socket pair replaces the pipe:
        the child receives one end and the other end becomes
        self._proc.stdin, opened as a binary writer.  Both ends are
        closed if starting the process fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() failed: release both ends of the socket pair
                # instead of leaking them.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800556
557
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A child watcher keeps track of a set of subprocesses and reports when
    one of them terminates or is interrupted by a signal.

    Callbacks are registered through .add_child_handler().  A new process
    has to be started inside a 'with' block on the watcher, which lets
    the watcher suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Because a child watcher may catch the SIGCHLD signal and call
        waitpid(-1), only one watcher should be active per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for a child process.

        callback(pid, returncode, *args) is to be called once process
        'pid' terminates.  Registering a second callback for the same
        process replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        first, then attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource gets freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context, allowing new processes to start.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
626
627
class BaseChildWatcher(AbstractChildWatcher):
    """Shared plumbing for SIGCHLD-based child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all(); this class
    wires the SIGCHLD handler to the attached loop and turns wait
    statuses into return codes.
    """

    def __init__(self):
        # Event loop that carries the SIGCHLD handler, or None.
        self._loop = None

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated during the switch; poll right away
        # so that it is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as error:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': error,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a wait status into a return code.

        Return the negated signal number if the child was killed by a
        signal, the exit status if it exited, and *status* itself
        otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but its status is not understood.  This
        # shouldn't happen; returning the raw status may help debugging.
        return status
680
681
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for every child still being watched
        self._callbacks = {}

    def close(self):
        """Forget all registered handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        """Return self; this watcher needs no setup to start a process."""
        return self

    def __exit__(self, a, b, c):
        """Do nothing; this watcher needs no teardown."""
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        An existing handler for the same pid is replaced.  The process is
        polled immediately, so the callback may run before this method
        returns.
        """
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return True if there was one."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Poll every watched process once."""
        # Iterate over a snapshot: _do_waitpid() pops finished children.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking; run its callback if done.

        If the process cannot be waited for (ChildProcessError), the
        callback is invoked with return code 255 and a warning is logged.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            # The watcher may have no loop attached (never attached, or
            # attach_loop(None)).  The child has already been reaped at
            # this point, so skip the debug log rather than fail and lose
            # its return code.
            loop = self._loop
            if loop is not None and loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            pass
        else:
            callback(pid, returncode, *args)
756
757
class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps any terminated child in one sweep.

    Every terminated process is collected by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination on its own.

    Handling a large number of children adds no noticeable overhead
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # Number of "with watcher:" blocks that are currently open.
        self._forks = 0
        # pid -> returncode for children reaped before a handler was added.
        self._zombies = {}
        # pid -> (callback, args) for children that have a handler.
        self._callbacks = {}
        # Held while __enter__/__exit__, add_child_handler() and
        # _do_waitpid_all() update the state above.
        self._lock = threading.Lock()

    def close(self):
        """Forget all handlers and zombies, then call the parent close()."""
        for registry in (self._callbacks, self._zombies):
            registry.clear()
        super().close()

    def __enter__(self):
        """Open a registration window and return the watcher itself."""
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        """Close a registration window.

        When the last open window closes and some reaped children were
        never claimed by add_child_handler(), they are dropped and a
        warning listing them is logged.
        """
        unclaimed = None
        with self._lock:
            self._forks -= 1
            if not self._forks and self._zombies:
                unclaimed = str(self._zombies)
                self._zombies.clear()

        # Log outside of the lock, and only if something was dropped.
        if unclaimed is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Must be called inside a "with watcher:" block.  If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            try:
                exit_code = self._zombies.pop(pid)
            except KeyError:
                # Not reaped yet: remember the handler for later.
                self._callbacks[pid] = (callback, args)
                already_dead = False
            else:
                already_dead = True

        if already_dead:
            # The child terminated before registration; fire right away,
            # without holding the lock.
            callback(pid, exit_code, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Signals may be coalesced, so keep calling waitpid() for as long
        # as it hands back a terminated child.
        while True:
            try:
                child_pid, raw_status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # There are no child processes left at all.
                return

            if child_pid == 0:
                # Children exist, but none of them has terminated.
                return

            exit_code = self._compute_returncode(raw_status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(child_pid)
                except KeyError:
                    # Nobody registered a handler for this child.
                    if self._forks:
                        # A registration window is open, so the handler
                        # may still arrive: park the result.
                        self._zombies[child_pid] = exit_code
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         child_pid, exit_code)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     child_pid, exit_code)

            # The callback (or the warning) runs without the lock held.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", child_pid, exit_code)
            else:
                callback(child_pid, exit_code, *args)
861
862
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy for Unix that also owns a child watcher.

    Loops are created with _UnixSelectorEventLoop; the child watcher is
    created lazily the first time get_child_watcher() is called.
    """
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # No watcher until one is requested or explicitly set.
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is not None:  # pragma: no branch
                return

            self._watcher = SafeChildWatcher()
            on_main_thread = isinstance(threading.current_thread(),
                                        threading._MainThread)
            if on_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher has already been set, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on that child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Return the watcher for child processes.

        A SafeChildWatcher object is created automatically on first use
        if no watcher has been set.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Replace the watcher for child processes.

        *watcher* must be None or an AbstractChildWatcher instance.  The
        previously installed watcher, if any, is closed first.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()
        self._watcher = watcher
911 self._watcher = watcher
912
# Public names exported through __all__, bound to the Unix implementations.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop