blob: dab91c135c61fc3bd903a7cd6c633efefa73bdd2 [file] [log] [blame]
Andrew Hsieh83760d22013-06-18 12:24:28 -07001import gc
2import os
3import sys
4import signal
5import weakref
6
7from cStringIO import StringIO
8
9
10import unittest
11
12
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
@unittest.skipIf(sys.platform == 'freebsd6',
                 "Test kills regrtest on freebsd6 "
                 "if threads have been used")
class TestBreak(unittest.TestCase):
    """Tests for unittest's SIGINT (control-C) handling helpers.

    The tests exercise unittest.installHandler, registerResult,
    removeResult and removeHandler by sending SIGINT to the current
    process and inspecting the installed handler and the registered
    results.

    Subclasses set ``int_handler`` to run the same tests with a
    different SIGINT handler in place before each test starts.
    """

    # SIGINT handler installed by setUp; None leaves the current one alone.
    int_handler = None

    def setUp(self):
        # Remember whatever handler is active so tearDown can restore it.
        self._default_handler = signal.getsignal(signal.SIGINT)
        if self.int_handler is not None:
            signal.signal(signal.SIGINT, self.int_handler)

    def tearDown(self):
        signal.signal(signal.SIGINT, self._default_handler)
        # Reset the module-level state of unittest.signals so that one
        # test cannot leak registered results or a handler into the next.
        unittest.signals._results = weakref.WeakKeyDictionary()
        unittest.signals._interrupt_handler = None

    def _send_sigint(self):
        # Deliver SIGINT to this very process.
        os.kill(os.getpid(), signal.SIGINT)

    def testInstallHandler(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        try:
            self._send_sigint()
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(unittest.signals._interrupt_handler.called)

    def testRegisterResult(self):
        result = unittest.TestResult()
        unittest.registerResult(result)

        # The registry must contain our result and nothing else: any
        # other object seen before it is reported as a failure.
        for ref in unittest.signals._results:
            if ref is result:
                break
            self.fail("odd object in result set")
        else:
            self.fail("result not found")

    def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)

        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        def test(result):
            self._send_sigint()
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)

    def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        result = unittest.TestResult()
        unittest.installHandler()
        unittest.registerResult(result)

        def test(result):
            self._send_sigint()
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            # The second SIGINT is expected to raise KeyboardInterrupt,
            # so the line after it must never be reached.
            self._send_sigint()
            self.fail("Second KeyboardInterrupt not raised")

        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)

    def testTwoResults(self):
        unittest.installHandler()

        result = unittest.TestResult()
        unittest.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)

        result2 = unittest.TestResult()
        unittest.registerResult(result2)
        # Registering another result must not swap the installed handler.
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

        # Deliberately never registered: it must not be told to stop.
        result3 = unittest.TestResult()

        def test(result):
            self._send_sigint()

        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)

    def testHandlerReplacedButCalled(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")
        # If our handler has been replaced (is no longer installed) but is
        # called by the *new* handler, then it isn't safe to delay the
        # SIGINT and we should immediately delegate to the default handler
        unittest.installHandler()

        handler = signal.getsignal(signal.SIGINT)

        # Signal handlers are called with (signal number, stack frame).
        def new_handler(signum, frame):
            handler(signum, frame)
        signal.signal(signal.SIGINT, new_handler)

        try:
            self._send_sigint()
        except KeyboardInterrupt:
            pass
        else:
            self.fail("replaced but delegated handler doesn't raise interrupt")

    def testRunner(self):
        # Creating a TextTestRunner with the appropriate argument should
        # register the TextTestResult it creates
        runner = unittest.TextTestRunner(stream=StringIO())

        result = runner.run(unittest.TestSuite())
        self.assertIn(result, unittest.signals._results)

    def testWeakReferences(self):
        # Calling registerResult on a result should not keep it alive
        result = unittest.TestResult()
        unittest.registerResult(result)

        ref = weakref.ref(result)
        del result

        # For non-reference counting implementations
        gc.collect()
        gc.collect()
        self.assertIsNone(ref())

    def testRemoveResult(self):
        result = unittest.TestResult()
        unittest.registerResult(result)

        unittest.installHandler()
        self.assertTrue(unittest.removeResult(result))

        # Should this raise an error instead?
        self.assertFalse(unittest.removeResult(unittest.TestResult()))

        # Whether or not the interrupt propagates is irrelevant here; the
        # test only cares that the removed result is left untouched.
        try:
            self._send_sigint()
        except KeyboardInterrupt:
            pass

        self.assertFalse(result.shouldStop)

    def testMainInstallsHandler(self):
        # Unique sentinels so the assertions below can check that exactly
        # these objects were passed through.
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            # Class-level lists on purpose: the test inspects them through
            # the class after instances have appended to them.
            initArgs = []
            runArgs = []

            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))

            def run(self, test):
                self.runArgs.append(test)
                return result

        class Program(unittest.TestProgram):
            # Skips TestProgram.__init__ entirely and only sets the
            # attributes assigned below before runTests() is called.
            def __init__(self, catchbreak):
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.testRunner = FakeRunner
                self.test = test
                self.result = None

        # Without catchbreak the SIGINT handler must be left untouched.
        p = Program(False)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
                                                     'verbosity': verbosity,
                                                     'failfast': failfast})])
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # With catchbreak a different SIGINT handler must be in place.
        FakeRunner.initArgs = []
        FakeRunner.runArgs = []
        p = Program(True)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
                                                     'verbosity': verbosity,
                                                     'failfast': failfast})])
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandler(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()
        unittest.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # check that calling removeHandler multiple times has no ill-effect
        unittest.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandlerAsDecorator(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()

        # Inside the decorated function the original handler is expected
        # to be active again ...
        @unittest.removeHandler
        def test():
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        test()
        # ... and afterwards the unittest handler must be back in place.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
264
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
@unittest.skipIf(
    sys.platform == 'freebsd6',
    "Test kills regrtest on freebsd6 if threads have been used")
class TestBreakDefaultIntHandler(TestBreak):
    """TestBreak variant with ``int_handler`` set to signal.default_int_handler."""

    int_handler = signal.default_int_handler
271
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
@unittest.skipIf(
    sys.platform == 'freebsd6',
    "Test kills regrtest on freebsd6 if threads have been used")
class TestBreakSignalIgnored(TestBreak):
    """TestBreak variant with ``int_handler`` set to signal.SIG_IGN."""

    int_handler = signal.SIG_IGN
278
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
@unittest.skipIf(
    sys.platform == 'freebsd6',
    "Test kills regrtest on freebsd6 if threads have been used")
class TestBreakSignalDefault(TestBreak):
    """TestBreak variant with ``int_handler`` set to signal.SIG_DFL."""

    int_handler = signal.SIG_DFL