# blob: 82a76ed4c505496ae049051342b9b98f04895b59 [file] [log] [blame]
Victor Stinner024e37a2011-03-31 01:31:06 +02001from contextlib import contextmanager
2import faulthandler
3import re
4import signal
5import subprocess
6import sys
7from test import support, script_helper
8import tempfile
9import unittest
10
try:
    from resource import setrlimit, RLIMIT_CORE, error as resource_error
except ImportError:
    # The resource module is not available on this platform: there is
    # nothing to run in the child before it executes.
    prepare_subprocess = None
else:
    def prepare_subprocess():
        """
        Set the core file size limit of the calling process to zero so that
        no core file is created.

        Used as the preexec_fn of the child processes spawned by the tests.
        """
        no_core_file = (0, 0)
        try:
            setrlimit(RLIMIT_CORE, no_core_file)
        except (ValueError, resource_error):
            # Best effort: keep the current limit if it cannot be changed.
            pass
22
def expected_traceback(lineno1, lineno2, header, count=1):
    """
    Build an anchored regex matching a traceback made of two frames: "func"
    at line lineno1 and "<module>" at line lineno2, both in "<string>".

    header is a regex fragment matching the line(s) printed before the
    frames; it must include its own trailing newline. If count is greater
    than 1, the regex matches the same traceback repeated count times, one
    newline between two occurrences. A count lower than 1 is handled as 1.

    Return the regex as a string starting with '^' and ending with '$'.
    """
    # faulthandler indents each frame line with two spaces.
    single = header
    single += r'  File "\<string\>", line %s in func\n' % lineno1
    single += r'  File "\<string\>", line %s in \<module\>' % lineno2
    regex = '\n'.join([single] * max(count, 1))
    return '^' + regex + '$'
30
@contextmanager
def temporary_filename():
    """
    Context manager yielding the name of a temporary file.

    The file is created (empty) and closed before its name is yielded, so
    the name cannot be taken by someone else between the name generation and
    the first use, as could happen with tempfile.mktemp(). The name is passed
    to support.unlink() when the block exits, even on error.
    """
    # delete=False: only the name is needed, the file must outlive the
    # "with" block so that a child process can reopen and fill it.
    with tempfile.NamedTemporaryFile(delete=False) as placeholder:
        filename = placeholder.name
    try:
        yield filename
    finally:
        support.unlink(filename)
38
class FaultHandlerTests(unittest.TestCase):
    """
    Tests of the faulthandler module.

    Most tests run a small script in a child Python process (the script
    usually crashes on purpose) and compare the traceback written by
    faulthandler with the expected one.
    """

    def get_output(self, code, expect_success, filename=None):
        """
        Run the specified code in Python (in a new child process) and read the
        output of the child process or the content of a file (if filename is
        set). Return the output lines as a list.

        Fail the test if the exit code is not zero whereas expect_success is
        true, or if the exit code is zero whereas expect_success is false.

        The process output is passed through support.strip_python_stderr(),
        and "Current thread 0x00007f8d8fbd9700" is replaced by "Current
        thread XXX".
        """
        options = {}
        if prepare_subprocess:
            options['preexec_fn'] = prepare_subprocess
        process = script_helper.spawn_python('-c', code, **options)
        # NOTE(review): only the first stream is read; presumably
        # spawn_python() redirects stderr to stdout -- confirm.
        stdout, _ = process.communicate()
        exitcode = process.wait()
        if expect_success:
            self.assertEqual(exitcode, 0)
        else:
            self.assertNotEqual(exitcode, 0)
        if filename:
            with open(filename, "rb") as fp:
                output = fp.read()
        else:
            output = support.strip_python_stderr(stdout)
        output = output.decode('ascii', 'backslashreplace')
        # The thread identifier changes at each run: mask it.
        output = re.sub('Current thread 0x[0-9a-f]+',
                        'Current thread XXX',
                        output)
        return output.splitlines()

    def check_fatal_error(self, code, line_number, name_regex,
                          filename=None, all_threads=False):
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        code is the script to run, line_number is the line of the script
        expected in the traceback and name_regex is a regex matching the name
        of the error. The traceback is read from filename if it is set.
        all_threads selects the expected traceback header.

        Raise an error if the output doesn't match the expected format.
        """
        if all_threads:
            header = 'Current thread XXX'
        else:
            header = 'Traceback (most recent call first)'
        regex = """
^Fatal Python error: {name}

{header}:
  File "<string>", line {lineno} in <module>$
