blob: 3c8c8ebfc51d1b6aae6ea66c62b007db0eef3aec [file] [log] [blame]
Victor Stinner024e37a2011-03-31 01:31:06 +02001from contextlib import contextmanager
2import faulthandler
3import re
4import signal
5import subprocess
6import sys
7from test import support, script_helper
8import tempfile
9import unittest
10
11try:
12 from resource import setrlimit, RLIMIT_CORE, error as resource_error
13except ImportError:
14 prepare_subprocess = None
15else:
16 def prepare_subprocess():
17 # don't create core file
18 try:
19 setrlimit(RLIMIT_CORE, (0, 0))
20 except (ValueError, resource_error):
21 pass
22
23def expected_traceback(lineno1, lineno2, header, count=1):
24 regex = header
25 regex += r' File "\<string\>", line %s in func\n' % lineno1
26 regex += r' File "\<string\>", line %s in \<module\>' % lineno2
27 if count != 1:
28 regex = (regex + '\n') * (count - 1) + regex
29 return '^' + regex + '$'
30
31@contextmanager
32def temporary_filename():
33 filename = tempfile.mktemp()
34 try:
35 yield filename
36 finally:
37 support.unlink(filename)
38
39class FaultHandlerTests(unittest.TestCase):
40 def get_output(self, code, expect_success, filename=None):
41 """
42 Run the specified code in Python (in a new child process) and read the
43 output from the standard error or from a file (if filename is set).
44 Return the output lines as a list.
45
46 Strip the reference count from the standard error for Python debug
47 build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
48 thread XXX".
49 """
50 options = {}
51 if prepare_subprocess:
52 options['preexec_fn'] = prepare_subprocess
53 process = script_helper.spawn_python('-c', code, **options)
54 stdout, stderr = process.communicate()
55 exitcode = process.wait()
56 if expect_success:
57 self.assertEqual(exitcode, 0)
58 else:
59 self.assertNotEqual(exitcode, 0)
60 if filename:
61 with open(filename, "rb") as fp:
62 output = fp.read()
63 else:
64 output = support.strip_python_stderr(stdout)
65 output = output.decode('ascii', 'backslashreplace')
66 output = re.sub('Current thread 0x[0-9a-f]+',
67 'Current thread XXX',
68 output)
69 return output.splitlines()
70
71 def check_fatal_error(self, code, line_number, name_regex,
Victor Stinnerf0480752011-03-31 11:34:08 +020072 filename=None, all_threads=False, other_regex=None):
Victor Stinner024e37a2011-03-31 01:31:06 +020073 """
74 Check that the fault handler for fatal errors is enabled and check the
75 traceback from the child process output.
76
77 Raise an error if the output doesn't match the expected format.
78 """
79 if all_threads:
80 header = 'Current thread XXX'
81 else:
82 header = 'Traceback (most recent call first)'
83 regex = """
84^Fatal Python error: {name}
85
86{header}:
87 File "<string>", line {lineno} in <module>$
88""".strip()
89 regex = regex.format(
90 lineno=line_number,
91 name=name_regex,
92 header=re.escape(header))
Victor Stinnerf0480752011-03-31 11:34:08 +020093 if other_regex:
94 regex += '|' + other_regex
Victor Stinner024e37a2011-03-31 01:31:06 +020095 output = self.get_output(code, False, filename)
96 output = '\n'.join(output)
97 self.assertRegex(output, regex)
98
99 def test_read_null(self):
100 self.check_fatal_error("""
101import faulthandler
102faulthandler.enable()
103faulthandler._read_null()
104""".strip(),
105 3,
106 '(?:Segmentation fault|Bus error)')
107
108 def test_sigsegv(self):
109 self.check_fatal_error("""
110import faulthandler
111faulthandler.enable()
112faulthandler._sigsegv()
113""".strip(),
114 3,
115 'Segmentation fault')
116
117 @unittest.skipIf(sys.platform == 'win32',
118 "SIGFPE cannot be caught on Windows")
119 def test_sigfpe(self):
120 self.check_fatal_error("""
121import faulthandler
122faulthandler.enable()
123faulthandler._sigfpe()
124""".strip(),
125 3,
126 'Floating point exception')
127
128 @unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
129 "need faulthandler._sigbus()")
130 def test_sigbus(self):
131 self.check_fatal_error("""
132import faulthandler
133faulthandler.enable()
134faulthandler._sigbus()
135""".strip(),
136 3,
137 'Bus error')
138
139 @unittest.skipIf(not hasattr(faulthandler, '_sigill'),
140 "need faulthandler._sigill()")
141 def test_sigill(self):
142 self.check_fatal_error("""
143import faulthandler
144faulthandler.enable()
145faulthandler._sigill()
146""".strip(),
147 3,
148 'Illegal instruction')
149
150 def test_fatal_error(self):
151 self.check_fatal_error("""
152import faulthandler
153faulthandler._fatal_error(b'xyz')
154""".strip(),
155 2,
156 'xyz')
157
Victor Stinner024e37a2011-03-31 01:31:06 +0200158 @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
159 'need faulthandler._stack_overflow()')
160 def test_stack_overflow(self):
161 self.check_fatal_error("""
162import faulthandler
163faulthandler.enable()
164faulthandler._stack_overflow()
165""".strip(),
166 3,
Victor Stinnerf0480752011-03-31 11:34:08 +0200167 '(?:Segmentation fault|Bus error)',
168 other_regex='unable to raise a stack overflow')
Victor Stinner024e37a2011-03-31 01:31:06 +0200169
170 def test_gil_released(self):
171 self.check_fatal_error("""
172import faulthandler
173faulthandler.enable()
174faulthandler._read_null(True)
175""".strip(),
176 3,
177 '(?:Segmentation fault|Bus error)')
178
179 def test_enable_file(self):
180 with temporary_filename() as filename:
181 self.check_fatal_error("""
182import faulthandler
183output = open({filename}, 'wb')
184faulthandler.enable(output)
185faulthandler._read_null(True)
186""".strip().format(filename=repr(filename)),
187 4,
188 '(?:Segmentation fault|Bus error)',
189 filename=filename)
190
191 def test_enable_threads(self):
192 self.check_fatal_error("""
193import faulthandler
194faulthandler.enable(all_threads=True)
195faulthandler._read_null(True)
196""".strip(),
197 3,
198 '(?:Segmentation fault|Bus error)',
199 all_threads=True)
200
201 def test_disable(self):
202 code = """
203import faulthandler
204faulthandler.enable()
205faulthandler.disable()
206faulthandler._read_null()
207""".strip()
208 not_expected = 'Fatal Python error'
209 stderr = self.get_output(code, False)
210 stder = '\n'.join(stderr)
211 self.assertTrue(not_expected not in stderr,
212 "%r is present in %r" % (not_expected, stderr))
213
214 def test_is_enabled(self):
215 was_enabled = faulthandler.is_enabled()
216 try:
217 faulthandler.enable()
218 self.assertTrue(faulthandler.is_enabled())
219 faulthandler.disable()
220 self.assertFalse(faulthandler.is_enabled())
221 finally:
222 if was_enabled:
223 faulthandler.enable()
224 else:
225 faulthandler.disable()
226
227 def check_dump_traceback(self, filename):
228 """
229 Explicitly call dump_traceback() function and check its output.
230 Raise an error if the output doesn't match the expected format.
231 """
232 code = """
233import faulthandler
234
235def funcB():
236 if {has_filename}:
237 with open({filename}, "wb") as fp:
238 faulthandler.dump_traceback(fp)
239 else:
240 faulthandler.dump_traceback()
241
242def funcA():
243 funcB()
244
245funcA()
246""".strip()
247 code = code.format(
248 filename=repr(filename),
249 has_filename=bool(filename),
250 )
251 if filename:
252 lineno = 6
253 else:
254 lineno = 8
255 expected = [
256 'Traceback (most recent call first):',
257 ' File "<string>", line %s in funcB' % lineno,
258 ' File "<string>", line 11 in funcA',
259 ' File "<string>", line 13 in <module>'
260 ]
261 trace = self.get_output(code, True, filename)
262 self.assertEqual(trace, expected)
263
264 def test_dump_traceback(self):
265 self.check_dump_traceback(None)
266 with temporary_filename() as filename:
267 self.check_dump_traceback(filename)
268
269 def check_dump_traceback_threads(self, filename):
270 """
271 Call explicitly dump_traceback(all_threads=True) and check the output.
272 Raise an error if the output doesn't match the expected format.
273 """
274 code = """
275import faulthandler
276from threading import Thread, Event
277import time
278
279def dump():
280 if {filename}:
281 with open({filename}, "wb") as fp:
282 faulthandler.dump_traceback(fp, all_threads=True)
283 else:
284 faulthandler.dump_traceback(all_threads=True)
285
286class Waiter(Thread):
287 # avoid blocking if the main thread raises an exception.
288 daemon = True
289
290 def __init__(self):
291 Thread.__init__(self)
292 self.running = Event()
293 self.stop = Event()
294
295 def run(self):
296 self.running.set()
297 self.stop.wait()
298
299waiter = Waiter()
300waiter.start()
301waiter.running.wait()
302dump()
303waiter.stop.set()
304waiter.join()
305""".strip()
306 code = code.format(filename=repr(filename))
307 output = self.get_output(code, True, filename)
308 output = '\n'.join(output)
309 if filename:
310 lineno = 8
311 else:
312 lineno = 10
313 regex = """
314^Thread 0x[0-9a-f]+:
315(?: File ".*threading.py", line [0-9]+ in wait
316)? File ".*threading.py", line [0-9]+ in wait
317 File "<string>", line 23 in run
318 File ".*threading.py", line [0-9]+ in _bootstrap_inner
319 File ".*threading.py", line [0-9]+ in _bootstrap
320
321Current thread XXX:
322 File "<string>", line {lineno} in dump
323 File "<string>", line 28 in <module>$
324""".strip()
325 regex = regex.format(lineno=lineno)
326 self.assertRegex(output, regex)
327
328 def test_dump_traceback_threads(self):
329 self.check_dump_traceback_threads(None)
330 with temporary_filename() as filename:
331 self.check_dump_traceback_threads(filename)
332
333 def _check_dump_tracebacks_later(self, repeat, cancel, filename):
334 """
335 Check how many times the traceback is written in timeout x 2.5 seconds,
336 or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
337 on repeat and cancel options.
338
339 Raise an error if the output doesn't match the expect format.
340 """
341 code = """
342import faulthandler
343import time
344
345def func(repeat, cancel, timeout):
346 pause = timeout * 2.5
347 a = time.time()
348 time.sleep(pause)
349 faulthandler.cancel_dump_tracebacks_later()
350 b = time.time()
351 # Check that sleep() was not interrupted
352 assert (b -a) >= pause
353
354 if cancel:
355 pause = timeout * 1.5
356 a = time.time()
357 time.sleep(pause)
358 b = time.time()
359 # Check that sleep() was not interrupted
360 assert (b -a) >= pause
361
362timeout = 0.5
363repeat = {repeat}
364cancel = {cancel}
365if {has_filename}:
366 file = open({filename}, "wb")
367else:
368 file = None
369faulthandler.dump_tracebacks_later(timeout,
370 repeat=repeat, file=file)
371func(repeat, cancel, timeout)
372if file is not None:
373 file.close()
374""".strip()
375 code = code.format(
376 filename=repr(filename),
377 has_filename=bool(filename),
378 repeat=repeat,
379 cancel=cancel,
380 )
381 trace = self.get_output(code, True, filename)
382 trace = '\n'.join(trace)
383
384 if repeat:
385 count = 2
386 else:
387 count = 1
388 header = 'Thread 0x[0-9a-f]+:\n'
389 regex = expected_traceback(7, 30, header, count=count)
390 self.assertRegex(trace, '^%s$' % regex)
391
392 @unittest.skipIf(not hasattr(faulthandler, 'dump_tracebacks_later'),
393 'need faulthandler.dump_tracebacks_later()')
394 def check_dump_tracebacks_later(self, repeat=False, cancel=False,
395 file=False):
396 if file:
397 with temporary_filename() as filename:
398 self._check_dump_tracebacks_later(repeat, cancel, filename)
399 else:
400 self._check_dump_tracebacks_later(repeat, cancel, None)
401
402 def test_dump_tracebacks_later(self):
403 self.check_dump_tracebacks_later()
404
405 def test_dump_tracebacks_later_repeat(self):
406 self.check_dump_tracebacks_later(repeat=True)
407
408 def test_dump_tracebacks_later_repeat_cancel(self):
409 self.check_dump_tracebacks_later(repeat=True, cancel=True)
410
411 def test_dump_tracebacks_later_file(self):
412 self.check_dump_tracebacks_later(file=True)
413
414 @unittest.skipIf(not hasattr(faulthandler, "register"),
415 "need faulthandler.register")
416 def check_register(self, filename=False, all_threads=False):
417 """
418 Register a handler displaying the traceback on a user signal. Raise the
419 signal and check the written traceback.
420
421 Raise an error if the output doesn't match the expected format.
422 """
423 code = """
424import faulthandler
425import os
426import signal
427
428def func(signum):
429 os.kill(os.getpid(), signum)
430
431signum = signal.SIGUSR1
432if {has_filename}:
433 file = open({filename}, "wb")
434else:
435 file = None
436faulthandler.register(signum, file=file, all_threads={all_threads})
437func(signum)
438if file is not None:
439 file.close()
440""".strip()
441 code = code.format(
442 filename=repr(filename),
443 has_filename=bool(filename),
444 all_threads=all_threads,
445 )
446 trace = self.get_output(code, True, filename)
447 trace = '\n'.join(trace)
448 if all_threads:
449 regex = 'Current thread XXX:\n'
450 else:
451 regex = 'Traceback \(most recent call first\):\n'
452 regex = expected_traceback(6, 14, regex)
453 self.assertTrue(re.match(regex, trace),
454 "[%s] doesn't match [%s]: use_filename=%s, all_threads=%s"
455 % (regex, trace, bool(filename), all_threads))
456
457 def test_register(self):
458 self.check_register()
459
460 def test_register_file(self):
461 with temporary_filename() as filename:
462 self.check_register(filename=filename)
463
464 def test_register_threads(self):
465 self.check_register(all_threads=True)
466
467
468def test_main():
469 support.run_unittest(FaultHandlerTests)
470
471if __name__ == "__main__":
472 test_main()