blob: 03e8101c45b39b054aa6f504dffc82dd89a87086 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001import unittest
2from test import test_support
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00003import signal
Christian Heimes8640e742008-02-23 16:23:06 +00004import os, sys, time, errno
Guido van Rossumcc5a91d1997-04-16 00:29:15 +00005
class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB when it is invoked."""
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00008
class InterProcessSignalTests(unittest.TestCase):
    """Check that signals sent by other processes reach our handlers.

    setUp() installs handlers for SIGHUP, SIGUSR1, SIGUSR2 and SIGALRM and
    forks a watchdog child; test_main() has a backgrounded shell script send
    signals to this process and verifies that the handlers ran.
    """

    MAX_DURATION = 20       # Entire test should last at most 20 sec.

    # Set up a child to send signals to us (the parent) after waiting
    # long enough to receive the alarm.  It seems we miss the alarm
    # for some reason.  This will hopefully stop the hangs on
    # Tru64/Alpha.  Alas, it doesn't.  Tru64 appears to miss all the
    # signals at times, or seemingly random subsets of them, and
    # nothing done in force_test_exit so far has actually helped.
    def spawn_force_test_exit_process(self, parent_pid):
        """Fork a watchdog child; return the child's pid in the parent.

        The child sleeps for MAX_DURATION + 5 seconds, then sends SIGHUP,
        SIGUSR1, SIGUSR2 and SIGALRM to parent_pid (one second apart) and
        always terminates through os._exit(0), so it never returns from
        this method.
        """
        # Sigh, both imports seem necessary to avoid errors.
        import os
        fork_pid = os.fork()
        if fork_pid:
            # In parent.
            return fork_pid

        # In child.
        import os, time
        try:
            # Wait 5 seconds longer than the expected alarm to give enough
            # time for the normal sequence of events to occur.  This is
            # just a stop-gap to try to prevent the test from hanging.
            time.sleep(self.MAX_DURATION + 5)
            print(" child should not have to kill parent",
                  file=sys.__stdout__)
            for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
                os.kill(parent_pid, getattr(signal, signame))
                print(" child sent", signame, "to",
                      parent_pid, file=sys.__stdout__)
                time.sleep(1)
        finally:
            # Never let the child fall back into the test runner's code.
            os._exit(0)

    def handlerA(self, *args):
        """SIGHUP handler: record that it was called."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked", args)

    def handlerB(self, *args):
        """SIGUSR1 handler: record the call, then raise HandlerBCalled."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked", args)
        raise HandlerBCalled(*args)

    def test_main(self):
        """Receive HUP, USR1 and USR2 from a shell script, then the alarm."""
        self.assertEqual(signal.getsignal(signal.SIGHUP), self.handlerA)
        self.assertEqual(signal.getsignal(signal.SIGUSR1), self.handlerB)
        self.assertEqual(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
        self.assertEqual(signal.getsignal(signal.SIGALRM),
                         signal.default_int_handler)

        # Launch an external script to send us signals.
        # We expect the external script to:
        #    send HUP, which invokes handlerA to set a_called
        #    send USR1, which invokes handlerB to set b_called and raise
        #               HandlerBCalled
        #    send USR2, which is ignored
        #
        # Then we expect the alarm to go off, and its handler raises
        # KeyboardInterrupt, finally getting us out of the loop.

        verboseflag = '-x' if test_support.verbose else '+x'

        pid = self.pid
        if test_support.verbose:
            print("test runner's pid is", pid)

        # Shell script that will send us asynchronous signals
        script = """
         (
                set %(verboseflag)s
                sleep 2
                kill -HUP %(pid)d
                sleep 2
                kill -USR1 %(pid)d
                sleep 2
                kill -USR2 %(pid)d
         ) &
        """ % {'verboseflag': verboseflag, 'pid': pid}

        signal.alarm(self.MAX_DURATION)

        handler_b_exception_raised = False

        os.system(script)
        try:
            if test_support.verbose:
                print("starting pause() loop...")
            while True:
                try:
                    if test_support.verbose:
                        print("call pause()...")
                    signal.pause()
                    if test_support.verbose:
                        print("pause() returned")
                except HandlerBCalled:
                    handler_b_exception_raised = True
                    if test_support.verbose:
                        print("HandlerBCalled exception caught")

        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")

        self.assertTrue(self.a_called)
        self.assertTrue(self.b_called)
        self.assertTrue(handler_b_exception_raised)

    def setUp(self):
        """Install the signal handlers and start the watchdog child."""
        # Install handlers, remembering the previous ones for tearDown().
        self.hup = signal.signal(signal.SIGHUP, self.handlerA)
        self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
        self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        self.alrm = signal.signal(signal.SIGALRM,
                                  signal.default_int_handler)
        self.a_called = False
        self.b_called = False
        self.pid = os.getpid()
        self.fork_pid = self.spawn_force_test_exit_process(self.pid)

    def tearDown(self):
        """Stop the watchdog child and restore the saved signal handlers."""
        # Cancel the alarm first, in case we died early, so that it cannot
        # fire while we are cleaning up.
        signal.alarm(0)

        # Forcibly kill the child we created to ping us if there was a
        # test error.
        try:
            # Make sure we don't kill ourself if there was a fork
            # error.
            if self.fork_pid > 0:
                os.kill(self.fork_pid, signal.SIGKILL)
                # Reap the child so it does not linger as a zombie.
                os.waitpid(self.fork_pid, 0)
        except OSError:
            # If the child killed us, it has probably exited.  Killing
            # a non-existent process will raise an error which we
            # don't care about.
            pass

        # Restore handlers.
        signal.signal(signal.SIGHUP, self.hup)
        signal.signal(signal.SIGUSR1, self.usr1)
        signal.signal(signal.SIGUSR2, self.usr2)
        signal.signal(signal.SIGALRM, self.alrm)
153
154
class BasicSignalTests(unittest.TestCase):
    """Argument-validation checks for signal.getsignal() and signal.signal()."""

    def test_out_of_range_signal_number_raises_error(self):
        """Signal number 4242 must be rejected with ValueError."""
        bogus_signum = 4242

        def noop_handler(*args):
            pass

        self.assertRaises(ValueError, signal.getsignal, bogus_signum)
        self.assertRaises(ValueError, signal.signal, bogus_signum,
                          noop_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        """Passing None as the handler must be rejected with TypeError."""
        target_signum = signal.SIGUSR1
        self.assertRaises(TypeError, signal.signal, target_signum, None)
168
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000169class WakeupSignalTests(unittest.TestCase):
170 TIMEOUT_FULL = 10
171 TIMEOUT_HALF = 5
172
173 def test_wakeup_fd_early(self):
174 import select
175
176 signal.alarm(1)
177 before_time = time.time()
178 # We attempt to get a signal during the sleep,
179 # before select is called
180 time.sleep(self.TIMEOUT_FULL)
181 mid_time = time.time()
182 self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
183 select.select([self.read], [], [], self.TIMEOUT_FULL)
184 after_time = time.time()
185 self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
186
187 def test_wakeup_fd_during(self):
188 import select
189
190 signal.alarm(1)
191 before_time = time.time()
192 # We attempt to get a signal during the select call
193 self.assertRaises(select.error, select.select,
194 [self.read], [], [], self.TIMEOUT_FULL)
195 after_time = time.time()
196 self.assert_(after_time - before_time < self.TIMEOUT_HALF)
197
198 def setUp(self):
199 import fcntl
200
201 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
202 self.read, self.write = os.pipe()
203 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
204 flags = flags | os.O_NONBLOCK
205 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
206 self.old_wakeup = signal.set_wakeup_fd(self.write)
207
208 def tearDown(self):
209 signal.set_wakeup_fd(self.old_wakeup)
210 os.close(self.read)
211 os.close(self.write)
212 signal.signal(signal.SIGALRM, self.alrm)
213
Christian Heimes8640e742008-02-23 16:23:06 +0000214class SiginterruptTest(unittest.TestCase):
215 signum = signal.SIGUSR1
216 def readpipe_interrupted(self, cb):
217 r, w = os.pipe()
218 ppid = os.getpid()
219 pid = os.fork()
220
221 oldhandler = signal.signal(self.signum, lambda x,y: None)
222 cb()
223 if pid==0:
224 # child code: sleep, kill, sleep. and then exit,
225 # which closes the pipe from which the parent process reads
226 try:
227 time.sleep(0.2)
228 os.kill(ppid, self.signum)
229 time.sleep(0.2)
230 finally:
231 os._exit(0)
232
233 try:
234 os.close(w)
235
236 try:
237 d=os.read(r, 1)
238 return False
239 except OSError as err:
240 if err.errno != errno.EINTR:
241 raise
242 return True
243 finally:
244 signal.signal(self.signum, oldhandler)
245 os.waitpid(pid, 0)
246
247 def test_without_siginterrupt(self):
248 i=self.readpipe_interrupted(lambda: None)
249 self.assertEquals(i, True)
250
251 def test_siginterrupt_on(self):
252 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
253 self.assertEquals(i, True)
254
255 def test_siginterrupt_off(self):
256 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
257 self.assertEquals(i, False)
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000258
def test_main():
    """Run all signal test cases.

    Raises test_support.TestSkipped when sys.platform starts with 'win'
    or 'os2'.
    """
    unsupported_prefixes = ('win', 'os2')
    if sys.platform.startswith(unsupported_prefixes):
        message = "Can't test signal on %s" % sys.platform
        raise test_support.TestSkipped(message)

    test_cases = (BasicSignalTests, InterProcessSignalTests,
                  WakeupSignalTests, SiginterruptTest)
    test_support.run_unittest(*test_cases)


if __name__ == "__main__":
    test_main()