blob: 76574afa859b256a071818a86f05cde76ce09f10 [file] [log] [blame]
Georg Brandl9f2b93e2007-08-24 18:07:52 +00001import unittest
2from test import test_support
Jeffrey Yasskincf26f542008-03-21 05:02:44 +00003from contextlib import closing, nested
4import pickle
5import select
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00006import signal
Jeffrey Yasskincf26f542008-03-21 05:02:44 +00007import traceback
Christian Heimesacfd8ed2008-02-28 21:00:45 +00008import sys, os, time, errno
9
# The whole module is skipped on platforms named here: Windows, OS/2
# and RISC OS.
if sys.platform == 'riscos' or sys.platform[:3] in ('win', 'os2'):
    raise test_support.TestSkipped("Can't test signal on %s" % sys.platform)
13
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000014
class HandlerBCalled(Exception):
    """Raised by the handlerB signal handler to show that it ran."""
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000017
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000018
def exit_subprocess():
    """Terminate the current (forked) process immediately via os._exit(0).

    A plain sys.exit() would raise SystemExit, which the test machinery
    catches; the child would then keep running the remaining tests
    alongside its parent, and the number of concurrently running test
    processes would grow exponentially.
    """
    os._exit(0)
27
28
Georg Brandl9f2b93e2007-08-24 18:07:52 +000029class InterProcessSignalTests(unittest.TestCase):
30 MAX_DURATION = 20 # Entire test should last at most 20 sec.
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000031
Georg Brandl9f2b93e2007-08-24 18:07:52 +000032 def handlerA(self, *args):
33 self.a_called = True
34 if test_support.verbose:
35 print "handlerA invoked", args
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000036
Georg Brandl9f2b93e2007-08-24 18:07:52 +000037 def handlerB(self, *args):
38 self.b_called = True
39 if test_support.verbose:
40 print "handlerB invoked", args
41 raise HandlerBCalled(*args)
Neal Norwitz9730bcb2006-01-23 07:50:06 +000042
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000043 def wait(self, child_pid):
44 """Wait for child_pid to finish, ignoring EINTR."""
45 while True:
46 try:
47 pid, status = os.waitpid(child_pid, 0)
48 return status
49 except OSError as e:
50 if e.errno != errno.EINTR:
51 raise
Fred Drake004d5e62000-10-23 17:22:08 +000052
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000053 def run_test(self):
54 # Install handlers. This function runs in a sub-process, so we
55 # don't worry about re-setting the default handlers.
56 signal.signal(signal.SIGHUP, self.handlerA)
57 signal.signal(signal.SIGUSR1, self.handlerB)
58 signal.signal(signal.SIGUSR2, signal.SIG_IGN)
59 signal.signal(signal.SIGALRM, signal.default_int_handler)
Michael W. Hudson5c26e862004-06-11 18:09:28 +000060
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000061 # Variables the signals will modify:
62 self.a_called = False
63 self.b_called = False
Tim Peters1742f332006-08-12 04:42:47 +000064
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000065 # Let the sub-processes know who to send signals to.
66 pid = os.getpid()
Georg Brandl9f2b93e2007-08-24 18:07:52 +000067 if test_support.verbose:
68 print "test runner's pid is", pid
Tim Peters1742f332006-08-12 04:42:47 +000069
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000070 child = os.fork()
71 if child == 0:
72 os.kill(pid, signal.SIGHUP)
73 exit_subprocess()
74 self.wait(child)
75 self.assertTrue(self.a_called)
76 self.assertFalse(self.b_called)
77 self.a_called = False
Tim Peters1742f332006-08-12 04:42:47 +000078
Georg Brandl9f2b93e2007-08-24 18:07:52 +000079 try:
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000080 child = os.fork()
81 if child == 0:
82 os.kill(pid, signal.SIGUSR1)
83 exit_subprocess()
84 # This wait should be interrupted by the signal's exception.
85 self.wait(child)
86 self.fail('HandlerBCalled exception not thrown')
87 except HandlerBCalled:
88 # So we call it again to reap the child's zombie.
89 self.wait(child)
90 self.assertTrue(self.b_called)
91 self.assertFalse(self.a_called)
Georg Brandl9f2b93e2007-08-24 18:07:52 +000092 if test_support.verbose:
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000093 print "HandlerBCalled exception caught"
Neal Norwitzec3c5e32006-07-30 19:18:38 +000094
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000095 child = os.fork()
96 if child == 0:
97 os.kill(pid, signal.SIGUSR2)
98 exit_subprocess()
99 self.wait(child) # Nothing should happen.
100
101 try:
102 signal.alarm(1)
103 # The race condition in pause doesn't matter in this case,
104 # since alarm is going to raise a KeyboardException, which
105 # will skip the call.
106 signal.pause()
Georg Brandl9f2b93e2007-08-24 18:07:52 +0000107 except KeyboardInterrupt:
108 if test_support.verbose:
109 print "KeyboardInterrupt (the alarm() went off)"
Georg Brandl9f2b93e2007-08-24 18:07:52 +0000110 except:
Jeffrey Yasskincf26f542008-03-21 05:02:44 +0000111 self.fail('Some other exception woke us from pause: %s' %
112 traceback.format_exc())
113 else:
114 self.fail('pause returned of its own accord')
Georg Brandl9f2b93e2007-08-24 18:07:52 +0000115
Jeffrey Yasskincf26f542008-03-21 05:02:44 +0000116 def test_main(self):
117 # This function spawns a child process to insulate the main
118 # test-running process from all the signals. It then
119 # communicates with that child process over a pipe and
120 # re-raises information about any exceptions the child
121 # throws. The real work happens in self.run_test().
122 os_done_r, os_done_w = os.pipe()
123 with nested(closing(os.fdopen(os_done_r)),
124 closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w):
125 child = os.fork()
126 if child == 0:
127 # In the child process; run the test and report results
128 # through the pipe.
129 try:
130 done_r.close()
131 # Have to close done_w again here because
132 # exit_subprocess() will skip the enclosing with block.
133 with closing(done_w):
134 try:
135 self.run_test()
136 except:
137 pickle.dump(traceback.format_exc(), done_w)
138 else:
139 pickle.dump(None, done_w)
140 except:
141 print 'Uh oh, raised from pickle.'
142 traceback.print_exc()
143 finally:
144 exit_subprocess()
145
146 done_w.close()
147 # Block for up to MAX_DURATION seconds for the test to finish.
148 r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
149 if done_r in r:
150 tb = pickle.load(done_r)
151 if tb:
152 self.fail(tb)
153 else:
154 os.kill(child, signal.SIGKILL)
155 self.fail('Test deadlocked after %d seconds.' %
156 self.MAX_DURATION)
Georg Brandl9f2b93e2007-08-24 18:07:52 +0000157
158
class BasicSignalTests(unittest.TestCase):
    """Argument validation and getsignal() round-tripping, no signals sent."""

    def trivial_signal_handler(self, *args):
        """Do-nothing handler used only as a value to install."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        """An invalid signal number (4242) is rejected with ValueError."""
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        """Passing None as the handler is rejected with TypeError."""
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        """getsignal() reports whatever signal() most recently installed."""
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous SIGHUP handler back even if the assertion
            # fails, so a failure here does not affect later tests.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
179
180
Guido van Rossum02de8972007-12-19 19:41:06 +0000181class WakeupSignalTests(unittest.TestCase):
182 TIMEOUT_FULL = 10
183 TIMEOUT_HALF = 5
184
185 def test_wakeup_fd_early(self):
186 import select
187
188 signal.alarm(1)
189 before_time = time.time()
190 # We attempt to get a signal during the sleep,
191 # before select is called
192 time.sleep(self.TIMEOUT_FULL)
193 mid_time = time.time()
194 self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
195 select.select([self.read], [], [], self.TIMEOUT_FULL)
196 after_time = time.time()
197 self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
198
199 def test_wakeup_fd_during(self):
200 import select
201
202 signal.alarm(1)
203 before_time = time.time()
204 # We attempt to get a signal during the select call
205 self.assertRaises(select.error, select.select,
206 [self.read], [], [], self.TIMEOUT_FULL)
207 after_time = time.time()
208 self.assert_(after_time - before_time < self.TIMEOUT_HALF)
209
210 def setUp(self):
211 import fcntl
212
213 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
214 self.read, self.write = os.pipe()
215 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
216 flags = flags | os.O_NONBLOCK
217 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
218 self.old_wakeup = signal.set_wakeup_fd(self.write)
219
220 def tearDown(self):
221 signal.set_wakeup_fd(self.old_wakeup)
222 os.close(self.read)
223 os.close(self.write)
224 signal.signal(signal.SIGALRM, self.alrm)
225
class SiginterruptTest(unittest.TestCase):
    """Check how signal.siginterrupt() affects a blocking os.read()."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Read from a pipe while a forked child signals this process.

        cb is a no-argument callable invoked right after the no-op
        handler for self.signum is installed; because the fork happens
        first, it runs in both the parent and the child.

        Returns True if os.read() raised OSError with errno EINTR, and
        False if it returned normally.  Any other OSError propagates.
        """
        r, w = os.pipe()
        ppid = os.getpid()
        pid = os.fork()

        oldhandler = signal.signal(self.signum, lambda x, y: None)
        cb()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                exit_subprocess()

        try:
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            try:
                signal.signal(self.signum, oldhandler)
                os.waitpid(pid, 0)
            finally:
                # Close the read end too, so repeated calls do not leak
                # a file descriptor each time.
                os.close(r)

    def test_without_siginterrupt(self):
        """Without calling siginterrupt(), the read is interrupted."""
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        """With siginterrupt(signum, 1), the read is interrupted."""
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        """With siginterrupt(signum, 0), the read is not interrupted."""
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
Guido van Rossum02de8972007-12-19 19:41:06 +0000270
def test_main():
    """Run all four test case classes through test_support.run_unittest()."""
    test_classes = (BasicSignalTests, InterProcessSignalTests,
                    WakeupSignalTests, SiginterruptTest)
    test_support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()