blob: a410710f270813f54c73c6773fe6271b6482660d [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001import unittest
2from test import test_support
Christian Heimes4fbc72b2008-03-22 00:47:35 +00003from contextlib import closing, nested
Christian Heimescc47b052008-03-25 14:56:36 +00004import gc
Christian Heimes4fbc72b2008-03-22 00:47:35 +00005import pickle
6import select
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00007import signal
Christian Heimes4fbc72b2008-03-22 00:47:35 +00008import subprocess
9import traceback
Christian Heimesc06950e2008-02-28 21:17:00 +000010import sys, os, time, errno
11
# Skip the whole module on platforms where, per the message below, signal
# cannot be tested.
if sys.platform.startswith(('win', 'os2')) or sys.platform == 'riscos':
    raise test_support.TestSkipped("Can't test signal on %s" % sys.platform)
15
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000016
class HandlerBCalled(Exception):
    """Raised from a signal handler so the interrupted code can observe it.

    Whatever arguments the handler received are forwarded as the
    exception's ``args``.
    """
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000019
Christian Heimes4fbc72b2008-03-22 00:47:35 +000020
def exit_subprocess():
    """Terminate the current subprocess immediately with os._exit(0).

    A regular exit would raise SystemExit, which the test catches; the
    child would then keep executing in parallel with the original test,
    and you would wind up with an exponential number of tests running
    concurrently.
    """
    status = 0
    os._exit(status)
29
30
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)`` and tolerate interruption by a signal.

    Returns the result of the call.  If the call fails with an
    EnvironmentError whose errno is EINTR, None is returned instead.
    Any other exception propagates unchanged.
    """
    try:
        return __func(*args, **kwargs)
    except EnvironmentError as e:
        if e.errno == errno.EINTR:
            # Interrupted by a signal: report "no result" to the caller.
            return None
        raise
38
39
class InterProcessSignalTests(unittest.TestCase):
    """Send real signals to a forked child and check how it reacts.

    test_main() forks; the child runs run_test() and reports the outcome
    to the parent through a pipe, so the signals never reach the process
    that is running the test suite.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def setUp(self):
        # Remember whether the collector was enabled so tearDown() can
        # put it back the way it was.
        self.using_gc = gc.isenabled()
        gc.disable()

    def tearDown(self):
        if self.using_gc:
            gc.enable()

    def handlerA(self, *args):
        """Signal handler that only records that it ran."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked", args)

    def handlerB(self, *args):
        """Signal handler that records that it ran and then raises."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked", args)
        raise HandlerBCalled(*args)

    def wait(self, child):
        """Wait for child to finish, ignoring EINTR."""
        while True:
            try:
                child.wait()
                return
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        """Install the handlers and exercise HUP, USR1, USR2 and ALRM."""
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            print("test runner's pid is", pid)

        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            time.sleep(1)  # Give the signal time to be delivered.
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            # Deliberately broad: anything else waking us up is a failure.
            self.fail("Some other exception woke us from pause: %s" %
                      traceback.format_exc())
        else:
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")

    def test_main(self):
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        # Two nested with-statements instead of contextlib.nested(): the
        # read end is then closed even if wrapping the write end fails.
        with closing(os.fdopen(os_done_r, 'rb')) as done_r:
            with closing(os.fdopen(os_done_w, 'wb')) as done_w:
                child = os.fork()
                if child == 0:
                    # In the child process; run the test and report results
                    # through the pipe.
                    try:
                        done_r.close()
                        # Have to close done_w again here because
                        # exit_subprocess() will skip the enclosing with
                        # blocks.
                        with closing(done_w):
                            try:
                                self.run_test()
                            except:
                                # Broad on purpose: every failure must be
                                # reported to the parent.
                                pickle.dump(traceback.format_exc(), done_w)
                            else:
                                pickle.dump(None, done_w)
                    except:
                        print('Uh oh, raised from pickle.')
                        traceback.print_exc()
                    finally:
                        exit_subprocess()

                done_w.close()
                # Block for up to MAX_DURATION seconds for the test to
                # finish.
                r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
                if done_r in r:
                    try:
                        tb = pickle.load(done_r)
                    finally:
                        # The child exits right after closing its end of
                        # the pipe; collect its status so no zombie is
                        # left behind.
                        os.waitpid(child, 0)
                    if tb:
                        self.fail(tb)
                else:
                    os.kill(child, signal.SIGKILL)
                    # Reap the killed child as well.
                    os.waitpid(child, 0)
                    self.fail('Test deadlocked after %d seconds.' %
                              self.MAX_DURATION)
