blob: 4cf98b0491a73523dcc2dd595ed08a2d629f079e [file] [log] [blame]
Victor Stinner024e37a2011-03-31 01:31:06 +02001from contextlib import contextmanager
2import faulthandler
3import re
4import signal
5import subprocess
6import sys
7from test import support, script_helper
8import tempfile
9import unittest
10
11try:
12 from resource import setrlimit, RLIMIT_CORE, error as resource_error
13except ImportError:
14 prepare_subprocess = None
15else:
16 def prepare_subprocess():
17 # don't create core file
18 try:
19 setrlimit(RLIMIT_CORE, (0, 0))
20 except (ValueError, resource_error):
21 pass
22
23def expected_traceback(lineno1, lineno2, header, count=1):
24 regex = header
25 regex += r' File "\<string\>", line %s in func\n' % lineno1
26 regex += r' File "\<string\>", line %s in \<module\>' % lineno2
27 if count != 1:
28 regex = (regex + '\n') * (count - 1) + regex
29 return '^' + regex + '$'
30
31@contextmanager
32def temporary_filename():
33 filename = tempfile.mktemp()
34 try:
35 yield filename
36 finally:
37 support.unlink(filename)
38
39class FaultHandlerTests(unittest.TestCase):
40 def get_output(self, code, expect_success, filename=None):
41 """
42 Run the specified code in Python (in a new child process) and read the
43 output from the standard error or from a file (if filename is set).
44 Return the output lines as a list.
45
46 Strip the reference count from the standard error for Python debug
47 build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
48 thread XXX".
49 """
50 options = {}
51 if prepare_subprocess:
52 options['preexec_fn'] = prepare_subprocess
53 process = script_helper.spawn_python('-c', code, **options)
54 stdout, stderr = process.communicate()
55 exitcode = process.wait()
56 if expect_success:
57 self.assertEqual(exitcode, 0)
58 else:
59 self.assertNotEqual(exitcode, 0)
60 if filename:
61 with open(filename, "rb") as fp:
62 output = fp.read()
63 else:
64 output = support.strip_python_stderr(stdout)
65 output = output.decode('ascii', 'backslashreplace')
66 output = re.sub('Current thread 0x[0-9a-f]+',
67 'Current thread XXX',
68 output)
69 return output.splitlines()
70
71 def check_fatal_error(self, code, line_number, name_regex,
72 filename=None, all_threads=False):
73 """
74 Check that the fault handler for fatal errors is enabled and check the
75 traceback from the child process output.
76
77 Raise an error if the output doesn't match the expected format.
78 """
79 if all_threads:
80 header = 'Current thread XXX'
81 else:
82 header = 'Traceback (most recent call first)'
83 regex = """
84^Fatal Python error: {name}
85
86{header}:
87 File "<string>", line {lineno} in <module>$
88""".strip()
89 regex = regex.format(
90 lineno=line_number,
91 name=name_regex,
92 header=re.escape(header))
93 output = self.get_output(code, False, filename)
94 output = '\n'.join(output)
95 self.assertRegex(output, regex)
96
97 def test_read_null(self):
98 self.check_fatal_error("""
99import faulthandler
100faulthandler.enable()
101faulthandler._read_null()
102""".strip(),
103 3,
104 '(?:Segmentation fault|Bus error)')
105
106 def test_sigsegv(self):
107 self.check_fatal_error("""
108import faulthandler
109faulthandler.enable()
110faulthandler._sigsegv()
111""".strip(),
112 3,
113 'Segmentation fault')
114
115 @unittest.skipIf(sys.platform == 'win32',
116 "SIGFPE cannot be caught on Windows")
117 def test_sigfpe(self):
118 self.check_fatal_error("""
119import faulthandler
120faulthandler.enable()
121faulthandler._sigfpe()
122""".strip(),
123 3,
124 'Floating point exception')
125
126 @unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
127 "need faulthandler._sigbus()")
128 def test_sigbus(self):
129 self.check_fatal_error("""
130import faulthandler
131faulthandler.enable()
132faulthandler._sigbus()
133""".strip(),
134 3,
135 'Bus error')
136
137 @unittest.skipIf(not hasattr(faulthandler, '_sigill'),
138 "need faulthandler._sigill()")
139 def test_sigill(self):
140 self.check_fatal_error("""
141import faulthandler
142faulthandler.enable()
143faulthandler._sigill()
144""".strip(),
145 3,
146 'Illegal instruction')
147
148 def test_fatal_error(self):
149 self.check_fatal_error("""
150import faulthandler
151faulthandler._fatal_error(b'xyz')
152""".strip(),
153 2,
154 'xyz')
155
Victor Stinner0862d6e2011-03-31 02:05:54 +0200156 @unittest.skipIf(True, 'test disabled, see #11393')
Victor Stinner024e37a2011-03-31 01:31:06 +0200157 @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
158 'need faulthandler._stack_overflow()')
159 def test_stack_overflow(self):
160 self.check_fatal_error("""
161import faulthandler
162faulthandler.enable()
163faulthandler._stack_overflow()
164""".strip(),
165 3,
166 '(?:Segmentation fault|Bus error)')
167
168 def test_gil_released(self):
169 self.check_fatal_error("""
170import faulthandler
171faulthandler.enable()
172faulthandler._read_null(True)
173""".strip(),
174 3,
175 '(?:Segmentation fault|Bus error)')
176
177 def test_enable_file(self):
178 with temporary_filename() as filename:
179 self.check_fatal_error("""
180import faulthandler
181output = open({filename}, 'wb')
182faulthandler.enable(output)
183faulthandler._read_null(True)
184""".strip().format(filename=repr(filename)),
185 4,
186 '(?:Segmentation fault|Bus error)',
187 filename=filename)
188
189 def test_enable_threads(self):
190 self.check_fatal_error("""
191import faulthandler
192faulthandler.enable(all_threads=True)
193faulthandler._read_null(True)
194""".strip(),
195 3,
196 '(?:Segmentation fault|Bus error)',
197 all_threads=True)
198
199 def test_disable(self):
200 code = """
201import faulthandler
202faulthandler.enable()
203faulthandler.disable()
204faulthandler._read_null()
205""".strip()
206 not_expected = 'Fatal Python error'
207 stderr = self.get_output(code, False)
208 stder = '\n'.join(stderr)
209 self.assertTrue(not_expected not in stderr,
210 "%r is present in %r" % (not_expected, stderr))
211
212 def test_is_enabled(self):
213 was_enabled = faulthandler.is_enabled()
214 try:
215 faulthandler.enable()
216 self.assertTrue(faulthandler.is_enabled())
217 faulthandler.disable()
218 self.assertFalse(faulthandler.is_enabled())
219 finally:
220 if was_enabled:
221 faulthandler.enable()
222 else:
223 faulthandler.disable()
224
225 def check_dump_traceback(self, filename):
226 """
227 Explicitly call dump_traceback() function and check its output.
228 Raise an error if the output doesn't match the expected format.
229 """
230 code = """
231import faulthandler
232
233def funcB():
234 if {has_filename}:
235 with open({filename}, "wb") as fp:
236 faulthandler.dump_traceback(fp)
237 else:
238 faulthandler.dump_traceback()
239
240def funcA():
241 funcB()
242
243funcA()
244""".strip()
245 code = code.format(
246 filename=repr(filename),
247 has_filename=bool(filename),
248 )
249 if filename:
250 lineno = 6
251 else:
252 lineno = 8
253 expected = [
254 'Traceback (most recent call first):',
255 ' File "<string>", line %s in funcB' % lineno,
256 ' File "<string>", line 11 in funcA',
257 ' File "<string>", line 13 in <module>'
258 ]
259 trace = self.get_output(code, True, filename)
260 self.assertEqual(trace, expected)
261
262 def test_dump_traceback(self):
263 self.check_dump_traceback(None)
264 with temporary_filename() as filename:
265 self.check_dump_traceback(filename)
266
267 def check_dump_traceback_threads(self, filename):
268 """
269 Call explicitly dump_traceback(all_threads=True) and check the output.
270 Raise an error if the output doesn't match the expected format.
271 """
272 code = """
273import faulthandler
274from threading import Thread, Event
275import time
276
277def dump():
278 if {filename}:
279 with open({filename}, "wb") as fp:
280 faulthandler.dump_traceback(fp, all_threads=True)
281 else:
282 faulthandler.dump_traceback(all_threads=True)
283
284class Waiter(Thread):
285 # avoid blocking if the main thread raises an exception.
286 daemon = True
287
288 def __init__(self):
289 Thread.__init__(self)
290 self.running = Event()
291 self.stop = Event()
292
293 def run(self):
294 self.running.set()
295 self.stop.wait()
296
297waiter = Waiter()
298waiter.start()
299waiter.running.wait()
300dump()
301waiter.stop.set()
302waiter.join()
303""".strip()
304 code = code.format(filename=repr(filename))
305 output = self.get_output(code, True, filename)
306 output = '\n'.join(output)
307 if filename:
308 lineno = 8
309 else:
310 lineno = 10
311 regex = """
312^Thread 0x[0-9a-f]+:
313(?: File ".*threading.py", line [0-9]+ in wait
314)? File ".*threading.py", line [0-9]+ in wait
315 File "<string>", line 23 in run
316 File ".*threading.py", line [0-9]+ in _bootstrap_inner
317 File ".*threading.py", line [0-9]+ in _bootstrap
318
319Current thread XXX:
320 File "<string>", line {lineno} in dump
321 File "<string>", line 28 in <module>$
322""".strip()
323 regex = regex.format(lineno=lineno)
324 self.assertRegex(output, regex)
325
326 def test_dump_traceback_threads(self):
327 self.check_dump_traceback_threads(None)
328 with temporary_filename() as filename:
329 self.check_dump_traceback_threads(filename)
330
331 def _check_dump_tracebacks_later(self, repeat, cancel, filename):
332 """
333 Check how many times the traceback is written in timeout x 2.5 seconds,
334 or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
335 on repeat and cancel options.
336
337 Raise an error if the output doesn't match the expect format.
338 """
339 code = """
340import faulthandler
341import time
342
343def func(repeat, cancel, timeout):
344 pause = timeout * 2.5
345 a = time.time()
346 time.sleep(pause)
347 faulthandler.cancel_dump_tracebacks_later()
348 b = time.time()
349 # Check that sleep() was not interrupted
350 assert (b -a) >= pause
351
352 if cancel:
353 pause = timeout * 1.5
354 a = time.time()
355 time.sleep(pause)
356 b = time.time()
357 # Check that sleep() was not interrupted
358 assert (b -a) >= pause
359
360timeout = 0.5
361repeat = {repeat}
362cancel = {cancel}
363if {has_filename}:
364 file = open({filename}, "wb")
365else:
366 file = None
367faulthandler.dump_tracebacks_later(timeout,
368 repeat=repeat, file=file)
369func(repeat, cancel, timeout)
370if file is not None:
371 file.close()
372""".strip()
373 code = code.format(
374 filename=repr(filename),
375 has_filename=bool(filename),
376 repeat=repeat,
377 cancel=cancel,
378 )
379 trace = self.get_output(code, True, filename)
380 trace = '\n'.join(trace)
381
382 if repeat:
383 count = 2
384 else:
385 count = 1
386 header = 'Thread 0x[0-9a-f]+:\n'
387 regex = expected_traceback(7, 30, header, count=count)
388 self.assertRegex(trace, '^%s$' % regex)
389
390 @unittest.skipIf(not hasattr(faulthandler, 'dump_tracebacks_later'),
391 'need faulthandler.dump_tracebacks_later()')
392 def check_dump_tracebacks_later(self, repeat=False, cancel=False,
393 file=False):
394 if file:
395 with temporary_filename() as filename:
396 self._check_dump_tracebacks_later(repeat, cancel, filename)
397 else:
398 self._check_dump_tracebacks_later(repeat, cancel, None)
399
400 def test_dump_tracebacks_later(self):
401 self.check_dump_tracebacks_later()
402
403 def test_dump_tracebacks_later_repeat(self):
404 self.check_dump_tracebacks_later(repeat=True)
405
406 def test_dump_tracebacks_later_repeat_cancel(self):
407 self.check_dump_tracebacks_later(repeat=True, cancel=True)
408
409 def test_dump_tracebacks_later_file(self):
410 self.check_dump_tracebacks_later(file=True)
411
412 @unittest.skipIf(not hasattr(faulthandler, "register"),
413 "need faulthandler.register")
414 def check_register(self, filename=False, all_threads=False):
415 """
416 Register a handler displaying the traceback on a user signal. Raise the
417 signal and check the written traceback.
418
419 Raise an error if the output doesn't match the expected format.
420 """
421 code = """
422import faulthandler
423import os
424import signal
425
426def func(signum):
427 os.kill(os.getpid(), signum)
428
429signum = signal.SIGUSR1
430if {has_filename}:
431 file = open({filename}, "wb")
432else:
433 file = None
434faulthandler.register(signum, file=file, all_threads={all_threads})
435func(signum)
436if file is not None:
437 file.close()
438""".strip()
439 code = code.format(
440 filename=repr(filename),
441 has_filename=bool(filename),
442 all_threads=all_threads,
443 )
444 trace = self.get_output(code, True, filename)
445 trace = '\n'.join(trace)
446 if all_threads:
447 regex = 'Current thread XXX:\n'
448 else:
449 regex = 'Traceback \(most recent call first\):\n'
450 regex = expected_traceback(6, 14, regex)
451 self.assertTrue(re.match(regex, trace),
452 "[%s] doesn't match [%s]: use_filename=%s, all_threads=%s"
453 % (regex, trace, bool(filename), all_threads))
454
455 def test_register(self):
456 self.check_register()
457
458 def test_register_file(self):
459 with temporary_filename() as filename:
460 self.check_register(filename=filename)
461
462 def test_register_threads(self):
463 self.check_register(all_threads=True)
464
465
466def test_main():
467 support.run_unittest(FaultHandlerTests)
468
469if __name__ == "__main__":
470 test_main()