| """ | 
 | Tests for object finalization semantics, as outlined in PEP 442. | 
 | """ | 
 |  | 
 | import contextlib | 
 | import gc | 
 | import unittest | 
 | import weakref | 
 |  | 
 | import _testcapi | 
 | from test import support | 
 |  | 
 |  | 
 | class NonGCSimpleBase: | 
 |     """ | 
 |     The base class for all the objects under test, equipped with various | 
 |     testing features. | 
 |     """ | 
 |  | 
 |     survivors = [] | 
 |     del_calls = [] | 
 |     tp_del_calls = [] | 
 |     errors = [] | 
 |  | 
 |     _cleaning = False | 
 |  | 
 |     __slots__ = () | 
 |  | 
 |     @classmethod | 
 |     def _cleanup(cls): | 
 |         cls.survivors.clear() | 
 |         cls.errors.clear() | 
 |         gc.garbage.clear() | 
 |         gc.collect() | 
 |         cls.del_calls.clear() | 
 |         cls.tp_del_calls.clear() | 
 |  | 
 |     @classmethod | 
 |     @contextlib.contextmanager | 
 |     def test(cls): | 
 |         """ | 
 |         A context manager to use around all finalization tests. | 
 |         """ | 
 |         with support.disable_gc(): | 
 |             cls.del_calls.clear() | 
 |             cls.tp_del_calls.clear() | 
 |             NonGCSimpleBase._cleaning = False | 
 |             try: | 
 |                 yield | 
 |                 if cls.errors: | 
 |                     raise cls.errors[0] | 
 |             finally: | 
 |                 NonGCSimpleBase._cleaning = True | 
 |                 cls._cleanup() | 
 |  | 
 |     def check_sanity(self): | 
 |         """ | 
 |         Check the object is sane (non-broken). | 
 |         """ | 
 |  | 
 |     def __del__(self): | 
 |         """ | 
 |         PEP 442 finalizer.  Record that this was called, check the | 
 |         object is in a sane state, and invoke a side effect. | 
 |         """ | 
 |         try: | 
 |             if not self._cleaning: | 
 |                 self.del_calls.append(id(self)) | 
 |                 self.check_sanity() | 
 |                 self.side_effect() | 
 |         except Exception as e: | 
 |             self.errors.append(e) | 
 |  | 
 |     def side_effect(self): | 
 |         """ | 
 |         A side effect called on destruction. | 
 |         """ | 
 |  | 
 |  | 
 | class SimpleBase(NonGCSimpleBase): | 
 |  | 
 |     def __init__(self): | 
 |         self.id_ = id(self) | 
 |  | 
 |     def check_sanity(self): | 
 |         assert self.id_ == id(self) | 
 |  | 
 |  | 
 | class NonGC(NonGCSimpleBase): | 
 |     __slots__ = () | 
 |  | 
 | class NonGCResurrector(NonGCSimpleBase): | 
 |     __slots__ = () | 
 |  | 
 |     def side_effect(self): | 
 |         """ | 
 |         Resurrect self by storing self in a class-wide list. | 
 |         """ | 
 |         self.survivors.append(self) | 
 |  | 
 | class Simple(SimpleBase): | 
 |     pass | 
 |  | 
 | class SimpleResurrector(NonGCResurrector, SimpleBase): | 
 |     pass | 
 |  | 
 |  | 
 | class TestBase: | 
 |  | 
 |     def setUp(self): | 
 |         self.old_garbage = gc.garbage[:] | 
 |         gc.garbage[:] = [] | 
 |  | 
 |     def tearDown(self): | 
 |         # None of the tests here should put anything in gc.garbage | 
 |         try: | 
 |             self.assertEqual(gc.garbage, []) | 
 |         finally: | 
 |             del self.old_garbage | 
 |             gc.collect() | 
 |  | 
 |     def assert_del_calls(self, ids): | 
 |         self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids)) | 
 |  | 
 |     def assert_tp_del_calls(self, ids): | 
 |         self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids)) | 
 |  | 
 |     def assert_survivors(self, ids): | 
 |         self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids)) | 
 |  | 
 |     def assert_garbage(self, ids): | 
 |         self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids)) | 
 |  | 
 |     def clear_survivors(self): | 
 |         SimpleBase.survivors.clear() | 
 |  | 
 |  | 
 | class SimpleFinalizationTest(TestBase, unittest.TestCase): | 
 |     """ | 
 |     Test finalization without refcycles. | 
 |     """ | 
 |  | 
 |     def test_simple(self): | 
 |         with SimpleBase.test(): | 
 |             s = Simple() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertIs(wr(), None) | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |  | 
 |     def test_simple_resurrect(self): | 
 |         with SimpleBase.test(): | 
 |             s = SimpleResurrector() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors(ids) | 
 |             self.assertIsNot(wr(), None) | 
 |             self.clear_survivors() | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |         self.assertIs(wr(), None) | 
 |  | 
 |     def test_non_gc(self): | 
 |         with SimpleBase.test(): | 
 |             s = NonGC() | 
 |             self.assertFalse(gc.is_tracked(s)) | 
 |             ids = [id(s)] | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |  | 
 |     def test_non_gc_resurrect(self): | 
 |         with SimpleBase.test(): | 
 |             s = NonGCResurrector() | 
 |             self.assertFalse(gc.is_tracked(s)) | 
 |             ids = [id(s)] | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors(ids) | 
 |             self.clear_survivors() | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids * 2) | 
 |             self.assert_survivors(ids) | 
 |  | 
 |  | 
 | class SelfCycleBase: | 
 |  | 
 |     def __init__(self): | 
 |         super().__init__() | 
 |         self.ref = self | 
 |  | 
 |     def check_sanity(self): | 
 |         super().check_sanity() | 
 |         assert self.ref is self | 
 |  | 
 | class SimpleSelfCycle(SelfCycleBase, Simple): | 
 |     pass | 
 |  | 
 | class SelfCycleResurrector(SelfCycleBase, SimpleResurrector): | 
 |     pass | 
 |  | 
 | class SuicidalSelfCycle(SelfCycleBase, Simple): | 
 |  | 
 |     def side_effect(self): | 
 |         """ | 
 |         Explicitly break the reference cycle. | 
 |         """ | 
 |         self.ref = None | 
 |  | 
 |  | 
 | class SelfCycleFinalizationTest(TestBase, unittest.TestCase): | 
 |     """ | 
 |     Test finalization of an object having a single cyclic reference to | 
 |     itself. | 
 |     """ | 
 |  | 
 |     def test_simple(self): | 
 |         with SimpleBase.test(): | 
 |             s = SimpleSelfCycle() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertIs(wr(), None) | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |  | 
 |     def test_simple_resurrect(self): | 
 |         # Test that __del__ can resurrect the object being finalized. | 
 |         with SimpleBase.test(): | 
 |             s = SelfCycleResurrector() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors(ids) | 
 |             # XXX is this desirable? | 
 |             self.assertIs(wr(), None) | 
 |             # When trying to destroy the object a second time, __del__ | 
 |             # isn't called anymore (and the object isn't resurrected). | 
 |             self.clear_survivors() | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertIs(wr(), None) | 
 |  | 
 |     def test_simple_suicide(self): | 
 |         # Test the GC is able to deal with an object that kills its last | 
 |         # reference during __del__. | 
 |         with SimpleBase.test(): | 
 |             s = SuicidalSelfCycle() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertIs(wr(), None) | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertIs(wr(), None) | 
 |  | 
 |  | 
 | class ChainedBase: | 
 |  | 
 |     def chain(self, left): | 
 |         self.suicided = False | 
 |         self.left = left | 
 |         left.right = self | 
 |  | 
 |     def check_sanity(self): | 
 |         super().check_sanity() | 
 |         if self.suicided: | 
 |             assert self.left is None | 
 |             assert self.right is None | 
 |         else: | 
 |             left = self.left | 
 |             if left.suicided: | 
 |                 assert left.right is None | 
 |             else: | 
 |                 assert left.right is self | 
 |             right = self.right | 
 |             if right.suicided: | 
 |                 assert right.left is None | 
 |             else: | 
 |                 assert right.left is self | 
 |  | 
 | class SimpleChained(ChainedBase, Simple): | 
 |     pass | 
 |  | 
 | class ChainedResurrector(ChainedBase, SimpleResurrector): | 
 |     pass | 
 |  | 
 | class SuicidalChained(ChainedBase, Simple): | 
 |  | 
 |     def side_effect(self): | 
 |         """ | 
 |         Explicitly break the reference cycle. | 
 |         """ | 
 |         self.suicided = True | 
 |         self.left = None | 
 |         self.right = None | 
 |  | 
 |  | 
 | class CycleChainFinalizationTest(TestBase, unittest.TestCase): | 
 |     """ | 
 |     Test finalization of a cyclic chain.  These tests are similar in | 
 |     spirit to the self-cycle tests above, but the collectable object | 
 |     graph isn't trivial anymore. | 
 |     """ | 
 |  | 
 |     def build_chain(self, classes): | 
 |         nodes = [cls() for cls in classes] | 
 |         for i in range(len(nodes)): | 
 |             nodes[i].chain(nodes[i-1]) | 
 |         return nodes | 
 |  | 
 |     def check_non_resurrecting_chain(self, classes): | 
 |         N = len(classes) | 
 |         with SimpleBase.test(): | 
 |             nodes = self.build_chain(classes) | 
 |             ids = [id(s) for s in nodes] | 
 |             wrs = [weakref.ref(s) for s in nodes] | 
 |             del nodes | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertEqual([wr() for wr in wrs], [None] * N) | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |  | 
 |     def check_resurrecting_chain(self, classes): | 
 |         N = len(classes) | 
 |         with SimpleBase.test(): | 
 |             nodes = self.build_chain(classes) | 
 |             N = len(nodes) | 
 |             ids = [id(s) for s in nodes] | 
 |             survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)] | 
 |             wrs = [weakref.ref(s) for s in nodes] | 
 |             del nodes | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors(survivor_ids) | 
 |             # XXX desirable? | 
 |             self.assertEqual([wr() for wr in wrs], [None] * N) | 
 |             self.clear_survivors() | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |  | 
 |     def test_homogenous(self): | 
 |         self.check_non_resurrecting_chain([SimpleChained] * 3) | 
 |  | 
 |     def test_homogenous_resurrect(self): | 
 |         self.check_resurrecting_chain([ChainedResurrector] * 3) | 
 |  | 
 |     def test_homogenous_suicidal(self): | 
 |         self.check_non_resurrecting_chain([SuicidalChained] * 3) | 
 |  | 
 |     def test_heterogenous_suicidal_one(self): | 
 |         self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2) | 
 |  | 
 |     def test_heterogenous_suicidal_two(self): | 
 |         self.check_non_resurrecting_chain( | 
 |             [SuicidalChained] * 2 + [SimpleChained] * 2) | 
 |  | 
 |     def test_heterogenous_resurrect_one(self): | 
 |         self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2) | 
 |  | 
 |     def test_heterogenous_resurrect_two(self): | 
 |         self.check_resurrecting_chain( | 
 |             [ChainedResurrector, SimpleChained, SuicidalChained] * 2) | 
 |  | 
 |     def test_heterogenous_resurrect_three(self): | 
 |         self.check_resurrecting_chain( | 
 |             [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2) | 
 |  | 
 |  | 
 | # NOTE: the tp_del slot isn't automatically inherited, so we have to call | 
 | # with_tp_del() for each instantiated class. | 
 |  | 
 | class LegacyBase(SimpleBase): | 
 |  | 
 |     def __del__(self): | 
 |         try: | 
 |             # Do not invoke side_effect here, since we are now exercising | 
 |             # the tp_del slot. | 
 |             if not self._cleaning: | 
 |                 self.del_calls.append(id(self)) | 
 |                 self.check_sanity() | 
 |         except Exception as e: | 
 |             self.errors.append(e) | 
 |  | 
 |     def __tp_del__(self): | 
 |         """ | 
 |         Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot. | 
 |         """ | 
 |         try: | 
 |             if not self._cleaning: | 
 |                 self.tp_del_calls.append(id(self)) | 
 |                 self.check_sanity() | 
 |                 self.side_effect() | 
 |         except Exception as e: | 
 |             self.errors.append(e) | 
 |  | 
 | @_testcapi.with_tp_del | 
 | class Legacy(LegacyBase): | 
 |     pass | 
 |  | 
 | @_testcapi.with_tp_del | 
 | class LegacyResurrector(LegacyBase): | 
 |  | 
 |     def side_effect(self): | 
 |         """ | 
 |         Resurrect self by storing self in a class-wide list. | 
 |         """ | 
 |         self.survivors.append(self) | 
 |  | 
 | @_testcapi.with_tp_del | 
 | class LegacySelfCycle(SelfCycleBase, LegacyBase): | 
 |     pass | 
 |  | 
 |  | 
 | class LegacyFinalizationTest(TestBase, unittest.TestCase): | 
 |     """ | 
 |     Test finalization of objects with a tp_del. | 
 |     """ | 
 |  | 
 |     def tearDown(self): | 
 |         # These tests need to clean up a bit more, since they create | 
 |         # uncollectable objects. | 
 |         gc.garbage.clear() | 
 |         gc.collect() | 
 |         super().tearDown() | 
 |  | 
 |     def test_legacy(self): | 
 |         with SimpleBase.test(): | 
 |             s = Legacy() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_tp_del_calls(ids) | 
 |             self.assert_survivors([]) | 
 |             self.assertIs(wr(), None) | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_tp_del_calls(ids) | 
 |  | 
 |     def test_legacy_resurrect(self): | 
 |         with SimpleBase.test(): | 
 |             s = LegacyResurrector() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_tp_del_calls(ids) | 
 |             self.assert_survivors(ids) | 
 |             # weakrefs are cleared before tp_del is called. | 
 |             self.assertIs(wr(), None) | 
 |             self.clear_survivors() | 
 |             gc.collect() | 
 |             self.assert_del_calls(ids) | 
 |             self.assert_tp_del_calls(ids * 2) | 
 |             self.assert_survivors(ids) | 
 |         self.assertIs(wr(), None) | 
 |  | 
 |     def test_legacy_self_cycle(self): | 
 |         # Self-cycles with legacy finalizers end up in gc.garbage. | 
 |         with SimpleBase.test(): | 
 |             s = LegacySelfCycle() | 
 |             ids = [id(s)] | 
 |             wr = weakref.ref(s) | 
 |             del s | 
 |             gc.collect() | 
 |             self.assert_del_calls([]) | 
 |             self.assert_tp_del_calls([]) | 
 |             self.assert_survivors([]) | 
 |             self.assert_garbage(ids) | 
 |             self.assertIsNot(wr(), None) | 
 |             # Break the cycle to allow collection | 
 |             gc.garbage[0].ref = None | 
 |         self.assert_garbage([]) | 
 |         self.assertIs(wr(), None) | 
 |  | 
 |  | 
 | def test_main(): | 
 |     support.run_unittest(__name__) | 
 |  | 
 | if __name__ == "__main__": | 
 |     test_main() |