""".strip()
        regex = regex.format(
            lineno=line_number,
            name=name_regex,
            header=re.escape(header))
        output = self.get_output(code, False, filename)
        output = '\n'.join(output)
        self.assertRegex(output, regex)

    def test_read_null(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._read_null()
""".strip(),
            3,
            '(?:Segmentation fault|Bus error)')

    def test_sigsegv(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigsegv()
""".strip(),
            3,
            'Segmentation fault')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigfpe()
""".strip(),
            3,
            'Floating point exception')

    @unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
                     "need faulthandler._sigbus()")
    def test_sigbus(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigbus()
""".strip(),
            3,
            'Bus error')

    @unittest.skipIf(not hasattr(faulthandler, '_sigill'),
                     "need faulthandler._sigill()")
    def test_sigill(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigill()
""".strip(),
            3,
            'Illegal instruction')

    def test_fatal_error(self):
        self.check_fatal_error("""
import faulthandler
faulthandler._fatal_error(b'xyz')
""".strip(),
            2,
            'xyz')

    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._stack_overflow()
""".strip(),
            3,
            '(?:Segmentation fault|Bus error)')

    def test_gil_released(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._read_null(True)
""".strip(),
            3,
            '(?:Segmentation fault|Bus error)')

    def test_enable_file(self):
        with temporary_filename() as filename:
            self.check_fatal_error("""
import faulthandler
output = open({filename}, 'wb')
faulthandler.enable(output)
faulthandler._read_null(True)
""".strip().format(filename=repr(filename)),
                4,
                '(?:Segmentation fault|Bus error)',
                filename=filename)

    def test_enable_threads(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable(all_threads=True)
faulthandler._read_null(True)
""".strip(),
            3,
            '(?:Segmentation fault|Bus error)',
            all_threads=True)

    def test_disable(self):
        """
        Check that nothing is written by faulthandler on a crash when the
        fault handler has been disabled.
        """
        code = """
import faulthandler
faulthandler.enable()
faulthandler.disable()
faulthandler._read_null()
""".strip()
        not_expected = 'Fatal Python error'
        output = self.get_output(code, False)
        # Search in the whole text: on a list of lines, "in" would only
        # compare complete lines with not_expected.
        output = '\n'.join(output)
        self.assertTrue(not_expected not in output,
                        "%r is present in %r" % (not_expected, output))

    def test_is_enabled(self):
        """
        Check that is_enabled() follows enable() and disable(); the initial
        state of the fault handler is restored at exit.
        """
        was_enabled = faulthandler.is_enabled()
        try:
            faulthandler.enable()
            self.assertTrue(faulthandler.is_enabled())
            faulthandler.disable()
            self.assertFalse(faulthandler.is_enabled())
        finally:
            if was_enabled:
                faulthandler.enable()
            else:
                faulthandler.disable()

    def check_dump_traceback(self, filename):
        """
        Explicitly call dump_traceback() function and check its output.
        The traceback is written into filename if it is set.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler

def funcB():
    if {has_filename}:
        with open({filename}, "wb") as fp:
            faulthandler.dump_traceback(fp)
    else:
        faulthandler.dump_traceback()

def funcA():
    funcB()

funcA()
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
        )
        # Line of the script calling dump_traceback(): depends on the branch.
        if filename:
            lineno = 6
        else:
            lineno = 8
        expected = [
            'Traceback (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 11 in funcA',
            '  File "<string>", line 13 in <module>'
        ]
        trace = self.get_output(code, True, filename)
        self.assertEqual(trace, expected)

    def test_dump_traceback(self):
        self.check_dump_traceback(None)
        with temporary_filename() as filename:
            self.check_dump_traceback(filename)

    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.
        The traceback is written into filename if it is set.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler
from threading import Thread, Event
import time

def dump():
    if {filename}:
        with open({filename}, "wb") as fp:
            faulthandler.dump_traceback(fp, all_threads=True)
    else:
        faulthandler.dump_traceback(all_threads=True)

class Waiter(Thread):
    # avoid blocking if the main thread raises an exception.
    daemon = True

    def __init__(self):
        Thread.__init__(self)
        self.running = Event()
        self.stop = Event()

    def run(self):
        self.running.set()
        self.stop.wait()

waiter = Waiter()
waiter.start()
waiter.running.wait()
dump()
waiter.stop.set()
waiter.join()
""".strip()
        code = code.format(filename=repr(filename))
        output = self.get_output(code, True, filename)
        output = '\n'.join(output)
        # Line of the script calling dump_traceback(): depends on the branch.
        if filename:
            lineno = 8
        else:
            lineno = 10
        # The first "wait" frame of the waiter thread is optional.
        regex = """
