blob: 3c039a14e5d60a8eb8f4fce04eff7368351f0526 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001import unittest
2from test import test_support
Christian Heimes4fbc72b2008-03-22 00:47:35 +00003from contextlib import closing, nested
Christian Heimescc47b052008-03-25 14:56:36 +00004import gc
Christian Heimes4fbc72b2008-03-22 00:47:35 +00005import pickle
6import select
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00007import signal
Christian Heimes4fbc72b2008-03-22 00:47:35 +00008import subprocess
9import traceback
Christian Heimesc06950e2008-02-28 21:17:00 +000010import sys, os, time, errno
11
12if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
13 raise test_support.TestSkipped("Can't test signal on %s" % \
14 sys.platform)
15
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000016
Armin Rigo8b2cbfd2004-08-07 21:27:43 +000017class HandlerBCalled(Exception):
18 pass
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000019
Christian Heimes4fbc72b2008-03-22 00:47:35 +000020
21def exit_subprocess():
22 """Use os._exit(0) to exit the current subprocess.
23
24 Otherwise, the test catches the SystemExit and continues executing
25 in parallel with the original test, so you wind up with an
26 exponential number of tests running concurrently.
27 """
28 os._exit(0)
29
30
Thomas Woutersed03b412007-08-28 21:37:11 +000031class InterProcessSignalTests(unittest.TestCase):
32 MAX_DURATION = 20 # Entire test should last at most 20 sec.
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000033
Christian Heimescc47b052008-03-25 14:56:36 +000034 def setUp(self):
35 self.using_gc = gc.isenabled()
36 gc.disable()
37
38 def tearDown(self):
39 if self.using_gc:
40 gc.enable()
41
Thomas Woutersed03b412007-08-28 21:37:11 +000042 def handlerA(self, *args):
43 self.a_called = True
44 if test_support.verbose:
45 print("handlerA invoked", args)
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000046
Thomas Woutersed03b412007-08-28 21:37:11 +000047 def handlerB(self, *args):
48 self.b_called = True
49 if test_support.verbose:
50 print("handlerB invoked", args)
51 raise HandlerBCalled(*args)
Neal Norwitz9730bcb2006-01-23 07:50:06 +000052
Christian Heimes4fbc72b2008-03-22 00:47:35 +000053 def wait(self, child):
54 """Wait for child to finish, ignoring EINTR."""
55 while True:
56 try:
57 child.wait()
58 return
59 except OSError as e:
60 if e.errno != errno.EINTR:
61 raise
Fred Drake004d5e62000-10-23 17:22:08 +000062
Christian Heimes4fbc72b2008-03-22 00:47:35 +000063 def run_test(self):
64 # Install handlers. This function runs in a sub-process, so we
65 # don't worry about re-setting the default handlers.
66 signal.signal(signal.SIGHUP, self.handlerA)
67 signal.signal(signal.SIGUSR1, self.handlerB)
68 signal.signal(signal.SIGUSR2, signal.SIG_IGN)
69 signal.signal(signal.SIGALRM, signal.default_int_handler)
Michael W. Hudson5c26e862004-06-11 18:09:28 +000070
Christian Heimes4fbc72b2008-03-22 00:47:35 +000071 # Variables the signals will modify:
72 self.a_called = False
73 self.b_called = False
Thomas Wouters00ee7ba2006-08-21 19:07:27 +000074
Christian Heimes4fbc72b2008-03-22 00:47:35 +000075 # Let the sub-processes know who to send signals to.
76 pid = os.getpid()
Thomas Woutersed03b412007-08-28 21:37:11 +000077 if test_support.verbose:
78 print("test runner's pid is", pid)
Thomas Wouters00ee7ba2006-08-21 19:07:27 +000079
Christian Heimes4fbc72b2008-03-22 00:47:35 +000080 child = subprocess.Popen(['kill', '-HUP', str(pid)])
81 self.wait(child)
82 self.assertTrue(self.a_called)
83 self.assertFalse(self.b_called)
84 self.a_called = False
Thomas Wouters00ee7ba2006-08-21 19:07:27 +000085
Thomas Woutersed03b412007-08-28 21:37:11 +000086 try:
Christian Heimes4fbc72b2008-03-22 00:47:35 +000087 child = subprocess.Popen(['kill', '-USR1', str(pid)])
88 # This wait should be interrupted by the signal's exception.
89 self.wait(child)
90 self.fail('HandlerBCalled exception not thrown')
91 except HandlerBCalled:
92 self.assertTrue(self.b_called)
93 self.assertFalse(self.a_called)
Thomas Woutersed03b412007-08-28 21:37:11 +000094 if test_support.verbose:
Christian Heimes4fbc72b2008-03-22 00:47:35 +000095 print("HandlerBCalled exception caught")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000096
Christian Heimes4fbc72b2008-03-22 00:47:35 +000097 child = subprocess.Popen(['kill', '-USR2', str(pid)])
98 self.wait(child) # Nothing should happen.
99
100 try:
101 signal.alarm(1)
102 # The race condition in pause doesn't matter in this case,
103 # since alarm is going to raise a KeyboardException, which
104 # will skip the call.
105 signal.pause()
Thomas Woutersed03b412007-08-28 21:37:11 +0000106 except KeyboardInterrupt:
107 if test_support.verbose:
108 print("KeyboardInterrupt (the alarm() went off)")
Thomas Woutersed03b412007-08-28 21:37:11 +0000109 except:
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000110 self.fail('Some other exception woke us from pause: %s' %
111 traceback.format_exc())
112 else:
113 self.fail('pause returned of its own accord')
Thomas Woutersed03b412007-08-28 21:37:11 +0000114
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000115 def test_main(self):
116 # This function spawns a child process to insulate the main
117 # test-running process from all the signals. It then
118 # communicates with that child process over a pipe and
119 # re-raises information about any exceptions the child
120 # throws. The real work happens in self.run_test().
121 os_done_r, os_done_w = os.pipe()
122 with nested(closing(os.fdopen(os_done_r, 'rb')),
123 closing(os.fdopen(os_done_w, 'wb'))) as (done_r, done_w):
124 child = os.fork()
125 if child == 0:
126 # In the child process; run the test and report results
127 # through the pipe.
128 try:
129 done_r.close()
130 # Have to close done_w again here because
131 # exit_subprocess() will skip the enclosing with block.
132 with closing(done_w):
133 try:
134 self.run_test()
135 except:
136 pickle.dump(traceback.format_exc(), done_w)
137 else:
138 pickle.dump(None, done_w)
139 except:
140 print('Uh oh, raised from pickle.')
141 traceback.print_exc()
142 finally:
143 exit_subprocess()
144
145 done_w.close()
146 # Block for up to MAX_DURATION seconds for the test to finish.
147 r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
148 if done_r in r:
149 tb = pickle.load(done_r)
150 if tb:
151 self.fail(tb)
152 else:
153 os.kill(child, signal.SIGKILL)
154 self.fail('Test deadlocked after %d seconds.' %
155 self.MAX_DURATION)
Thomas Woutersed03b412007-08-28 21:37:11 +0000156
157
158class BasicSignalTests(unittest.TestCase):
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000159 def trivial_signal_handler(self, *args):
160 pass
161
Thomas Woutersed03b412007-08-28 21:37:11 +0000162 def test_out_of_range_signal_number_raises_error(self):
163 self.assertRaises(ValueError, signal.getsignal, 4242)
164
Thomas Woutersed03b412007-08-28 21:37:11 +0000165 self.assertRaises(ValueError, signal.signal, 4242,
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000166 self.trivial_signal_handler)
Thomas Woutersed03b412007-08-28 21:37:11 +0000167
168 def test_setting_signal_handler_to_none_raises_error(self):
169 self.assertRaises(TypeError, signal.signal,
170 signal.SIGUSR1, None)
171
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000172 def test_getsignal(self):
173 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
174 self.assertEquals(signal.getsignal(signal.SIGHUP),
175 self.trivial_signal_handler)
176 signal.signal(signal.SIGHUP, hup)
177 self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
178
179
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000180class WakeupSignalTests(unittest.TestCase):
181 TIMEOUT_FULL = 10
182 TIMEOUT_HALF = 5
183
184 def test_wakeup_fd_early(self):
185 import select
186
187 signal.alarm(1)
188 before_time = time.time()
189 # We attempt to get a signal during the sleep,
190 # before select is called
191 time.sleep(self.TIMEOUT_FULL)
192 mid_time = time.time()
193 self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
194 select.select([self.read], [], [], self.TIMEOUT_FULL)
195 after_time = time.time()
196 self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
197
198 def test_wakeup_fd_during(self):
199 import select
200
201 signal.alarm(1)
202 before_time = time.time()
203 # We attempt to get a signal during the select call
204 self.assertRaises(select.error, select.select,
205 [self.read], [], [], self.TIMEOUT_FULL)
206 after_time = time.time()
207 self.assert_(after_time - before_time < self.TIMEOUT_HALF)
208
209 def setUp(self):
210 import fcntl
211
212 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
213 self.read, self.write = os.pipe()
214 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
215 flags = flags | os.O_NONBLOCK
216 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
217 self.old_wakeup = signal.set_wakeup_fd(self.write)
218
219 def tearDown(self):
220 signal.set_wakeup_fd(self.old_wakeup)
221 os.close(self.read)
222 os.close(self.write)
223 signal.signal(signal.SIGALRM, self.alrm)
224
Christian Heimes8640e742008-02-23 16:23:06 +0000225class SiginterruptTest(unittest.TestCase):
226 signum = signal.SIGUSR1
227 def readpipe_interrupted(self, cb):
228 r, w = os.pipe()
229 ppid = os.getpid()
230 pid = os.fork()
231
232 oldhandler = signal.signal(self.signum, lambda x,y: None)
233 cb()
234 if pid==0:
235 # child code: sleep, kill, sleep. and then exit,
236 # which closes the pipe from which the parent process reads
237 try:
238 time.sleep(0.2)
239 os.kill(ppid, self.signum)
240 time.sleep(0.2)
241 finally:
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000242 exit_subprocess()
Christian Heimes8640e742008-02-23 16:23:06 +0000243
244 try:
245 os.close(w)
246
247 try:
248 d=os.read(r, 1)
249 return False
250 except OSError as err:
251 if err.errno != errno.EINTR:
252 raise
253 return True
254 finally:
255 signal.signal(self.signum, oldhandler)
256 os.waitpid(pid, 0)
257
258 def test_without_siginterrupt(self):
259 i=self.readpipe_interrupted(lambda: None)
260 self.assertEquals(i, True)
261
262 def test_siginterrupt_on(self):
263 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
264 self.assertEquals(i, True)
265
266 def test_siginterrupt_off(self):
267 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
268 self.assertEquals(i, False)
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000269
Martin v. Löwis823725e2008-03-24 13:39:54 +0000270class ItimerTest(unittest.TestCase):
271 def setUp(self):
272 self.hndl_called = False
273 self.hndl_count = 0
274 self.itimer = None
Christian Heimescc47b052008-03-25 14:56:36 +0000275 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
Martin v. Löwis823725e2008-03-24 13:39:54 +0000276
277 def tearDown(self):
Christian Heimescc47b052008-03-25 14:56:36 +0000278 signal.signal(signal.SIGALRM, self.old_alarm)
Martin v. Löwis823725e2008-03-24 13:39:54 +0000279 if self.itimer is not None: # test_itimer_exc doesn't change this attr
280 # just ensure that itimer is stopped
281 signal.setitimer(self.itimer, 0)
282
283 def sig_alrm(self, *args):
284 self.hndl_called = True
285 if test_support.verbose:
286 print("SIGALRM handler invoked", args)
287
288 def sig_vtalrm(self, *args):
289 self.hndl_called = True
290
291 if self.hndl_count > 3:
292 # it shouldn't be here, because it should have been disabled.
293 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
294 "timer.")
295 elif self.hndl_count == 3:
296 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
297 signal.setitimer(signal.ITIMER_VIRTUAL, 0)
298 if test_support.verbose:
299 print("last SIGVTALRM handler call")
300
301 self.hndl_count += 1
302
303 if test_support.verbose:
304 print("SIGVTALRM handler invoked", args)
305
306 def sig_prof(self, *args):
307 self.hndl_called = True
308 signal.setitimer(signal.ITIMER_PROF, 0)
309
310 if test_support.verbose:
311 print("SIGPROF handler invoked", args)
312
313 def test_itimer_exc(self):
314 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
315 # defines it ?
316 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
Christian Heimescc47b052008-03-25 14:56:36 +0000317 # Negative times are treated as zero on some platforms.
318 if 0:
319 self.assertRaises(signal.ItimerError,
320 signal.setitimer, signal.ITIMER_REAL, -1)
Martin v. Löwis823725e2008-03-24 13:39:54 +0000321
322 def test_itimer_real(self):
323 self.itimer = signal.ITIMER_REAL
Martin v. Löwis823725e2008-03-24 13:39:54 +0000324 signal.setitimer(self.itimer, 1.0)
325 if test_support.verbose:
326 print("\ncall pause()...")
327 signal.pause()
328
329 self.assertEqual(self.hndl_called, True)
330
331 def test_itimer_virtual(self):
332 self.itimer = signal.ITIMER_VIRTUAL
333 signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
334 signal.setitimer(self.itimer, 0.3, 0.2)
335
336 for i in range(100000000):
337 if signal.getitimer(self.itimer) == (0.0, 0.0):
338 break # sig_vtalrm handler stopped this itimer
339
340 # virtual itimer should be (0.0, 0.0) now
341 self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
342 # and the handler should have been called
343 self.assertEquals(self.hndl_called, True)
344
345 def test_itimer_prof(self):
346 self.itimer = signal.ITIMER_PROF
347 signal.signal(signal.SIGPROF, self.sig_prof)
348 signal.setitimer(self.itimer, 0.2)
349
350 for i in range(100000000):
351 if signal.getitimer(self.itimer) == (0.0, 0.0):
352 break # sig_prof handler stopped this itimer
353
354 self.assertEqual(self.hndl_called, True)
355
Thomas Woutersed03b412007-08-28 21:37:11 +0000356def test_main():
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000357 test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
Martin v. Löwis823725e2008-03-24 13:39:54 +0000358 WakeupSignalTests, SiginterruptTest, ItimerTest)
Thomas Woutersed03b412007-08-28 21:37:11 +0000359
360
361if __name__ == "__main__":
362 test_main()