blob: 168b5da0f2c0f9e57235a2113e94a89d002c8a43 [file] [log] [blame]
Victor Stinner32eb8402016-03-15 11:12:35 +01001import os
2import signal
3import subprocess
4import sys
5import time
6import unittest
7
8
class SIGUSR1Exception(Exception):
    """Raised by the SIGUSR1 handler so that a test can detect delivery."""
11
12
class InterProcessSignalTests(unittest.TestCase):
    """Check that signals sent by child processes reach our handlers."""

    def setUp(self):
        # Delivery counters keyed by signal name.  Only the SIGHUP and
        # SIGUSR1 handlers below increment them; nothing in this class
        # ever bumps 'SIGALRM'.
        self.got_signals = {'SIGHUP': 0, 'SIGUSR1': 0, 'SIGALRM': 0}

    def sighup_handler(self, signum, frame):
        """Signal handler: count one SIGHUP delivery."""
        self.got_signals['SIGHUP'] += 1

    def sigusr1_handler(self, signum, frame):
        """Signal handler: count one SIGUSR1 delivery, then raise.

        The SIGUSR1Exception propagates out of whatever code the main
        thread was executing when the handler ran.
        """
        self.got_signals['SIGUSR1'] += 1
        raise SIGUSR1Exception

    def wait_signal(self, child, signame, timeout=10.0):
        """Wait until *signame* has been counted in ``self.got_signals``.

        child -- Popen object of the process sending the signal; it is
                 waited for first.  Pass None when no child is involved.
        signame -- key of ``self.got_signals`` to watch.
        timeout -- maximum number of seconds to poll for the counter.

        Fails the test if the counter is still zero once *timeout* has
        elapsed.  Any exception raised by a signal handler while waiting
        (SIGUSR1Exception, KeyboardInterrupt, ...) propagates to the
        caller.
        """
        if child is not None:
            # A handler that raises (e.g. sigusr1_handler) may interrupt
            # this wait; the exception is deliberately not caught here.
            child.wait()

        deadline = time.monotonic() + timeout

        while True:
            if self.got_signals[signame]:
                return
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Poll with a bounded sleep rather than signal.pause():
            # pause() has no timeout, so a signal delivered between the
            # check above and the pause() call (or never delivered at
            # all) would block forever and the deadline would never be
            # enforced.  Handlers still run during the sleep, and an
            # exception they raise still propagates from here.
            time.sleep(min(remaining, 0.010))

        self.fail('signal %s not received after %s seconds'
                  % (signame, timeout))

    def subprocess_send_signal(self, pid, signame):
        """Start a child interpreter that sends *signame* to *pid*.

        Returns the Popen object without waiting for it; the caller is
        responsible for waiting on the child.
        """
        code = 'import os, signal; os.kill(%s, signal.%s)' % (pid, signame)
        # -I: run the child in isolated mode.
        args = [sys.executable, '-I', '-c', code]
        return subprocess.Popen(args)

    def test_interprocess_signal(self):
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.sighup_handler)
        signal.signal(signal.SIGUSR1, self.sigusr1_handler)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Let the sub-processes know who to send signals to.
        pid = str(os.getpid())

        # SIGHUP: the handler only counts the delivery.
        with self.subprocess_send_signal(pid, "SIGHUP") as child:
            self.wait_signal(child, 'SIGHUP')
        self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 0,
                                            'SIGALRM': 0})

        # SIGUSR1: the handler counts the delivery and raises, so the
        # exception must escape from wait_signal().
        with self.assertRaises(SIGUSR1Exception):
            with self.subprocess_send_signal(pid, "SIGUSR1") as child:
                self.wait_signal(child, 'SIGUSR1')
        self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
                                            'SIGALRM': 0})

        with self.subprocess_send_signal(pid, "SIGUSR2") as child:
            # Nothing should happen: SIGUSR2 is ignored
            child.wait()

        try:
            # SIGALRM is bound to default_int_handler, which raises
            # KeyboardInterrupt instead of touching got_signals, hence
            # the 'SIGALRM' counter is expected to stay at 0.
            with self.assertRaises(KeyboardInterrupt):
                signal.alarm(1)
                self.wait_signal(None, 'SIGALRM')
            self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
                                                'SIGALRM': 0})
        finally:
            # Cancel any alarm that is still pending.
            signal.alarm(0)
Victor Stinner32eb8402016-03-15 11:12:35 +010080
81
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()