blob: 2cee626e91fe3a3fd8acbbc0ab2fb6bc32a23618 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001import unittest
2from test import test_support
Christian Heimes4fbc72b2008-03-22 00:47:35 +00003from contextlib import closing, nested
4import pickle
5import select
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00006import signal
Christian Heimes4fbc72b2008-03-22 00:47:35 +00007import subprocess
8import traceback
Christian Heimesc06950e2008-02-28 21:17:00 +00009import sys, os, time, errno
10
11if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
12 raise test_support.TestSkipped("Can't test signal on %s" % \
13 sys.platform)
14
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000015
Armin Rigo8b2cbfd2004-08-07 21:27:43 +000016class HandlerBCalled(Exception):
17 pass
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000018
Christian Heimes4fbc72b2008-03-22 00:47:35 +000019
20def exit_subprocess():
21 """Use os._exit(0) to exit the current subprocess.
22
23 Otherwise, the test catches the SystemExit and continues executing
24 in parallel with the original test, so you wind up with an
25 exponential number of tests running concurrently.
26 """
27 os._exit(0)
28
29
Thomas Woutersed03b412007-08-28 21:37:11 +000030class InterProcessSignalTests(unittest.TestCase):
31 MAX_DURATION = 20 # Entire test should last at most 20 sec.
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000032
Thomas Woutersed03b412007-08-28 21:37:11 +000033 def handlerA(self, *args):
34 self.a_called = True
35 if test_support.verbose:
36 print("handlerA invoked", args)
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000037
Thomas Woutersed03b412007-08-28 21:37:11 +000038 def handlerB(self, *args):
39 self.b_called = True
40 if test_support.verbose:
41 print("handlerB invoked", args)
42 raise HandlerBCalled(*args)
Neal Norwitz9730bcb2006-01-23 07:50:06 +000043
Christian Heimes4fbc72b2008-03-22 00:47:35 +000044 def wait(self, child):
45 """Wait for child to finish, ignoring EINTR."""
46 while True:
47 try:
48 child.wait()
49 return
50 except OSError as e:
51 if e.errno != errno.EINTR:
52 raise
Fred Drake004d5e62000-10-23 17:22:08 +000053
Christian Heimes4fbc72b2008-03-22 00:47:35 +000054 def run_test(self):
55 # Install handlers. This function runs in a sub-process, so we
56 # don't worry about re-setting the default handlers.
57 signal.signal(signal.SIGHUP, self.handlerA)
58 signal.signal(signal.SIGUSR1, self.handlerB)
59 signal.signal(signal.SIGUSR2, signal.SIG_IGN)
60 signal.signal(signal.SIGALRM, signal.default_int_handler)
Michael W. Hudson5c26e862004-06-11 18:09:28 +000061
Christian Heimes4fbc72b2008-03-22 00:47:35 +000062 # Variables the signals will modify:
63 self.a_called = False
64 self.b_called = False
Thomas Wouters00ee7ba2006-08-21 19:07:27 +000065
Christian Heimes4fbc72b2008-03-22 00:47:35 +000066 # Let the sub-processes know who to send signals to.
67 pid = os.getpid()
Thomas Woutersed03b412007-08-28 21:37:11 +000068 if test_support.verbose:
69 print("test runner's pid is", pid)
Thomas Wouters00ee7ba2006-08-21 19:07:27 +000070
Christian Heimes4fbc72b2008-03-22 00:47:35 +000071 child = subprocess.Popen(['kill', '-HUP', str(pid)])
72 self.wait(child)
73 self.assertTrue(self.a_called)
74 self.assertFalse(self.b_called)
75 self.a_called = False
Thomas Wouters00ee7ba2006-08-21 19:07:27 +000076
Thomas Woutersed03b412007-08-28 21:37:11 +000077 try:
Christian Heimes4fbc72b2008-03-22 00:47:35 +000078 child = subprocess.Popen(['kill', '-USR1', str(pid)])
79 # This wait should be interrupted by the signal's exception.
80 self.wait(child)
81 self.fail('HandlerBCalled exception not thrown')
82 except HandlerBCalled:
83 self.assertTrue(self.b_called)
84 self.assertFalse(self.a_called)
Thomas Woutersed03b412007-08-28 21:37:11 +000085 if test_support.verbose:
Christian Heimes4fbc72b2008-03-22 00:47:35 +000086 print("HandlerBCalled exception caught")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000087
Christian Heimes4fbc72b2008-03-22 00:47:35 +000088 child = subprocess.Popen(['kill', '-USR2', str(pid)])
89 self.wait(child) # Nothing should happen.
90
91 try:
92 signal.alarm(1)
93 # The race condition in pause doesn't matter in this case,
94 # since alarm is going to raise a KeyboardException, which
95 # will skip the call.
96 signal.pause()
Thomas Woutersed03b412007-08-28 21:37:11 +000097 except KeyboardInterrupt:
98 if test_support.verbose:
99 print("KeyboardInterrupt (the alarm() went off)")
Thomas Woutersed03b412007-08-28 21:37:11 +0000100 except:
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000101 self.fail('Some other exception woke us from pause: %s' %
102 traceback.format_exc())
103 else:
104 self.fail('pause returned of its own accord')
Thomas Woutersed03b412007-08-28 21:37:11 +0000105
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000106 def test_main(self):
107 # This function spawns a child process to insulate the main
108 # test-running process from all the signals. It then
109 # communicates with that child process over a pipe and
110 # re-raises information about any exceptions the child
111 # throws. The real work happens in self.run_test().
112 os_done_r, os_done_w = os.pipe()
113 with nested(closing(os.fdopen(os_done_r, 'rb')),
114 closing(os.fdopen(os_done_w, 'wb'))) as (done_r, done_w):
115 child = os.fork()
116 if child == 0:
117 # In the child process; run the test and report results
118 # through the pipe.
119 try:
120 done_r.close()
121 # Have to close done_w again here because
122 # exit_subprocess() will skip the enclosing with block.
123 with closing(done_w):
124 try:
125 self.run_test()
126 except:
127 pickle.dump(traceback.format_exc(), done_w)
128 else:
129 pickle.dump(None, done_w)
130 except:
131 print('Uh oh, raised from pickle.')
132 traceback.print_exc()
133 finally:
134 exit_subprocess()
135
136 done_w.close()
137 # Block for up to MAX_DURATION seconds for the test to finish.
138 r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
139 if done_r in r:
140 tb = pickle.load(done_r)
141 if tb:
142 self.fail(tb)
143 else:
144 os.kill(child, signal.SIGKILL)
145 self.fail('Test deadlocked after %d seconds.' %
146 self.MAX_DURATION)
Thomas Woutersed03b412007-08-28 21:37:11 +0000147
148
149class BasicSignalTests(unittest.TestCase):
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000150 def trivial_signal_handler(self, *args):
151 pass
152
Thomas Woutersed03b412007-08-28 21:37:11 +0000153 def test_out_of_range_signal_number_raises_error(self):
154 self.assertRaises(ValueError, signal.getsignal, 4242)
155
Thomas Woutersed03b412007-08-28 21:37:11 +0000156 self.assertRaises(ValueError, signal.signal, 4242,
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000157 self.trivial_signal_handler)
Thomas Woutersed03b412007-08-28 21:37:11 +0000158
159 def test_setting_signal_handler_to_none_raises_error(self):
160 self.assertRaises(TypeError, signal.signal,
161 signal.SIGUSR1, None)
162
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000163 def test_getsignal(self):
164 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
165 self.assertEquals(signal.getsignal(signal.SIGHUP),
166 self.trivial_signal_handler)
167 signal.signal(signal.SIGHUP, hup)
168 self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
169
170
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000171class WakeupSignalTests(unittest.TestCase):
172 TIMEOUT_FULL = 10
173 TIMEOUT_HALF = 5
174
175 def test_wakeup_fd_early(self):
176 import select
177
178 signal.alarm(1)
179 before_time = time.time()
180 # We attempt to get a signal during the sleep,
181 # before select is called
182 time.sleep(self.TIMEOUT_FULL)
183 mid_time = time.time()
184 self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
185 select.select([self.read], [], [], self.TIMEOUT_FULL)
186 after_time = time.time()
187 self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
188
189 def test_wakeup_fd_during(self):
190 import select
191
192 signal.alarm(1)
193 before_time = time.time()
194 # We attempt to get a signal during the select call
195 self.assertRaises(select.error, select.select,
196 [self.read], [], [], self.TIMEOUT_FULL)
197 after_time = time.time()
198 self.assert_(after_time - before_time < self.TIMEOUT_HALF)
199
200 def setUp(self):
201 import fcntl
202
203 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
204 self.read, self.write = os.pipe()
205 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
206 flags = flags | os.O_NONBLOCK
207 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
208 self.old_wakeup = signal.set_wakeup_fd(self.write)
209
210 def tearDown(self):
211 signal.set_wakeup_fd(self.old_wakeup)
212 os.close(self.read)
213 os.close(self.write)
214 signal.signal(signal.SIGALRM, self.alrm)
215
Christian Heimes8640e742008-02-23 16:23:06 +0000216class SiginterruptTest(unittest.TestCase):
217 signum = signal.SIGUSR1
218 def readpipe_interrupted(self, cb):
219 r, w = os.pipe()
220 ppid = os.getpid()
221 pid = os.fork()
222
223 oldhandler = signal.signal(self.signum, lambda x,y: None)
224 cb()
225 if pid==0:
226 # child code: sleep, kill, sleep. and then exit,
227 # which closes the pipe from which the parent process reads
228 try:
229 time.sleep(0.2)
230 os.kill(ppid, self.signum)
231 time.sleep(0.2)
232 finally:
Christian Heimes4fbc72b2008-03-22 00:47:35 +0000233 exit_subprocess()
Christian Heimes8640e742008-02-23 16:23:06 +0000234
235 try:
236 os.close(w)
237
238 try:
239 d=os.read(r, 1)
240 return False
241 except OSError as err:
242 if err.errno != errno.EINTR:
243 raise
244 return True
245 finally:
246 signal.signal(self.signum, oldhandler)
247 os.waitpid(pid, 0)
248
249 def test_without_siginterrupt(self):
250 i=self.readpipe_interrupted(lambda: None)
251 self.assertEquals(i, True)
252
253 def test_siginterrupt_on(self):
254 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
255 self.assertEquals(i, True)
256
257 def test_siginterrupt_off(self):
258 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
259 self.assertEquals(i, False)
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000260
Martin v. Löwis823725e2008-03-24 13:39:54 +0000261class ItimerTest(unittest.TestCase):
262 def setUp(self):
263 self.hndl_called = False
264 self.hndl_count = 0
265 self.itimer = None
266
267 def tearDown(self):
268 if self.itimer is not None: # test_itimer_exc doesn't change this attr
269 # just ensure that itimer is stopped
270 signal.setitimer(self.itimer, 0)
271
272 def sig_alrm(self, *args):
273 self.hndl_called = True
274 if test_support.verbose:
275 print("SIGALRM handler invoked", args)
276
277 def sig_vtalrm(self, *args):
278 self.hndl_called = True
279
280 if self.hndl_count > 3:
281 # it shouldn't be here, because it should have been disabled.
282 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
283 "timer.")
284 elif self.hndl_count == 3:
285 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
286 signal.setitimer(signal.ITIMER_VIRTUAL, 0)
287 if test_support.verbose:
288 print("last SIGVTALRM handler call")
289
290 self.hndl_count += 1
291
292 if test_support.verbose:
293 print("SIGVTALRM handler invoked", args)
294
295 def sig_prof(self, *args):
296 self.hndl_called = True
297 signal.setitimer(signal.ITIMER_PROF, 0)
298
299 if test_support.verbose:
300 print("SIGPROF handler invoked", args)
301
302 def test_itimer_exc(self):
303 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
304 # defines it ?
305 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
306 # negative time
307 self.assertRaises(signal.ItimerError, signal.setitimer,
308 signal.ITIMER_REAL, -1)
309
310 def test_itimer_real(self):
311 self.itimer = signal.ITIMER_REAL
312 signal.signal(signal.SIGALRM, self.sig_alrm)
313 signal.setitimer(self.itimer, 1.0)
314 if test_support.verbose:
315 print("\ncall pause()...")
316 signal.pause()
317
318 self.assertEqual(self.hndl_called, True)
319
320 def test_itimer_virtual(self):
321 self.itimer = signal.ITIMER_VIRTUAL
322 signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
323 signal.setitimer(self.itimer, 0.3, 0.2)
324
325 for i in range(100000000):
326 if signal.getitimer(self.itimer) == (0.0, 0.0):
327 break # sig_vtalrm handler stopped this itimer
328
329 # virtual itimer should be (0.0, 0.0) now
330 self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
331 # and the handler should have been called
332 self.assertEquals(self.hndl_called, True)
333
334 def test_itimer_prof(self):
335 self.itimer = signal.ITIMER_PROF
336 signal.signal(signal.SIGPROF, self.sig_prof)
337 signal.setitimer(self.itimer, 0.2)
338
339 for i in range(100000000):
340 if signal.getitimer(self.itimer) == (0.0, 0.0):
341 break # sig_prof handler stopped this itimer
342
343 self.assertEqual(self.hndl_called, True)
344
Thomas Woutersed03b412007-08-28 21:37:11 +0000345def test_main():
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000346 test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
Martin v. Löwis823725e2008-03-24 13:39:54 +0000347 WakeupSignalTests, SiginterruptTest, ItimerTest)
Thomas Woutersed03b412007-08-28 21:37:11 +0000348
349
350if __name__ == "__main__":
351 test_main()