blob: e6a5fc524397121e170bd5a888957374b91ba946 [file] [log] [blame]
Michael Foord65b69a12010-03-27 13:25:41 +00001import signal
2import weakref
3
Michael Foordde4ceab2010-04-25 19:53:49 +00004from functools import wraps
5
# Module-level marker flag.
# NOTE(review): presumably read by unittest's traceback filtering so frames
# from this module are hidden in failure reports -- confirm where it is used.
__unittest = True
7
8
class _InterruptHandler(object):
    """SIGINT handler that asks registered results to stop before interrupting.

    While this object is the installed SIGINT handler, the first signal only
    marks the handler as called and calls ``stop()`` on every result held in
    ``_results``.  A later signal is handed to the wrapped fallback handler
    first.

    Attributes:
        called: True once a SIGINT has been handled by this object.
        original_handler: the value passed to the constructor, unchanged.
        default_handler: a callable ``(signum, frame)`` used as the fallback.
    """

    def __init__(self, default_handler):
        """Wrap *default_handler* as the fallback SIGINT handler.

        Args:
            default_handler: ``signal.SIG_DFL``, ``signal.SIG_IGN`` or a
                callable accepting ``(signum, frame)``.

        Raises:
            TypeError: if *default_handler* is an int other than
                ``signal.SIG_DFL`` or ``signal.SIG_IGN``.
        """
        self.called = False
        # Keep the raw value so it can be handed back to signal.signal()
        # unchanged when the handler is removed.
        self.original_handler = default_handler
        if isinstance(default_handler, int):
            if default_handler == signal.SIG_DFL:
                # Pretend it's signal.default_int_handler instead.
                default_handler = signal.default_int_handler
            elif default_handler == signal.SIG_IGN:
                # Not quite the same thing as SIG_IGN, but the closest we
                # can make it: do nothing.
                def default_handler(unused_signum, unused_frame):
                    pass
            else:
                raise TypeError("expected SIGINT signal handler to be "
                                "signal.SIG_IGN, signal.SIG_DFL, or a "
                                "callable object")
        self.default_handler = default_handler

    def __call__(self, signum, frame):
        """Handle a SIGINT delivered with *signum* and *frame*."""
        installed_handler = signal.getsignal(signal.SIGINT)
        if installed_handler is not self:
            # We are no longer the installed handler (something else called
            # us), so delegate to the fallback and do nothing else.  The
            # explicit return matters when the fallback does not raise (for
            # example the SIG_IGN stand-in): without it the registered
            # results would still be stopped.
            self.default_handler(signum, frame)
            return

        if self.called:
            # Second interrupt: let the fallback handler deal with it.
            self.default_handler(signum, frame)
        self.called = True
        for result in _results.keys():
            result.stop()
40
# Results to stop on SIGINT.  Keys are held weakly, so registering a result
# does not keep it alive.  The stored value only needs to be truthy:
# removeResult() reports success via bool() of the popped value.
_results = weakref.WeakKeyDictionary()


def registerResult(result):
    """Register *result* so the interrupt handler calls its ``stop()``.

    Registering the same result again simply overwrites its entry.
    """
    _results[result] = 1
44
def removeResult(result):
    """Forget *result*; return True if it was registered, else False.

    registerResult() stores a truthy value for each result, and a missing
    entry pops as None, so the truthiness of the popped value tells the two
    cases apart.
    """
    popped = _results.pop(result, None)
    return bool(popped)
47
# The _InterruptHandler created by installHandler(), or None before the
# first call.
_interrupt_handler = None


def installHandler():
    """Install the module's SIGINT handler the first time it is called.

    The handler currently registered for SIGINT is wrapped in an
    _InterruptHandler, which is remembered in ``_interrupt_handler`` and
    installed in its place.  Once that object exists, further calls return
    without touching the signal table.
    """
    global _interrupt_handler
    if _interrupt_handler is not None:
        return
    previous = signal.getsignal(signal.SIGINT)
    _interrupt_handler = _InterruptHandler(previous)
    signal.signal(signal.SIGINT, _interrupt_handler)
Michael Foordde4ceab2010-04-25 19:53:49 +000055
56
def removeHandler(method=None):
    """Restore the original SIGINT handler, directly or as a decorator.

    Called without an argument, reinstalls the handler that was in place
    before installHandler() wrapped it; does nothing if no handler object
    has been created.

    Called with *method*, returns a wrapper that performs the same removal
    for the duration of each call to *method* and afterwards reinstalls
    whatever SIGINT handler was active when the call began.
    """
    if method is None:
        if _interrupt_handler is not None:
            signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
        return None

    @wraps(method)
    def inner(*args, **kwargs):
        # Remember the handler in force now so it can be put back even if
        # the wrapped call raises.
        saved = signal.getsignal(signal.SIGINT)
        removeHandler()
        try:
            return method(*args, **kwargs)
        finally:
            signal.signal(signal.SIGINT, saved)

    return inner