blob: 4f89e87715ff7bc854353adc641f0e338ae91b6c [file] [log] [blame]
Michael Foord65b69a12010-03-27 13:25:41 +00001import gc
2import io
3import os
Brian Curtineb24d742010-04-12 17:16:38 +00004import sys
Michael Foord65b69a12010-03-27 13:25:41 +00005import signal
6import weakref
7
8import unittest
9
10
11@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
Brian Curtineb24d742010-04-12 17:16:38 +000012@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
R. David Murray3141a632010-04-22 00:53:47 +000013@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
14 "if threads have been used")
Michael Foord65b69a12010-03-27 13:25:41 +000015class TestBreak(unittest.TestCase):
16
17 def setUp(self):
18 self._default_handler = signal.getsignal(signal.SIGINT)
19
20 def tearDown(self):
21 signal.signal(signal.SIGINT, self._default_handler)
22 unittest.signals._results = weakref.WeakKeyDictionary()
23 unittest.signals._interrupt_handler = None
24
25
26 def testInstallHandler(self):
27 default_handler = signal.getsignal(signal.SIGINT)
28 unittest.installHandler()
29 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
30
31 try:
32 pid = os.getpid()
33 os.kill(pid, signal.SIGINT)
34 except KeyboardInterrupt:
35 self.fail("KeyboardInterrupt not handled")
36
37 self.assertTrue(unittest.signals._interrupt_handler.called)
38
39 def testRegisterResult(self):
40 result = unittest.TestResult()
41 unittest.registerResult(result)
42
43 for ref in unittest.signals._results:
44 if ref is result:
45 break
46 elif ref is not result:
47 self.fail("odd object in result set")
48 else:
49 self.fail("result not found")
50
51
52 def testInterruptCaught(self):
53 default_handler = signal.getsignal(signal.SIGINT)
54
55 result = unittest.TestResult()
56 unittest.installHandler()
57 unittest.registerResult(result)
58
59 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
60
61 def test(result):
62 pid = os.getpid()
63 os.kill(pid, signal.SIGINT)
64 result.breakCaught = True
65 self.assertTrue(result.shouldStop)
66
67 try:
68 test(result)
69 except KeyboardInterrupt:
70 self.fail("KeyboardInterrupt not handled")
71 self.assertTrue(result.breakCaught)
72
73
74 def testSecondInterrupt(self):
75 result = unittest.TestResult()
76 unittest.installHandler()
77 unittest.registerResult(result)
78
79 def test(result):
80 pid = os.getpid()
81 os.kill(pid, signal.SIGINT)
82 result.breakCaught = True
83 self.assertTrue(result.shouldStop)
84 os.kill(pid, signal.SIGINT)
85 self.fail("Second KeyboardInterrupt not raised")
86
87 try:
88 test(result)
89 except KeyboardInterrupt:
90 pass
91 else:
92 self.fail("Second KeyboardInterrupt not raised")
93 self.assertTrue(result.breakCaught)
94
95
96 def testTwoResults(self):
97 unittest.installHandler()
98
99 result = unittest.TestResult()
100 unittest.registerResult(result)
101 new_handler = signal.getsignal(signal.SIGINT)
102
103 result2 = unittest.TestResult()
104 unittest.registerResult(result2)
105 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
106
107 result3 = unittest.TestResult()
108
109 def test(result):
110 pid = os.getpid()
111 os.kill(pid, signal.SIGINT)
112
113 try:
114 test(result)
115 except KeyboardInterrupt:
116 self.fail("KeyboardInterrupt not handled")
117
118 self.assertTrue(result.shouldStop)
119 self.assertTrue(result2.shouldStop)
120 self.assertFalse(result3.shouldStop)
121
122
123 def testHandlerReplacedButCalled(self):
124 # If our handler has been replaced (is no longer installed) but is
125 # called by the *new* handler, then it isn't safe to delay the
126 # SIGINT and we should immediately delegate to the default handler
127 unittest.installHandler()
128
129 handler = signal.getsignal(signal.SIGINT)
130 def new_handler(frame, signum):
131 handler(frame, signum)
132 signal.signal(signal.SIGINT, new_handler)
133
134 try:
135 pid = os.getpid()
136 os.kill(pid, signal.SIGINT)
137 except KeyboardInterrupt:
138 pass
139 else:
140 self.fail("replaced but delegated handler doesn't raise interrupt")
141
142 def testRunner(self):
143 # Creating a TextTestRunner with the appropriate argument should
144 # register the TextTestResult it creates
145 runner = unittest.TextTestRunner(stream=io.StringIO())
146
147 result = runner.run(unittest.TestSuite())
148 self.assertIn(result, unittest.signals._results)
149
150 def testWeakReferences(self):
151 # Calling registerResult on a result should not keep it alive
152 result = unittest.TestResult()
153 unittest.registerResult(result)
154
155 ref = weakref.ref(result)
156 del result
157
158 # For non-reference counting implementations
159 gc.collect();gc.collect()
160 self.assertIsNone(ref())
161
162
163 def testRemoveResult(self):
164 result = unittest.TestResult()
165 unittest.registerResult(result)
166
167 unittest.installHandler()
168 self.assertTrue(unittest.removeResult(result))
169
170 # Should this raise an error instead?
171 self.assertFalse(unittest.removeResult(unittest.TestResult()))
172
173 try:
174 pid = os.getpid()
175 os.kill(pid, signal.SIGINT)
176 except KeyboardInterrupt:
177 pass
178
179 self.assertFalse(result.shouldStop)
180
181 def testMainInstallsHandler(self):
182 failfast = object()
183 test = object()
184 verbosity = object()
185 result = object()
186 default_handler = signal.getsignal(signal.SIGINT)
187
188 class FakeRunner(object):
189 initArgs = []
190 runArgs = []
191 def __init__(self, *args, **kwargs):
192 self.initArgs.append((args, kwargs))
193 def run(self, test):
194 self.runArgs.append(test)
195 return result
196
197 class Program(unittest.TestProgram):
198 def __init__(self, catchbreak):
199 self.exit = False
200 self.verbosity = verbosity
201 self.failfast = failfast
202 self.catchbreak = catchbreak
203 self.testRunner = FakeRunner
204 self.test = test
205 self.result = None
206
207 p = Program(False)
208 p.runTests()
209
Benjamin Petersonb48af542010-04-11 20:43:16 +0000210 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
211 'verbosity': verbosity,
212 'failfast': failfast})])
Michael Foord65b69a12010-03-27 13:25:41 +0000213 self.assertEqual(FakeRunner.runArgs, [test])
214 self.assertEqual(p.result, result)
215
216 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
217
218 FakeRunner.initArgs = []
219 FakeRunner.runArgs = []
220 p = Program(True)
221 p.runTests()
222
Benjamin Petersonb48af542010-04-11 20:43:16 +0000223 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
224 'verbosity': verbosity,
225 'failfast': failfast})])
Michael Foord65b69a12010-03-27 13:25:41 +0000226 self.assertEqual(FakeRunner.runArgs, [test])
227 self.assertEqual(p.result, result)
228
229 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)