blob: 2834076b2712995da5c0cfc705b383386343a36b [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001import unittest
2from test import test_support
Christian Heimes4fbc72b2008-03-22 00:47:35 +00003from contextlib import closing, nested
4import pickle
5import select
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00006import signal
Christian Heimes4fbc72b2008-03-22 00:47:35 +00007import subprocess
8import traceback
Christian Heimesc06950e2008-02-28 21:17:00 +00009import sys, os, time, errno
10
# Signals cannot be tested on Windows, OS/2 or RISC OS, so skip the whole
# module there before any test class is defined.
if sys.platform.startswith(('win', 'os2')) or sys.platform == 'riscos':
    raise test_support.TestSkipped("Can't test signal on %s" % sys.platform)
14
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000015
class HandlerBCalled(Exception):
    """Raised from ``handlerB`` to prove that the signal handler really ran.

    The handler's own arguments are forwarded as the exception arguments.
    """
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000018
Christian Heimes4fbc72b2008-03-22 00:47:35 +000019
def exit_subprocess():
    """Leave the current (forked) process at once with exit status 0.

    ``os._exit`` is used rather than a normal exit because the test
    machinery would catch the resulting SystemExit and keep running in
    parallel with the original test, ending up with an exponential number
    of tests executing concurrently.
    """
    success = 0
    os._exit(success)
28
29
class InterProcessSignalTests(unittest.TestCase):
    """Check delivery of signals sent to this process by other processes.

    The signal handling itself happens in run_test(), which is executed in
    a forked child; test_main() in the parent only waits for the child's
    verdict, so the main test-running process is insulated from the signals.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def handlerA(self, *args):
        """Signal handler (installed for SIGHUP): record the call."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked", args)

    def handlerB(self, *args):
        """Signal handler (installed for SIGUSR1): record the call, then
        raise HandlerBCalled carrying the handler arguments."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked", args)
        raise HandlerBCalled(*args)

    def wait(self, child):
        """Wait for child to finish, ignoring EINTR.

        Any OSError other than EINTR is re-raised, as is any exception
        raised by a signal handler while waiting.
        """
        while True:
            try:
                child.wait()
                return
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        """Send SIGHUP, SIGUSR1, SIGUSR2 and SIGALRM to this process and
        check how each one is handled."""
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            print("test runner's pid is", pid)

        # SIGHUP: handlerA must run, handlerB must not.
        child = subprocess.Popen(['kill', '-HUP', str(pid)])
        self.wait(child)
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        # SIGUSR1: handlerB raises, which must propagate out of wait().
        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        # SIGUSR2 is ignored.
        child = subprocess.Popen(['kill', '-USR2', str(pid)])
        self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            self.fail('Some other exception woke us from pause: %s' %
                      traceback.format_exc())
        else:
            self.fail('pause returned of its own accord')

    def test_main(self):
        """Run run_test() in a forked child and fail with its traceback,
        or with a deadlock message if it exceeds MAX_DURATION seconds."""
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        # Two plain with statements instead of contextlib.nested(): nested()
        # builds both file objects before entering either manager, so a
        # failure in the second fdopen() would leave the first one open.
        with closing(os.fdopen(os_done_r, 'rb')) as done_r:
            with closing(os.fdopen(os_done_w, 'wb')) as done_w:
                child = os.fork()
                if child == 0:
                    # In the child process; run the test and report results
                    # through the pipe.
                    try:
                        done_r.close()
                        # Have to close done_w again here because
                        # exit_subprocess() will skip the enclosing with
                        # blocks.
                        with closing(done_w):
                            try:
                                self.run_test()
                            except:
                                # Deliberately catch everything: whatever
                                # went wrong must reach the parent.
                                pickle.dump(traceback.format_exc(), done_w)
                            else:
                                pickle.dump(None, done_w)
                    except:
                        print('Uh oh, raised from pickle.')
                        traceback.print_exc()
                    finally:
                        exit_subprocess()

                done_w.close()
                # Block for up to MAX_DURATION seconds for the test to
                # finish.
                r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
                if done_r in r:
                    try:
                        tb = pickle.load(done_r)
                    finally:
                        # The child has reported (or closed the pipe) and is
                        # on its way out; reap it so no zombie is left.
                        os.waitpid(child, 0)
                    if tb:
                        self.fail(tb)
                else:
                    os.kill(child, signal.SIGKILL)
                    # Reap the killed child as well.
                    os.waitpid(child, 0)
                    self.fail('Test deadlocked after %d seconds.' %
                              self.MAX_DURATION)
Thomas Woutersed03b412007-08-28 21:37:11 +0000147
148
class BasicSignalTests(unittest.TestCase):
    """Argument checking and handler bookkeeping of the signal module."""

    def trivial_signal_handler(self, *args):
        """Do-nothing handler used wherever a valid callable is needed."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        """An invalid signal number is rejected with ValueError."""
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        """None is not accepted as a handler."""
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        """getsignal() reports the handler most recently installed."""
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even when the check above
            # fails, so later tests do not inherit the trivial handler.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
169
170
class WakeupSignalTests(unittest.TestCase):
    """Exercise signal.set_wakeup_fd() with a SIGALRM raised via alarm().

    setUp() installs a no-op SIGALRM handler and registers the write end of
    a fresh pipe as the wakeup fd; the tests then watch the read end.
    """

    # Upper bound, in seconds, passed to sleep()/select().
    TIMEOUT_FULL = 10
    # Each step must return in less than this many seconds, i.e. it must
    # have been cut short by the alarm rather than run to TIMEOUT_FULL.
    TIMEOUT_HALF = 5

    def setUp(self):
        import fcntl

        self.alrm = signal.signal(signal.SIGALRM, lambda x, y: None)
        self.read, self.write = os.pipe()
        # The wakeup fd is switched to non-blocking mode before being
        # handed to set_wakeup_fd().
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        # Cancel any alarm that is still pending so it cannot go off after
        # the previous SIGALRM handler has been restored below.
        signal.alarm(0)
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

    def test_wakeup_fd_early(self):
        """The signal arrives during sleep(), before select() is called."""
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        mid_time = time.time()
        self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        """The signal arrives while select() is blocking."""
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the select call
        self.assertRaises(select.error, select.select,
                          [self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
215
class SiginterruptTest(unittest.TestCase):
    """Check whether a blocking os.read() is interrupted by a signal,
    depending on how signal.siginterrupt() was configured."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Report whether a blocking pipe read was interrupted by a signal.

        A child is forked which sends self.signum to this process while it
        is blocked reading from a pipe.  cb is called with no arguments
        right after a no-op handler for self.signum has been installed
        (in the parent and in the child alike).

        Returns True if os.read() failed with EINTR, False if it returned
        normally.  Any other OSError from the read is re-raised.  The
        previous signal handler is restored, the child reaped and the pipe
        closed before returning.
        """
        r, w = os.pipe()
        ppid = os.getpid()
        pid = os.fork()

        oldhandler = signal.signal(self.signum, lambda x, y: None)
        cb()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                exit_subprocess()

        try:
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            try:
                signal.signal(self.signum, oldhandler)
                os.waitpid(pid, 0)
            finally:
                # The read end was previously never closed, leaking one
                # descriptor per call.
                os.close(r)

    def test_without_siginterrupt(self):
        """With no siginterrupt() call the read is interrupted."""
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        """siginterrupt(signum, 1) lets the signal interrupt the read."""
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        """siginterrupt(signum, 0) keeps the read from being interrupted."""
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000260
def test_main():
    """Hand every test case class of this module to run_unittest()."""
    cases = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupSignalTests,
        SiginterruptTest,
    )
    test_support.run_unittest(*cases)


# Allow the module to be run directly as a script.
if __name__ == "__main__":
    test_main()