blob: e49212e5ea08bf51c28186b505b238d4ffd99757 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# This module relies on POSIX signal support; refuse to import on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
32
33
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    Both arguments are accepted only to satisfy the signal-handler calling
    convention and are ignored.
    """
    return None
37
38
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Signal number -> events.Handle wrapping the registered callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then unregister every installed signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() deletes entries.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a received signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine function or sig is not
        an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be set or
        the signal cannot be caught.  Any other OSError raised while
        installing the handler is propagated unchanged.
        """
        if coroutines.iscoroutinefunction(callback):
            raise TypeError("coroutines cannot be used with "
                            "add_signal_handler()")
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: stop routing signals to the wakeup fd.
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal number,
        and RuntimeError if restoring the default handler fails with EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default handler back; every other signal is
        # reset to SIG_DFL.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: detach the wakeup file descriptor.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create the subprocess transport and register it with the watcher.

        The transport is created inside the child watcher's 'with' block and
        its pid is registered there, so the watcher reports the child's exit
        through _child_watcher_callback().
        """
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status to the transport through the thread-safe
        # scheduling entry point of the loop.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket and return (transport, protocol).

        Exactly one of *path* and *sock* must be given.  With *path*, a new
        non-blocking AF_UNIX stream socket is created and connected; with
        *sock*, the given socket is switched to non-blocking mode and used.

        Raise ValueError if the path/sock or ssl/server_hostname
        combination is inconsistent.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Do not leak the socket created here; re-raise unchanged.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Start serving on a UNIX Domain Socket and return the Server.

        Exactly one of *path* and *sock* must be given.  With *path*, a new
        AF_UNIX stream socket is created and bound to it; with *sock*, the
        given socket must already be an AF_UNIX socket.

        Raise TypeError if ssl is a bool, ValueError if the path/sock
        arguments are inconsistent, and OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
262
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263
if not hasattr(os, 'set_blocking'):
    import fcntl

    def _set_nonblocking(fd):
        """Switch *fd* to non-blocking mode by adding O_NONBLOCK via fcntl."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
else:
    def _set_nonblocking(fd):
        """Switch *fd* to non-blocking mode using os.set_blocking()."""
        os.set_blocking(fd, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274
275
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and start watching it for readability on *loop*.

        Raise ValueError if the file descriptor of *pipe* is not a FIFO,
        a socket or a character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # The message lists every accepted kind, matching the check above
            # and the wording used by _UnixWritePipeTransport.
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read up to max_size bytes and forward them to the protocol."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; the reader stays registered.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* (EIO only when debugging) and close the transport."""
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release the pipe and drop references even if the protocol
            # callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
369
370
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write-only transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        # NOTE(review): self._loop is used below without being assigned in
        # this class; presumably the base-class initializer stores it.
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(st_mode)
        supported = (is_socket or
                     stat.S_ISFIFO(st_mode) or
                     stat.S_ISCHR(st_mode))
        if not supported:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # Watching the write end for readability is how a close by the peer
        # is noticed.  On AIX that only works for sockets; on other
        # platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # Scheduled after connection_made(), so the waiter completes
            # only once the protocol has been notified.
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        parts = [self.__class__.__name__, 'fd=%s' % self._fileno]
        if self._pipe is None:
            parts.append('closed')
        else:
            is_polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if is_polling else 'idle')
            parts.append('bufsize=%s' % self.get_write_buffer_size())
        return '<%s>' % ' '.join(parts)

    def get_write_buffer_size(self):
        """Return the total number of bytes waiting in the write buffer."""
        return sum(map(len, self._buffer))

    def _read_ready(self):
        # Readability on the write end means the peer closed the pipe.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if not self._buffer:
            self._close()
        else:
            # Unsent data is being dropped: report it as a broken pipe.
            self._close(BrokenPipeError())

    def write(self, data):
        """Write *data* immediately if possible, buffering any remainder."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing queued yet: try a direct write first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if written == len(data):
                return
            if written > 0:
                data = data[written:]
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffered data as the pipe accepts."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            written = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # The writer is removed here because _close() only removes it
            # when _buffer is non-empty, and _buffer was just cleared.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if written == len(pending):
            self._loop.remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer and self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return
        if written > 0:
            pending = pending[written:]

        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the shutdown once the buffer drains.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        if self._closing:
            return
        # write_eof() is all that is needed to close the write pipe.
        self.write_eof()

    def abort(self):
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release the pipe and drop references, even if the
            # protocol callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
546
547
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe: one
        end is handed to the child and the other becomes self._proc.stdin.
        Both ends are closed again if spawning the process fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # pipe (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin; skip the cleanup below.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() (or the hand-over) failed: do not leak the pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800565
566
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if there
        was nothing to remove.
        """
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()
635
636
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for child watcher implementations.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop the watcher is attached to; None while detached.
        self._loop = None

    def close(self):
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated while no handler was installed;
        # poll once so that it is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # This method is only installed as a signal handler by
            # attach_loop(), which stores the loop first, so self._loop
            # should be set whenever we get here.
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a wait status into a return code."""
        if os.WIFSIGNALED(status):
            # Killed by a signal: report the negated signal number.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # Normal exit (e.g. sys.exit()): report the exit status.
            return os.WEXITSTATUS(status)
        # Neither case applies.  This shouldn't happen, but if it does,
        # return the raw status; perhaps that helps debug it.
        return status
689
690
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for every watched child.
        self._callbacks = {}

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'."""
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, False otherwise."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() pops finished children.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode.

        The watcher starts with no loop attached, and add_child_handler()
        may run before attach_loop(); a missing loop counts as "debug off"
        rather than raising AttributeError.
        """
        loop = self._loop
        return loop is not None and loop.get_debug()

    def _do_waitpid(self, expected_pid):
        """Poll one child without blocking and fire its callback if it ended."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
767
768
class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps children in bulk.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination on its own.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for children that are still being watched.
        self._callbacks = {}
        # pid -> returncode for children reaped before a handler was added.
        self._zombies = {}
        # Number of currently active "with watcher:" blocks.
        self._forks = 0
        # Guards the three attributes above.
        self._lock = threading.Lock()

    def close(self):
        """Drop all registered callbacks and remembered zombies, then close."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Mark the start of a block in which a child is being spawned."""
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        """Mark the end of a spawning block.

        When the last active block exits and some reaped children were
        never claimed by add_child_handler(), they are forgotten and
        reported in a warning.
        """
        unclaimed = None
        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                # Render the mapping before clearing it so the warning
                # can be emitted after the lock has been released.
                unclaimed = str(self._zombies)
                self._zombies.clear()

        if unclaimed is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* to be called as callback(pid, returncode, *args).

        Must be called inside a "with watcher:" block.  If the child was
        already reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            if pid not in self._zombies:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return
            returncode = self._zombies.pop(pid)

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*; return True if there was one."""
        absent = object()
        return self._callbacks.pop(pid, absent) is not absent

    def _do_waitpid_all(self):
        """Reap every child that has terminated and dispatch its callback."""
        not_registered = object()

        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                break

            if pid == 0:
                # A child process is still alive.
                break

            returncode = self._compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, not_registered)
                if entry is not_registered:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback (or the warning) runs outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
873
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Unix event loop policy that also manages a child process watcher."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if no watcher is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                if in_main_thread:
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The previously installed watcher, if any, is closed first.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
923
# Public aliases for the Unix implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy