| import atexit |
| import os |
| import sys |
| import textwrap |
| import unittest |
| from test import support |
| from test.support import script_helper |
| |
| |
| class GeneralTest(unittest.TestCase): |
| def test_general(self): |
| # Run _test_atexit.py in a subprocess since it calls atexit._clear() |
| script = support.findfile("_test_atexit.py") |
| script_helper.run_test_script(script) |
| |
| class FunctionalTest(unittest.TestCase): |
| def test_shutdown(self): |
| # Actually test the shutdown mechanism in a subprocess |
| code = textwrap.dedent(""" |
| import atexit |
| |
| def f(msg): |
| print(msg) |
| |
| atexit.register(f, "one") |
| atexit.register(f, "two") |
| """) |
| res = script_helper.assert_python_ok("-c", code) |
| self.assertEqual(res.out.decode().splitlines(), ["two", "one"]) |
| self.assertFalse(res.err) |
| |
| def test_atexit_instances(self): |
| # bpo-42639: It is safe to have more than one atexit instance. |
| code = textwrap.dedent(""" |
| import sys |
| import atexit as atexit1 |
| del sys.modules['atexit'] |
| import atexit as atexit2 |
| del sys.modules['atexit'] |
| |
| assert atexit2 is not atexit1 |
| |
| atexit1.register(print, "atexit1") |
| atexit2.register(print, "atexit2") |
| """) |
| res = script_helper.assert_python_ok("-c", code) |
| self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"]) |
| self.assertFalse(res.err) |
| |
| |
| @support.cpython_only |
| class SubinterpreterTest(unittest.TestCase): |
| |
| def test_callbacks_leak(self): |
| # This test shows a leak in refleak mode if atexit doesn't |
| # take care to free callbacks in its per-subinterpreter module |
| # state. |
| n = atexit._ncallbacks() |
| code = textwrap.dedent(r""" |
| import atexit |
| def f(): |
| pass |
| atexit.register(f) |
| del atexit |
| """) |
| ret = support.run_in_subinterp(code) |
| self.assertEqual(ret, 0) |
| self.assertEqual(atexit._ncallbacks(), n) |
| |
| def test_callbacks_leak_refcycle(self): |
| # Similar to the above, but with a refcycle through the atexit |
| # module. |
| n = atexit._ncallbacks() |
| code = textwrap.dedent(r""" |
| import atexit |
| def f(): |
| pass |
| atexit.register(f) |
| atexit.__atexit = atexit |
| """) |
| ret = support.run_in_subinterp(code) |
| self.assertEqual(ret, 0) |
| self.assertEqual(atexit._ncallbacks(), n) |
| |
| def test_callback_on_subinterpreter_teardown(self): |
| # This tests if a callback is called on |
| # subinterpreter teardown. |
| expected = b"The test has passed!" |
| r, w = os.pipe() |
| |
| code = textwrap.dedent(r""" |
| import os |
| import atexit |
| def callback(): |
| os.write({:d}, b"The test has passed!") |
| atexit.register(callback) |
| """.format(w)) |
| ret = support.run_in_subinterp(code) |
| os.close(w) |
| self.assertEqual(os.read(r, len(expected)), expected) |
| os.close(r) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |