blob: 1d25814b0e52e9029b53a1c6fac59a172c95b6f1 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001import unittest
2from test import test_support
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00003import signal
Christian Heimesc06950e2008-02-28 21:17:00 +00004import sys, os, time, errno
5
# Skip the whole module on platforms where signal cannot be tested.
if sys.platform.startswith(('win', 'os2')) or sys.platform == 'riscos':
    raise test_support.TestSkipped("Can't test signal on %s" % sys.platform)
9
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000010
class HandlerBCalled(Exception):
    """Raised by handlerB so the test can observe that the handler ran."""
Guido van Rossum4f17e3e1995-03-16 15:07:38 +000013
class InterProcessSignalTests(unittest.TestCase):
    """Check Python-level handlers against signals sent by other processes.

    setUp installs handlers for SIGHUP, SIGUSR1, SIGUSR2 and SIGALRM and
    forks a watchdog child.  test_main starts a shell script that sends
    HUP, USR1 and USR2 to this process and waits in signal.pause() until
    the alarm breaks the loop.  tearDown kills and reaps the watchdog and
    restores the handlers that were in place before the test.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    # Set up a child to send signals to us (the parent) after waiting
    # long enough to receive the alarm.  It seems we miss the alarm
    # for some reason.  This will hopefully stop the hangs on
    # Tru64/Alpha.  Alas, it doesn't.  Tru64 appears to miss all the
    # signals at times, or seemingly random subsets of them, and
    # nothing done in force_test_exit so far has actually helped.
    def spawn_force_test_exit_process(self, parent_pid):
        """Fork a watchdog process that signals parent_pid if the test hangs.

        Returns the child's pid in the parent.  The child never returns:
        it sleeps, sends the signals, and leaves through os._exit().
        """
        # Sigh, both imports seem necessary to avoid errors.
        import os
        fork_pid = os.fork()
        if fork_pid:
            # In parent.
            return fork_pid

        # In child.
        import os, time
        try:
            # Wait 5 seconds longer than the expected alarm to give enough
            # time for the normal sequence of events to occur.  This is
            # just a stop-gap to try to prevent the test from hanging.
            time.sleep(self.MAX_DURATION + 5)
            print(" child should not have to kill parent",
                  file=sys.__stdout__)
            for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
                os.kill(parent_pid, getattr(signal, signame))
                print(" child sent", signame, "to",
                      parent_pid, file=sys.__stdout__)
                time.sleep(1)
        finally:
            # Never let the child fall back into the test runner's code.
            os._exit(0)

    def handlerA(self, *args):
        """SIGHUP handler: record the call and return normally."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked", args)

    def handlerB(self, *args):
        """SIGUSR1 handler: record the call, then raise HandlerBCalled."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked", args)
        raise HandlerBCalled(*args)

    def test_main(self):
        # The handlers installed by setUp must be the ones reported back.
        self.assertEqual(signal.getsignal(signal.SIGHUP), self.handlerA)
        self.assertEqual(signal.getsignal(signal.SIGUSR1), self.handlerB)
        self.assertEqual(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
        self.assertEqual(signal.getsignal(signal.SIGALRM),
                         signal.default_int_handler)

        # Launch an external script to send us signals.
        # We expect the external script to:
        #    send HUP, which invokes handlerA to set a_called
        #    send USR1, which invokes handlerB to set b_called and raise
        #               HandlerBCalled
        #    send USR2, which is ignored
        #
        # Then we expect the alarm to go off, and its handler raises
        # KeyboardInterrupt, finally getting us out of the loop.

        if test_support.verbose:
            verboseflag = '-x'
        else:
            verboseflag = '+x'

        pid = self.pid
        if test_support.verbose:
            print("test runner's pid is", pid)

        # Shell script that will send us asynchronous signals
        script = """
         (
                set %(verboseflag)s
                sleep 2
                kill -HUP %(pid)d
                sleep 2
                kill -USR1 %(pid)d
                sleep 2
                kill -USR2 %(pid)d
         ) &
        """ % {'verboseflag': verboseflag, 'pid': pid}

        signal.alarm(self.MAX_DURATION)

        handler_b_exception_raised = False

        os.system(script)
        try:
            if test_support.verbose:
                print("starting pause() loop...")
            while True:
                try:
                    if test_support.verbose:
                        print("call pause()...")
                    signal.pause()
                    if test_support.verbose:
                        print("pause() returned")
                except HandlerBCalled:
                    handler_b_exception_raised = True
                    if test_support.verbose:
                        print("HandlerBCalled exception caught")

        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")

        self.assertTrue(self.a_called)
        self.assertTrue(self.b_called)
        self.assertTrue(handler_b_exception_raised)

    def setUp(self):
        """Install the handlers under test and fork the watchdog child."""
        # Install handlers, remembering the previous ones for tearDown.
        self.hup = signal.signal(signal.SIGHUP, self.handlerA)
        self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
        self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        self.alrm = signal.signal(signal.SIGALRM,
                                  signal.default_int_handler)
        self.a_called = False
        self.b_called = False
        self.pid = os.getpid()
        self.fork_pid = self.spawn_force_test_exit_process(self.pid)

    def tearDown(self):
        """Kill and reap the watchdog child, then restore the handlers."""
        # Cancel the alarm first, in case we died early, so that it
        # cannot go off while we are cleaning up.
        signal.alarm(0)

        # Forcibly kill the child we created to ping us if there was a
        # test error.  Make sure we don't kill ourself if there was a
        # fork error.
        if self.fork_pid > 0:
            try:
                os.kill(self.fork_pid, signal.SIGKILL)
            except OSError:
                # If the child killed us, it has probably exited.  Killing
                # a non-existent process will raise an error which we
                # don't care about.
                pass
            try:
                # Collect the exit status so no zombie is left behind.
                os.waitpid(self.fork_pid, 0)
            except OSError:
                # Nothing left to wait for; that is fine as well.
                pass

        # Restore handlers.
        signal.signal(signal.SIGHUP, self.hup)
        signal.signal(signal.SIGUSR1, self.usr1)
        signal.signal(signal.SIGUSR2, self.usr2)
        signal.signal(signal.SIGALRM, self.alrm)
158
159
class BasicSignalTests(unittest.TestCase):
    """Argument checks made by signal.getsignal() and signal.signal()."""

    def test_out_of_range_signal_number_raises_error(self):
        # Both looking up and installing a handler must reject a signal
        # number that is out of range.
        bogus_signum = 4242

        def do_nothing(*unused):
            pass

        self.assertRaises(ValueError, signal.getsignal, bogus_signum)
        self.assertRaises(ValueError, signal.signal, bogus_signum,
                          do_nothing)

    def test_setting_signal_handler_to_none_raises_error(self):
        # None is not accepted as a handler.
        install_args = (signal.SIGUSR1, None)
        self.assertRaises(TypeError, signal.signal, *install_args)
173
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000174class WakeupSignalTests(unittest.TestCase):
175 TIMEOUT_FULL = 10
176 TIMEOUT_HALF = 5
177
178 def test_wakeup_fd_early(self):
179 import select
180
181 signal.alarm(1)
182 before_time = time.time()
183 # We attempt to get a signal during the sleep,
184 # before select is called
185 time.sleep(self.TIMEOUT_FULL)
186 mid_time = time.time()
187 self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
188 select.select([self.read], [], [], self.TIMEOUT_FULL)
189 after_time = time.time()
190 self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
191
192 def test_wakeup_fd_during(self):
193 import select
194
195 signal.alarm(1)
196 before_time = time.time()
197 # We attempt to get a signal during the select call
198 self.assertRaises(select.error, select.select,
199 [self.read], [], [], self.TIMEOUT_FULL)
200 after_time = time.time()
201 self.assert_(after_time - before_time < self.TIMEOUT_HALF)
202
203 def setUp(self):
204 import fcntl
205
206 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
207 self.read, self.write = os.pipe()
208 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
209 flags = flags | os.O_NONBLOCK
210 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
211 self.old_wakeup = signal.set_wakeup_fd(self.write)
212
213 def tearDown(self):
214 signal.set_wakeup_fd(self.old_wakeup)
215 os.close(self.read)
216 os.close(self.write)
217 signal.signal(signal.SIGALRM, self.alrm)
218
Christian Heimes8640e742008-02-23 16:23:06 +0000219class SiginterruptTest(unittest.TestCase):
220 signum = signal.SIGUSR1
221 def readpipe_interrupted(self, cb):
222 r, w = os.pipe()
223 ppid = os.getpid()
224 pid = os.fork()
225
226 oldhandler = signal.signal(self.signum, lambda x,y: None)
227 cb()
228 if pid==0:
229 # child code: sleep, kill, sleep. and then exit,
230 # which closes the pipe from which the parent process reads
231 try:
232 time.sleep(0.2)
233 os.kill(ppid, self.signum)
234 time.sleep(0.2)
235 finally:
236 os._exit(0)
237
238 try:
239 os.close(w)
240
241 try:
242 d=os.read(r, 1)
243 return False
244 except OSError as err:
245 if err.errno != errno.EINTR:
246 raise
247 return True
248 finally:
249 signal.signal(self.signum, oldhandler)
250 os.waitpid(pid, 0)
251
252 def test_without_siginterrupt(self):
253 i=self.readpipe_interrupted(lambda: None)
254 self.assertEquals(i, True)
255
256 def test_siginterrupt_on(self):
257 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
258 self.assertEquals(i, True)
259
260 def test_siginterrupt_off(self):
261 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
262 self.assertEquals(i, False)
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000263
def test_main():
    """Run every test case of this module through test_support."""
    test_classes = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupSignalTests,
        SiginterruptTest,
    )
    test_support.run_unittest(*test_classes)
Thomas Woutersed03b412007-08-28 21:37:11 +0000267
268
# Allow running this file directly as a script.
if __name__ == '__main__':
    test_main()