^Thread 0x[0-9a-f]+:
(?:  File ".*threading.py", line [0-9]+ in wait
)?  File ".*threading.py", line [0-9]+ in wait
  File "<string>", line 23 in run
  File ".*threading.py", line [0-9]+ in _bootstrap_inner
  File ".*threading.py", line [0-9]+ in _bootstrap

Current thread XXX:
  File "<string>", line {lineno} in dump
  File "<string>", line 28 in <module>$
""".strip()
        regex = regex.format(lineno=lineno)
        self.assertRegex(output, regex)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    def _check_dump_tracebacks_later(self, repeat, cancel, filename):
        """
        Check how many times the traceback is written in timeout x 2.5 seconds,
        or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
        on repeat and cancel options.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler
import time

def func(repeat, cancel, timeout):
    pause = timeout * 2.5
    a = time.time()
    time.sleep(pause)
    faulthandler.cancel_dump_tracebacks_later()
    b = time.time()
    # Check that sleep() was not interrupted
    assert (b -a) >= pause

    if cancel:
        pause = timeout * 1.5
        a = time.time()
        time.sleep(pause)
        b = time.time()
        # Check that sleep() was not interrupted
        assert (b -a) >= pause

timeout = 0.5
repeat = {repeat}
cancel = {cancel}
if {has_filename}:
    file = open({filename}, "wb")
else:
    file = None
faulthandler.dump_tracebacks_later(timeout,
    repeat=repeat, file=file)
func(repeat, cancel, timeout)
if file is not None:
    file.close()
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
            repeat=repeat,
            cancel=cancel,
        )
        trace = self.get_output(code, True, filename)
        trace = '\n'.join(trace)

        if repeat:
            count = 2
        else:
            count = 1
        header = 'Thread 0x[0-9a-f]+:\n'
        # expected_traceback() already anchors the regex with ^ and $.
        regex = expected_traceback(7, 30, header, count=count)
        self.assertRegex(trace, regex)

    @unittest.skipIf(not hasattr(faulthandler, 'dump_tracebacks_later'),
                     'need faulthandler.dump_tracebacks_later()')
    def check_dump_tracebacks_later(self, repeat=False, cancel=False,
                                    file=False):
        """
        Run _check_dump_tracebacks_later() with the given options, writing
        the traceback into a temporary file if file is true.
        """
        if file:
            with temporary_filename() as filename:
                self._check_dump_tracebacks_later(repeat, cancel, filename)
        else:
            self._check_dump_tracebacks_later(repeat, cancel, None)

    def test_dump_tracebacks_later(self):
        self.check_dump_tracebacks_later()

    def test_dump_tracebacks_later_repeat(self):
        self.check_dump_tracebacks_later(repeat=True)

    def test_dump_tracebacks_later_repeat_cancel(self):
        self.check_dump_tracebacks_later(repeat=True, cancel=True)

    def test_dump_tracebacks_later_file(self):
        self.check_dump_tracebacks_later(file=True)

    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False):
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        The traceback is written into filename if it is set. all_threads is
        passed to faulthandler.register() and selects the expected header.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler
import os
import signal

def func(signum):
    os.kill(os.getpid(), signum)

signum = signal.SIGUSR1
if {has_filename}:
    file = open({filename}, "wb")
else:
    file = None
faulthandler.register(signum, file=file, all_threads={all_threads})
func(signum)
if file is not None:
    file.close()
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
            all_threads=all_threads,
        )
        trace = self.get_output(code, True, filename)
        trace = '\n'.join(trace)
        if all_threads:
            regex = 'Current thread XXX:\n'
        else:
            # Raw string: "\(" is not a valid escape of a string literal.
            regex = r'Traceback \(most recent call first\):\n'
        regex = expected_traceback(6, 14, regex)
        self.assertTrue(re.match(regex, trace),
                        "[%s] doesn't match [%s]: use_filename=%s, all_threads=%s"
                        % (regex, trace, bool(filename), all_threads))

    def test_register(self):
        self.check_register()

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    def test_register_threads(self):
        self.check_register(all_threads=True)
463
464
def test_main():
    """Run the FaultHandlerTests test case with support.run_unittest()."""
    test_classes = (FaultHandlerTests,)
    support.run_unittest(*test_classes)
467
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    test_main()