blob: a234f4fac13a190706fb2ab30563c0bab8dd0485 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
3import collections
4import errno
5import fcntl
6import functools
7import os
8import signal
9import socket
10import stat
11import subprocess
12import sys
13
14
15from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
24__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR']
25
26STDIN = 0
27STDOUT = 1
28STDERR = 2
29
30
31if sys.platform == 'win32': # pragma: no cover
32 raise ImportError('Signals are not really supported on Windows')
33
34
35class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
36 """Unix event loop
37
38 Adds signal handling to SelectorEventLoop
39 """
40
41 def __init__(self, selector=None):
42 super().__init__(selector)
43 self._signal_handlers = {}
44 self._subprocesses = {}
45
46 def _socketpair(self):
47 return socket.socketpair()
48
49 def close(self):
50 handler = self._signal_handlers.get(signal.SIGCHLD)
51 if handler is not None:
52 self.remove_signal_handler(signal.SIGCHLD)
53 super().close()
54
55 def add_signal_handler(self, sig, callback, *args):
56 """Add a handler for a signal. UNIX only.
57
58 Raise ValueError if the signal number is invalid or uncatchable.
59 Raise RuntimeError if there is a problem setting up the handler.
60 """
61 self._check_signal(sig)
62 try:
63 # set_wakeup_fd() raises ValueError if this is not the
64 # main thread. By calling it early we ensure that an
65 # event loop running in another thread cannot add a signal
66 # handler.
67 signal.set_wakeup_fd(self._csock.fileno())
68 except ValueError as exc:
69 raise RuntimeError(str(exc))
70
71 handle = events.make_handle(callback, args)
72 self._signal_handlers[sig] = handle
73
74 try:
75 signal.signal(sig, self._handle_signal)
76 except OSError as exc:
77 del self._signal_handlers[sig]
78 if not self._signal_handlers:
79 try:
80 signal.set_wakeup_fd(-1)
81 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070082 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083
84 if exc.errno == errno.EINVAL:
85 raise RuntimeError('sig {} cannot be caught'.format(sig))
86 else:
87 raise
88
89 def _handle_signal(self, sig, arg):
90 """Internal helper that is the actual signal handler."""
91 handle = self._signal_handlers.get(sig)
92 if handle is None:
93 return # Assume it's some race condition.
94 if handle._cancelled:
95 self.remove_signal_handler(sig) # Remove it properly.
96 else:
97 self._add_callback_signalsafe(handle)
98
99 def remove_signal_handler(self, sig):
100 """Remove a handler for a signal. UNIX only.
101
102 Return True if a signal handler was removed, False if not.
103 """
104 self._check_signal(sig)
105 try:
106 del self._signal_handlers[sig]
107 except KeyError:
108 return False
109
110 if sig == signal.SIGINT:
111 handler = signal.default_int_handler
112 else:
113 handler = signal.SIG_DFL
114
115 try:
116 signal.signal(sig, handler)
117 except OSError as exc:
118 if exc.errno == errno.EINVAL:
119 raise RuntimeError('sig {} cannot be caught'.format(sig))
120 else:
121 raise
122
123 if not self._signal_handlers:
124 try:
125 signal.set_wakeup_fd(-1)
126 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700127 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128
129 return True
130
131 def _check_signal(self, sig):
132 """Internal helper to validate a signal.
133
134 Raise ValueError if the signal number is invalid or uncatchable.
135 Raise RuntimeError if there is a problem setting up the handler.
136 """
137 if not isinstance(sig, int):
138 raise TypeError('sig must be an int, not {!r}'.format(sig))
139
140 if not (1 <= sig < signal.NSIG):
141 raise ValueError(
142 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
143
144 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
145 extra=None):
146 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
147
148 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
149 extra=None):
150 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
151
152 @tasks.coroutine
153 def _make_subprocess_transport(self, protocol, args, shell,
154 stdin, stdout, stderr, bufsize,
155 extra=None, **kwargs):
156 self._reg_sigchld()
157 transp = _UnixSubprocessTransport(self, protocol, args, shell,
158 stdin, stdout, stderr, bufsize,
159 extra=None, **kwargs)
160 self._subprocesses[transp.get_pid()] = transp
161 yield from transp._post_init()
162 return transp
163
164 def _reg_sigchld(self):
165 if signal.SIGCHLD not in self._signal_handlers:
166 self.add_signal_handler(signal.SIGCHLD, self._sig_chld)
167
168 def _sig_chld(self):
169 try:
170 try:
171 pid, status = os.waitpid(0, os.WNOHANG)
172 except ChildProcessError:
173 return
174 if pid == 0:
175 self.call_soon(self._sig_chld)
176 return
177 elif os.WIFSIGNALED(status):
178 returncode = -os.WTERMSIG(status)
179 elif os.WIFEXITED(status):
180 returncode = os.WEXITSTATUS(status)
181 else:
182 self.call_soon(self._sig_chld)
183 return
184 transp = self._subprocesses.get(pid)
185 if transp is not None:
186 transp._process_exited(returncode)
187 except Exception:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700188 logger.exception('Unknown exception in SIGCHLD handler')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189
190 def _subprocess_closed(self, transport):
191 pid = transport.get_pid()
192 self._subprocesses.pop(pid, None)
193
194
195def _set_nonblocking(fd):
196 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
197 flags = flags | os.O_NONBLOCK
198 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
199
200
201class _UnixReadPipeTransport(transports.ReadTransport):
202
203 max_size = 256 * 1024 # max bytes we read in one eventloop iteration
204
205 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
206 super().__init__(extra)
207 self._extra['pipe'] = pipe
208 self._loop = loop
209 self._pipe = pipe
210 self._fileno = pipe.fileno()
211 _set_nonblocking(self._fileno)
212 self._protocol = protocol
213 self._closing = False
214 self._loop.add_reader(self._fileno, self._read_ready)
215 self._loop.call_soon(self._protocol.connection_made, self)
216 if waiter is not None:
217 self._loop.call_soon(waiter.set_result, None)
218
219 def _read_ready(self):
220 try:
221 data = os.read(self._fileno, self.max_size)
222 except (BlockingIOError, InterruptedError):
223 pass
224 except OSError as exc:
225 self._fatal_error(exc)
226 else:
227 if data:
228 self._protocol.data_received(data)
229 else:
230 self._closing = True
231 self._loop.remove_reader(self._fileno)
232 self._loop.call_soon(self._protocol.eof_received)
233 self._loop.call_soon(self._call_connection_lost, None)
234
Guido van Rossum57497ad2013-10-18 07:58:20 -0700235 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700236 self._loop.remove_reader(self._fileno)
237
Guido van Rossum57497ad2013-10-18 07:58:20 -0700238 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700239 self._loop.add_reader(self._fileno, self._read_ready)
240
241 def close(self):
242 if not self._closing:
243 self._close(None)
244
245 def _fatal_error(self, exc):
246 # should be called by exception handler only
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700247 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248 self._close(exc)
249
250 def _close(self, exc):
251 self._closing = True
252 self._loop.remove_reader(self._fileno)
253 self._loop.call_soon(self._call_connection_lost, exc)
254
255 def _call_connection_lost(self, exc):
256 try:
257 self._protocol.connection_lost(exc)
258 finally:
259 self._pipe.close()
260 self._pipe = None
261 self._protocol = None
262 self._loop = None
263
264
265class _UnixWritePipeTransport(transports.WriteTransport):
266
267 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
268 super().__init__(extra)
269 self._extra['pipe'] = pipe
270 self._loop = loop
271 self._pipe = pipe
272 self._fileno = pipe.fileno()
273 if not stat.S_ISFIFO(os.fstat(self._fileno).st_mode):
274 raise ValueError("Pipe transport is for pipes only.")
275 _set_nonblocking(self._fileno)
276 self._protocol = protocol
277 self._buffer = []
278 self._conn_lost = 0
279 self._closing = False # Set when close() or write_eof() called.
280 self._loop.add_reader(self._fileno, self._read_ready)
281
282 self._loop.call_soon(self._protocol.connection_made, self)
283 if waiter is not None:
284 self._loop.call_soon(waiter.set_result, None)
285
286 def _read_ready(self):
287 # pipe was closed by peer
288 self._close()
289
290 def write(self, data):
291 assert isinstance(data, bytes), repr(data)
292 if not data:
293 return
294
295 if self._conn_lost or self._closing:
296 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700297 logger.warning('pipe closed by peer or '
298 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 self._conn_lost += 1
300 return
301
302 if not self._buffer:
303 # Attempt to send it right away first.
304 try:
305 n = os.write(self._fileno, data)
306 except (BlockingIOError, InterruptedError):
307 n = 0
308 except Exception as exc:
309 self._conn_lost += 1
310 self._fatal_error(exc)
311 return
312 if n == len(data):
313 return
314 elif n > 0:
315 data = data[n:]
316 self._loop.add_writer(self._fileno, self._write_ready)
317
318 self._buffer.append(data)
319
320 def _write_ready(self):
321 data = b''.join(self._buffer)
322 assert data, 'Data should not be empty'
323
324 self._buffer.clear()
325 try:
326 n = os.write(self._fileno, data)
327 except (BlockingIOError, InterruptedError):
328 self._buffer.append(data)
329 except Exception as exc:
330 self._conn_lost += 1
331 # Remove writer here, _fatal_error() doesn't it
332 # because _buffer is empty.
333 self._loop.remove_writer(self._fileno)
334 self._fatal_error(exc)
335 else:
336 if n == len(data):
337 self._loop.remove_writer(self._fileno)
338 if self._closing:
339 self._loop.remove_reader(self._fileno)
340 self._call_connection_lost(None)
341 return
342 elif n > 0:
343 data = data[n:]
344
345 self._buffer.append(data) # Try again later.
346
347 def can_write_eof(self):
348 return True
349
350 # TODO: Make the relationships between write_eof(), close(),
351 # abort(), _fatal_error() and _close() more straightforward.
352
353 def write_eof(self):
354 if self._closing:
355 return
356 assert self._pipe
357 self._closing = True
358 if not self._buffer:
359 self._loop.remove_reader(self._fileno)
360 self._loop.call_soon(self._call_connection_lost, None)
361
362 def close(self):
363 if not self._closing:
364 # write_eof is all what we needed to close the write pipe
365 self.write_eof()
366
367 def abort(self):
368 self._close(None)
369
370 def _fatal_error(self, exc):
371 # should be called by exception handler only
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700372 logger.exception('Fatal error for %s', self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 self._close(exc)
374
375 def _close(self, exc=None):
376 self._closing = True
377 if self._buffer:
378 self._loop.remove_writer(self._fileno)
379 self._buffer.clear()
380 self._loop.remove_reader(self._fileno)
381 self._loop.call_soon(self._call_connection_lost, exc)
382
383 def _call_connection_lost(self, exc):
384 try:
385 self._protocol.connection_lost(exc)
386 finally:
387 self._pipe.close()
388 self._pipe = None
389 self._protocol = None
390 self._loop = None
391
392
393class _UnixWriteSubprocessPipeProto(protocols.BaseProtocol):
394 pipe = None
395
396 def __init__(self, proc, fd):
397 self.proc = proc
398 self.fd = fd
399 self.connected = False
400 self.disconnected = False
401 proc._pipes[fd] = self
402
403 def connection_made(self, transport):
404 self.connected = True
405 self.pipe = transport
406 self.proc._try_connected()
407
408 def connection_lost(self, exc):
409 self.disconnected = True
410 self.proc._pipe_connection_lost(self.fd, exc)
411
412
413class _UnixReadSubprocessPipeProto(_UnixWriteSubprocessPipeProto,
414 protocols.Protocol):
415
416 def data_received(self, data):
417 self.proc._pipe_data_received(self.fd, data)
418
419 def eof_received(self):
420 pass
421
422
423class _UnixSubprocessTransport(transports.SubprocessTransport):
424
425 def __init__(self, loop, protocol, args, shell,
426 stdin, stdout, stderr, bufsize,
427 extra=None, **kwargs):
428 super().__init__(extra)
429 self._protocol = protocol
430 self._loop = loop
431
432 self._pipes = {}
433 if stdin == subprocess.PIPE:
434 self._pipes[STDIN] = None
435 if stdout == subprocess.PIPE:
436 self._pipes[STDOUT] = None
437 if stderr == subprocess.PIPE:
438 self._pipes[STDERR] = None
439 self._pending_calls = collections.deque()
440 self._finished = False
441 self._returncode = None
442
443 self._proc = subprocess.Popen(
444 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
445 universal_newlines=False, bufsize=bufsize, **kwargs)
446 self._extra['subprocess'] = self._proc
447
448 def close(self):
449 for proto in self._pipes.values():
450 proto.pipe.close()
451 if self._returncode is None:
452 self.terminate()
453
454 def get_pid(self):
455 return self._proc.pid
456
457 def get_returncode(self):
458 return self._returncode
459
460 def get_pipe_transport(self, fd):
461 if fd in self._pipes:
462 return self._pipes[fd].pipe
463 else:
464 return None
465
466 def send_signal(self, signal):
467 self._proc.send_signal(signal)
468
469 def terminate(self):
470 self._proc.terminate()
471
472 def kill(self):
473 self._proc.kill()
474
475 @tasks.coroutine
476 def _post_init(self):
477 proc = self._proc
478 loop = self._loop
479 if proc.stdin is not None:
480 transp, proto = yield from loop.connect_write_pipe(
481 functools.partial(
482 _UnixWriteSubprocessPipeProto, self, STDIN),
483 proc.stdin)
484 if proc.stdout is not None:
485 transp, proto = yield from loop.connect_read_pipe(
486 functools.partial(
487 _UnixReadSubprocessPipeProto, self, STDOUT),
488 proc.stdout)
489 if proc.stderr is not None:
490 transp, proto = yield from loop.connect_read_pipe(
491 functools.partial(
492 _UnixReadSubprocessPipeProto, self, STDERR),
493 proc.stderr)
494 if not self._pipes:
495 self._try_connected()
496
497 def _call(self, cb, *data):
498 if self._pending_calls is not None:
499 self._pending_calls.append((cb, data))
500 else:
501 self._loop.call_soon(cb, *data)
502
503 def _try_connected(self):
504 assert self._pending_calls is not None
505 if all(p is not None and p.connected for p in self._pipes.values()):
506 self._loop.call_soon(self._protocol.connection_made, self)
507 for callback, data in self._pending_calls:
508 self._loop.call_soon(callback, *data)
509 self._pending_calls = None
510
511 def _pipe_connection_lost(self, fd, exc):
512 self._call(self._protocol.pipe_connection_lost, fd, exc)
513 self._try_finish()
514
515 def _pipe_data_received(self, fd, data):
516 self._call(self._protocol.pipe_data_received, fd, data)
517
518 def _process_exited(self, returncode):
519 assert returncode is not None, returncode
520 assert self._returncode is None, self._returncode
521 self._returncode = returncode
522 self._loop._subprocess_closed(self)
523 self._call(self._protocol.process_exited)
524 self._try_finish()
525
526 def _try_finish(self):
527 assert not self._finished
528 if self._returncode is None:
529 return
530 if all(p is not None and p.disconnected
531 for p in self._pipes.values()):
532 self._finished = True
533 self._loop.call_soon(self._call_connection_lost, None)
534
535 def _call_connection_lost(self, exc):
536 try:
537 self._protocol.connection_lost(exc)
538 finally:
539 self._proc = None
540 self._protocol = None
541 self._loop = None