Thomas Woutersed03b412007-08-28 21:37:11 +0000174
175
class BasicSignalTests(unittest.TestCase):
    """Argument checking and handler round-trips for signal.signal/getsignal."""

    def trivial_signal_handler(self, *args):
        """Do-nothing handler used as a valid callable in the tests."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even when the check above
            # fails, so later tests do not inherit the trivial handler.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
196
197
class WakeupSignalTests(unittest.TestCase):
    """Check that a SIGALRM cuts short sleep()/select() with a wakeup fd set.

    setUp() installs a do-nothing SIGALRM handler and registers the
    non-blocking write end of a pipe with signal.set_wakeup_fd(); the
    tests select() on the read end of that pipe.
    """

    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def setUp(self):
        import fcntl

        self.alrm = signal.signal(signal.SIGALRM, lambda x, y: None)
        self.read, self.write = os.pipe()
        # Make the write end non-blocking before handing it to
        # set_wakeup_fd().
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        # Cancel an alarm that has not fired yet so it cannot go off
        # once the previous SIGALRM handler is back in place.
        signal.alarm(0)
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

    def test_wakeup_fd_early(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        mid_time = time.time()
        self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the select call
        self.assertRaises(select.error, select.select,
                          [self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
241 signal.signal(signal.SIGALRM, self.alrm)
242
class SiginterruptTest(unittest.TestCase):
    """Check whether a blocking os.read() is interrupted by a signal."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Report whether a signal interrupts a blocking read on a pipe.

        Forks a child that signals this process with ``self.signum``.
        ``cb`` is called with no arguments in both processes right after
        a do-nothing handler has been installed for that signal.

        Returns True when os.read() failed with EINTR and False when it
        returned normally.  Any other OSError propagates.
        """
        r, w = os.pipe()
        ppid = os.getpid()
        pid = os.fork()

        oldhandler = signal.signal(self.signum, lambda x, y: None)
        cb()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                exit_subprocess()

        try:
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            try:
                signal.signal(self.signum, oldhandler)
                os.waitpid(pid, 0)
            finally:
                # Release the read end too, so no descriptor is leaked
                # per call.
                os.close(r)

    def test_without_siginterrupt(self):
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000287
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() / signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer first so that it cannot expire after the
        # previous SIGALRM handler has been put back.
        if self.itimer is not None:  # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def sig_alrm(self, *args):
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked", args)

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                     "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, so
        # setitimer(ITIMER_REAL, -1) is intentionally not checked here.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        old_handler = signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        try:
            signal.setitimer(self.itimer, 0.3, 0.2)

            for i in range(100000000):
                if signal.getitimer(self.itimer) == (0.0, 0.0):
                    break  # sig_vtalrm handler stopped this itimer

            # virtual itimer should be (0.0, 0.0) now
            self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
            # and the handler should have been called
            self.assertEqual(self.hndl_called, True)
        finally:
            # Stop the timer before putting the previous SIGVTALRM
            # handler back, so this test's handler does not stay installed.
            signal.setitimer(self.itimer, 0)
            if old_handler is not None:
                signal.signal(signal.SIGVTALRM, old_handler)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        old_handler = signal.signal(signal.SIGPROF, self.sig_prof)
        try:
            signal.setitimer(self.itimer, 0.2, 0.2)

            for i in range(100000000):
                if signal.getitimer(self.itimer) == (0.0, 0.0):
                    break  # sig_prof handler stopped this itimer

            # profiling itimer should be (0.0, 0.0) now
            self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
            # and the handler should have been called
            self.assertEqual(self.hndl_called, True)
        finally:
            # Stop the timer before putting the previous SIGPROF handler
            # back, so this test's handler does not stay installed.
            signal.setitimer(self.itimer, 0)
            if old_handler is not None:
                signal.signal(signal.SIGPROF, old_handler)
376
def test_main():
    """Run all test case classes of this module via test_support."""
    test_classes = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupSignalTests,
        SiginterruptTest,
        ItimerTest,
    )
    test_support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()