blob: c95ad488c00202771c54835425c2f26365fea895 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
11
12
Guido van Rossum59691282013-10-30 14:52:03 -070013from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import constants
15from . import events
16from . import protocols
17from . import selector_events
18from . import tasks
19from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
22
# Names exported by a star import of this module.
__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR']

# File descriptor numbers of the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2


# This module depends on Unix signal handling, so refuse to load on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
32
33
class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling to the base selector event loop, and uses a
    SIGCHLD handler to notice when child processes terminate.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> handle wrapping the user callback.
        self._signal_handlers = {}
        # Maps child pid -> subprocess transport awaiting its exit status.
        self._subprocesses = {}

    def _socketpair(self):
        """Return a pair of connected sockets (socket.socketpair())."""
        return socket.socketpair()

    def close(self):
        """Remove the SIGCHLD handler, if installed, then close the loop."""
        handler = self._signal_handlers.get(signal.SIGCHLD)
        if handler is not None:
            self.remove_signal_handler(signal.SIGCHLD)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the signal cannot be caught or if there is
        a problem setting up the handler (e.g. not in the main thread).
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handlers remain, so the wakeup fd is not needed.
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default KeyboardInterrupt handler back;
        # every other signal reverts to the OS default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create a read transport for *pipe* bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create a write transport for *pipe* bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a child process and return its transport.

        Ensures the SIGCHLD handler is installed, creates the transport,
        records it under the child's pid and waits for the transport's
        _post_init() to finish.
        """
        self._reg_sigchld()
        # Forward the caller's ``extra`` rather than discarding it.
        transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                          stdin, stdout, stderr, bufsize,
                                          extra=extra, **kwargs)
        self._subprocesses[transp.get_pid()] = transp
        yield from transp._post_init()
        return transp

    def _reg_sigchld(self):
        """Install the SIGCHLD handler unless it is already registered."""
        if signal.SIGCHLD not in self._signal_handlers:
            self.add_signal_handler(signal.SIGCHLD, self._sig_chld)

    def _sig_chld(self):
        """Reap terminated children and notify their transports.

        Any exception is logged rather than propagated.
        """
        try:
            # Because of signal coalescing, we must keep calling waitpid() as
            # long as we're able to reap a child.
            while True:
                try:
                    pid, status = os.waitpid(-1, os.WNOHANG)
                except ChildProcessError:
                    break  # No more child processes exist.
                if pid == 0:
                    break  # All remaining child processes are still alive.
                elif os.WIFSIGNALED(status):
                    # A child process died because of a signal.
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    # A child process exited (e.g. sys.exit()).
                    returncode = os.WEXITSTATUS(status)
                else:
                    # A child exited, but we don't understand its status.
                    # This shouldn't happen, but if it does, let's just
                    # return that status; perhaps that helps debug it.
                    returncode = status
                # Children this loop does not track are reaped silently.
                transp = self._subprocesses.get(pid)
                if transp is not None:
                    transp._process_exited(returncode)
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _subprocess_closed(self, transport):
        """Forget *transport*; a pid that is not tracked is ignored."""
        pid = transport.get_pid()
        self._subprocesses.pop(pid, None)
198
199
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for *fd*, keeping its other status flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
204
205
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a non-blocking pipe or socket.

    Data read from the file descriptor is passed to the protocol's
    data_received(); end-of-file triggers eof_received() followed by
    connection_lost(None).
    """

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        # Only FIFOs and sockets are accepted.
        file_mode = os.fstat(self._fileno).st_mode
        if not stat.S_ISFIFO(file_mode) and not stat.S_ISSOCK(file_mode):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Reader callback: read once and dispatch the result."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; the reader stays registered.
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means end-of-file: stop watching and shut down.
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the file descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the file descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        """Log the active exception and close the transport with *exc*."""
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop reading and schedule connection_lost."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if the protocol callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
271
272
class _UnixWritePipeTransport(transports.WriteTransport):
    """Write transport over a non-blocking pipe or socket.

    Writes are attempted immediately; whatever could not be written is
    buffered and flushed from a writer callback.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        file_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(file_mode)
        is_pipe = stat.S_ISFIFO(file_mode)
        if not is_socket and not is_pipe:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Reader callback: readability means the peer closed the pipe."""
        # Pipe was closed by peer.
        self._close()

    def write(self, data):
        """Write *data* (bytes), buffering whatever cannot be sent now."""
        assert isinstance(data, bytes), repr(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # Writes after closing are dropped; warn once they pile up.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if sent == len(data):
                return
            if sent > 0:
                data = data[sent:]
            # Leftover data: have the loop call us when writable.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)

    def _write_ready(self):
        """Writer callback: try to flush the buffered data."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if sent == len(pending):
            # Everything flushed; finish closing if that was requested.
            self._loop.remove_writer(self._fileno)
            if self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Start closing; finish right away if nothing is buffered."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the close once the buffer drains.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport after buffered data has been written."""
        if self._closing:
            return
        # write_eof is all that is needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc):
        """Log the active exception and close the transport with *exc*."""
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, unregister and schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if the protocol callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
407
408
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        one end becomes the child's stdin and the other is exposed as
        self._proc.stdin, opened for binary writing.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end.
                stdin.close()
                # The parent writes to the child's stdin, so open the
                # detached descriptor for writing, not reading.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            # Still set only if something above raised: close both ends
            # of the socket pair so they are not leaked.
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()