blob: b0195d72110d395915fd99419d8895f59275707a [file] [log] [blame]
Georg Brandl9f2b93e2007-08-24 18:07:52 +00001import unittest
2from test import test_support
Jeffrey Yasskincf26f542008-03-21 05:02:44 +00003from contextlib import closing, nested
4import pickle
5import select
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00006import signal
Jeffrey Yasskincf26f542008-03-21 05:02:44 +00007import traceback
Christian Heimesacfd8ed2008-02-28 21:00:45 +00008import sys, os, time, errno
9
# Skip this whole test module on platforms where it cannot be run.
if sys.platform.startswith(('win', 'os2')) or sys.platform == 'riscos':
    raise test_support.TestSkipped("Can't test signal on %s" % sys.platform)
13
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000014
class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB when its signal arrives."""
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000017
Jeffrey Yasskincf26f542008-03-21 05:02:44 +000018
def exit_subprocess():
    """Terminate the current forked process immediately with os._exit(0).

    Raising SystemExit instead would be caught by the test, and the
    subprocess would then carry on executing next to the original test;
    the number of tests running concurrently would grow exponentially.
    """
    os._exit(0)
27
28
class InterProcessSignalTests(unittest.TestCase):
    """Send signals between forked processes and check how they are handled.

    test_main() forks a child that executes run_test(); the parent only
    reads the pickled outcome back through a pipe, so the signals never
    reach the main test-running process.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def handlerA(self, *args):
        """Signal handler that records that it ran."""
        self.a_called = True
        if test_support.verbose:
            # Single-argument form: prints the same text as the print
            # statement `print "handlerA invoked", args`.
            print("handlerA invoked %s" % (args,))

    def handlerB(self, *args):
        """Signal handler that records that it ran, then raises HandlerBCalled."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked %s" % (args,))
        raise HandlerBCalled(*args)

    def wait(self, child_pid):
        """Wait for child_pid to finish, ignoring EINTR.

        Returns quietly if the child has already been reaped (ECHILD);
        any other OSError is re-raised.
        """
        while True:
            try:
                os.waitpid(child_pid, 0)
                return
            except OSError as e:
                if e.errno == errno.ECHILD:
                    return
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        """Exercise the handlers; runs inside the child forked by test_main().

        Each signal is sent by a short-lived forked process that signals
        this process and exits.  Failures are reported by raising.
        """
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            print("test runner's pid is %s" % pid)

        child = os.fork()
        if child == 0:
            # try/finally: the sender must exit even if os.kill() fails,
            # otherwise it would keep running the test alongside us.
            try:
                os.kill(pid, signal.SIGHUP)
            finally:
                exit_subprocess()
        self.wait(child)
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        try:
            child = os.fork()
            if child == 0:
                try:
                    os.kill(pid, signal.SIGUSR1)
                finally:
                    exit_subprocess()
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            # So we call it again to reap the child's zombie.
            self.wait(child)
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        child = os.fork()
        if child == 0:
            try:
                os.kill(pid, signal.SIGUSR2)
            finally:
                exit_subprocess()
        self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            self.fail('Some other exception woke us from pause: %s' %
                      traceback.format_exc())
        else:
            self.fail('pause returned of its own accord')

    def _report_from_child(self, done_r, done_w):
        """Child side of test_main(): run the test, pickle the outcome, exit.

        Writes None to done_w on success, or the formatted traceback on
        failure.  Never returns: the process always ends in
        exit_subprocess().
        """
        try:
            done_r.close()
            # Have to close done_w again here because
            # exit_subprocess() will skip the enclosing with blocks.
            with closing(done_w):
                try:
                    self.run_test()
                except:
                    # Deliberately catches everything so that any failure
                    # is reported to the parent through the pipe.
                    pickle.dump(traceback.format_exc(), done_w)
                else:
                    pickle.dump(None, done_w)
        except:
            print('Uh oh, raised from pickle.')
            traceback.print_exc()
        finally:
            exit_subprocess()

    def test_main(self):
        """Run run_test() in a forked child and fail if it reports a failure.

        Fails as well if the child has not reported within MAX_DURATION
        seconds, in which case the child is killed.
        """
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        # Two nested with statements rather than contextlib.nested(), so the
        # first file is closed even if wrapping the second descriptor fails.
        with closing(os.fdopen(os_done_r)) as done_r:
            with closing(os.fdopen(os_done_w, 'w')) as done_w:
                child = os.fork()
                if child == 0:
                    # In the child process; run the test and report
                    # results through the pipe.  Does not return.
                    self._report_from_child(done_r, done_w)

                try:
                    done_w.close()
                    # Block for up to MAX_DURATION seconds for the test
                    # to finish.
                    ready, _, _ = select.select([done_r], [], [],
                                                self.MAX_DURATION)
                    if done_r in ready:
                        tb = pickle.load(done_r)
                        if tb:
                            self.fail(tb)
                    else:
                        os.kill(child, signal.SIGKILL)
                        self.fail('Test deadlocked after %d seconds.' %
                                  self.MAX_DURATION)
                finally:
                    # Reap the child on every path so no zombie is left.
                    self.wait(child)
Georg Brandl9f2b93e2007-08-24 18:07:52 +0000159
160
class BasicSignalTests(unittest.TestCase):
    """Argument checking and handler bookkeeping of the signal module."""

    def trivial_signal_handler(self, *args):
        """Handler that does nothing; used only as a value to install."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        """getsignal() and signal() raise ValueError for signal number 4242."""
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        """signal() raises TypeError when None is given as the handler."""
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        """getsignal() returns the installed handler, then the restored one."""
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even if the check above fails,
            # so a failure here does not leave SIGHUP altered for later tests.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
181
182
Guido van Rossum02de8972007-12-19 19:41:06 +0000183class WakeupSignalTests(unittest.TestCase):
184 TIMEOUT_FULL = 10
185 TIMEOUT_HALF = 5
186
187 def test_wakeup_fd_early(self):
188 import select
189
190 signal.alarm(1)
191 before_time = time.time()
192 # We attempt to get a signal during the sleep,
193 # before select is called
194 time.sleep(self.TIMEOUT_FULL)
195 mid_time = time.time()
196 self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
197 select.select([self.read], [], [], self.TIMEOUT_FULL)
198 after_time = time.time()
199 self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
200
201 def test_wakeup_fd_during(self):
202 import select
203
204 signal.alarm(1)
205 before_time = time.time()
206 # We attempt to get a signal during the select call
207 self.assertRaises(select.error, select.select,
208 [self.read], [], [], self.TIMEOUT_FULL)
209 after_time = time.time()
210 self.assert_(after_time - before_time < self.TIMEOUT_HALF)
211
212 def setUp(self):
213 import fcntl
214
215 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
216 self.read, self.write = os.pipe()
217 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
218 flags = flags | os.O_NONBLOCK
219 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
220 self.old_wakeup = signal.set_wakeup_fd(self.write)
221
222 def tearDown(self):
223 signal.set_wakeup_fd(self.old_wakeup)
224 os.close(self.read)
225 os.close(self.write)
226 signal.signal(signal.SIGALRM, self.alrm)
227
class SiginterruptTest(unittest.TestCase):
    """Check whether a blocking os.read() is interrupted by a signal."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Report whether a blocking pipe read was interrupted by self.signum.

        A forked child sends self.signum to this process while it is
        blocked in os.read() on an empty pipe.  cb is called with no
        arguments after the handler is installed and before the read; it
        runs in both the parent and the forked child.

        Returns True if the read failed with EINTR, False if it returned
        normally.  Any other OSError is re-raised.
        """
        r, w = os.pipe()
        ppid = os.getpid()
        pid = os.fork()

        oldhandler = signal.signal(self.signum, lambda x, y: None)
        cb()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                exit_subprocess()

        try:
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            signal.signal(self.signum, oldhandler)
            try:
                os.waitpid(pid, 0)
            finally:
                # Close the read end too, so no descriptor is leaked
                # by each call.
                os.close(r)

    def test_without_siginterrupt(self):
        """Without calling siginterrupt(), the read is interrupted."""
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        """With siginterrupt(signum, 1), the read is interrupted."""
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        """With siginterrupt(signum, 0), the read is not interrupted."""
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
Guido van Rossum02de8972007-12-19 19:41:06 +0000272
def test_main():
    """Run all four test case classes of this module via test_support."""
    cases = (BasicSignalTests, InterProcessSignalTests,
             WakeupSignalTests, SiginterruptTest)
    test_support.run_unittest(*cases)
Georg Brandl9f2b93e2007-08-24 18:07:52 +0000276
277
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    test_main()