| """ |
| Tests for object finalization semantics, as outlined in PEP 442. |
| """ |
| |
| import contextlib |
| import gc |
| import unittest |
| import weakref |
| |
| try: |
| from _testcapi import with_tp_del |
| except ImportError: |
| def with_tp_del(cls): |
| class C(object): |
| def __new__(cls, *args, **kwargs): |
| raise TypeError('requires _testcapi.with_tp_del') |
| return C |
| |
| from test import support |
| |
| |
| class NonGCSimpleBase: |
| """ |
| The base class for all the objects under test, equipped with various |
| testing features. |
| """ |
| |
| survivors = [] |
| del_calls = [] |
| tp_del_calls = [] |
| errors = [] |
| |
| _cleaning = False |
| |
| __slots__ = () |
| |
| @classmethod |
| def _cleanup(cls): |
| cls.survivors.clear() |
| cls.errors.clear() |
| gc.garbage.clear() |
| gc.collect() |
| cls.del_calls.clear() |
| cls.tp_del_calls.clear() |
| |
| @classmethod |
| @contextlib.contextmanager |
| def test(cls): |
| """ |
| A context manager to use around all finalization tests. |
| """ |
| with support.disable_gc(): |
| cls.del_calls.clear() |
| cls.tp_del_calls.clear() |
| NonGCSimpleBase._cleaning = False |
| try: |
| yield |
| if cls.errors: |
| raise cls.errors[0] |
| finally: |
| NonGCSimpleBase._cleaning = True |
| cls._cleanup() |
| |
| def check_sanity(self): |
| """ |
| Check the object is sane (non-broken). |
| """ |
| |
| def __del__(self): |
| """ |
| PEP 442 finalizer. Record that this was called, check the |
| object is in a sane state, and invoke a side effect. |
| """ |
| try: |
| if not self._cleaning: |
| self.del_calls.append(id(self)) |
| self.check_sanity() |
| self.side_effect() |
| except Exception as e: |
| self.errors.append(e) |
| |
| def side_effect(self): |
| """ |
| A side effect called on destruction. |
| """ |
| |
| |
| class SimpleBase(NonGCSimpleBase): |
| |
| def __init__(self): |
| self.id_ = id(self) |
| |
| def check_sanity(self): |
| assert self.id_ == id(self) |
| |
| |
| class NonGC(NonGCSimpleBase): |
| __slots__ = () |
| |
| class NonGCResurrector(NonGCSimpleBase): |
| __slots__ = () |
| |
| def side_effect(self): |
| """ |
| Resurrect self by storing self in a class-wide list. |
| """ |
| self.survivors.append(self) |
| |
| class Simple(SimpleBase): |
| pass |
| |
| class SimpleResurrector(NonGCResurrector, SimpleBase): |
| pass |
| |
| |
| class TestBase: |
| |
| def setUp(self): |
| self.old_garbage = gc.garbage[:] |
| gc.garbage[:] = [] |
| |
| def tearDown(self): |
| # None of the tests here should put anything in gc.garbage |
| try: |
| self.assertEqual(gc.garbage, []) |
| finally: |
| del self.old_garbage |
| gc.collect() |
| |
| def assert_del_calls(self, ids): |
| self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids)) |
| |
| def assert_tp_del_calls(self, ids): |
| self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids)) |
| |
| def assert_survivors(self, ids): |
| self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids)) |
| |
| def assert_garbage(self, ids): |
| self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids)) |
| |
| def clear_survivors(self): |
| SimpleBase.survivors.clear() |
| |
| |
| class SimpleFinalizationTest(TestBase, unittest.TestCase): |
| """ |
| Test finalization without refcycles. |
| """ |
| |
| def test_simple(self): |
| with SimpleBase.test(): |
| s = Simple() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| |
| def test_simple_resurrect(self): |
| with SimpleBase.test(): |
| s = SimpleResurrector() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors(ids) |
| self.assertIsNot(wr(), None) |
| self.clear_survivors() |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| |
| def test_non_gc(self): |
| with SimpleBase.test(): |
| s = NonGC() |
| self.assertFalse(gc.is_tracked(s)) |
| ids = [id(s)] |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| |
| def test_non_gc_resurrect(self): |
| with SimpleBase.test(): |
| s = NonGCResurrector() |
| self.assertFalse(gc.is_tracked(s)) |
| ids = [id(s)] |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors(ids) |
| self.clear_survivors() |
| gc.collect() |
| self.assert_del_calls(ids * 2) |
| self.assert_survivors(ids) |
| |
| |
| class SelfCycleBase: |
| |
| def __init__(self): |
| super().__init__() |
| self.ref = self |
| |
| def check_sanity(self): |
| super().check_sanity() |
| assert self.ref is self |
| |
| class SimpleSelfCycle(SelfCycleBase, Simple): |
| pass |
| |
| class SelfCycleResurrector(SelfCycleBase, SimpleResurrector): |
| pass |
| |
| class SuicidalSelfCycle(SelfCycleBase, Simple): |
| |
| def side_effect(self): |
| """ |
| Explicitly break the reference cycle. |
| """ |
| self.ref = None |
| |
| |
| class SelfCycleFinalizationTest(TestBase, unittest.TestCase): |
| """ |
| Test finalization of an object having a single cyclic reference to |
| itself. |
| """ |
| |
| def test_simple(self): |
| with SimpleBase.test(): |
| s = SimpleSelfCycle() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| |
| def test_simple_resurrect(self): |
| # Test that __del__ can resurrect the object being finalized. |
| with SimpleBase.test(): |
| s = SelfCycleResurrector() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors(ids) |
| # XXX is this desirable? |
| self.assertIs(wr(), None) |
| # When trying to destroy the object a second time, __del__ |
| # isn't called anymore (and the object isn't resurrected). |
| self.clear_survivors() |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| |
| def test_simple_suicide(self): |
| # Test the GC is able to deal with an object that kills its last |
| # reference during __del__. |
| with SimpleBase.test(): |
| s = SuicidalSelfCycle() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| |
| |
| class ChainedBase: |
| |
| def chain(self, left): |
| self.suicided = False |
| self.left = left |
| left.right = self |
| |
| def check_sanity(self): |
| super().check_sanity() |
| if self.suicided: |
| assert self.left is None |
| assert self.right is None |
| else: |
| left = self.left |
| if left.suicided: |
| assert left.right is None |
| else: |
| assert left.right is self |
| right = self.right |
| if right.suicided: |
| assert right.left is None |
| else: |
| assert right.left is self |
| |
| class SimpleChained(ChainedBase, Simple): |
| pass |
| |
| class ChainedResurrector(ChainedBase, SimpleResurrector): |
| pass |
| |
| class SuicidalChained(ChainedBase, Simple): |
| |
| def side_effect(self): |
| """ |
| Explicitly break the reference cycle. |
| """ |
| self.suicided = True |
| self.left = None |
| self.right = None |
| |
| |
| class CycleChainFinalizationTest(TestBase, unittest.TestCase): |
| """ |
| Test finalization of a cyclic chain. These tests are similar in |
| spirit to the self-cycle tests above, but the collectable object |
| graph isn't trivial anymore. |
| """ |
| |
| def build_chain(self, classes): |
| nodes = [cls() for cls in classes] |
| for i in range(len(nodes)): |
| nodes[i].chain(nodes[i-1]) |
| return nodes |
| |
| def check_non_resurrecting_chain(self, classes): |
| N = len(classes) |
| with SimpleBase.test(): |
| nodes = self.build_chain(classes) |
| ids = [id(s) for s in nodes] |
| wrs = [weakref.ref(s) for s in nodes] |
| del nodes |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertEqual([wr() for wr in wrs], [None] * N) |
| gc.collect() |
| self.assert_del_calls(ids) |
| |
| def check_resurrecting_chain(self, classes): |
| N = len(classes) |
| with SimpleBase.test(): |
| nodes = self.build_chain(classes) |
| N = len(nodes) |
| ids = [id(s) for s in nodes] |
| survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)] |
| wrs = [weakref.ref(s) for s in nodes] |
| del nodes |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors(survivor_ids) |
| # XXX desirable? |
| self.assertEqual([wr() for wr in wrs], [None] * N) |
| self.clear_survivors() |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_survivors([]) |
| |
| def test_homogenous(self): |
| self.check_non_resurrecting_chain([SimpleChained] * 3) |
| |
| def test_homogenous_resurrect(self): |
| self.check_resurrecting_chain([ChainedResurrector] * 3) |
| |
| def test_homogenous_suicidal(self): |
| self.check_non_resurrecting_chain([SuicidalChained] * 3) |
| |
| def test_heterogenous_suicidal_one(self): |
| self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2) |
| |
| def test_heterogenous_suicidal_two(self): |
| self.check_non_resurrecting_chain( |
| [SuicidalChained] * 2 + [SimpleChained] * 2) |
| |
| def test_heterogenous_resurrect_one(self): |
| self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2) |
| |
| def test_heterogenous_resurrect_two(self): |
| self.check_resurrecting_chain( |
| [ChainedResurrector, SimpleChained, SuicidalChained] * 2) |
| |
| def test_heterogenous_resurrect_three(self): |
| self.check_resurrecting_chain( |
| [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2) |
| |
| |
| # NOTE: the tp_del slot isn't automatically inherited, so we have to call |
| # with_tp_del() for each instantiated class. |
| |
| class LegacyBase(SimpleBase): |
| |
| def __del__(self): |
| try: |
| # Do not invoke side_effect here, since we are now exercising |
| # the tp_del slot. |
| if not self._cleaning: |
| self.del_calls.append(id(self)) |
| self.check_sanity() |
| except Exception as e: |
| self.errors.append(e) |
| |
| def __tp_del__(self): |
| """ |
| Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot. |
| """ |
| try: |
| if not self._cleaning: |
| self.tp_del_calls.append(id(self)) |
| self.check_sanity() |
| self.side_effect() |
| except Exception as e: |
| self.errors.append(e) |
| |
| @with_tp_del |
| class Legacy(LegacyBase): |
| pass |
| |
| @with_tp_del |
| class LegacyResurrector(LegacyBase): |
| |
| def side_effect(self): |
| """ |
| Resurrect self by storing self in a class-wide list. |
| """ |
| self.survivors.append(self) |
| |
| @with_tp_del |
| class LegacySelfCycle(SelfCycleBase, LegacyBase): |
| pass |
| |
| |
| @support.cpython_only |
| class LegacyFinalizationTest(TestBase, unittest.TestCase): |
| """ |
| Test finalization of objects with a tp_del. |
| """ |
| |
| def tearDown(self): |
| # These tests need to clean up a bit more, since they create |
| # uncollectable objects. |
| gc.garbage.clear() |
| gc.collect() |
| super().tearDown() |
| |
| def test_legacy(self): |
| with SimpleBase.test(): |
| s = Legacy() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_tp_del_calls(ids) |
| self.assert_survivors([]) |
| self.assertIs(wr(), None) |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_tp_del_calls(ids) |
| |
| def test_legacy_resurrect(self): |
| with SimpleBase.test(): |
| s = LegacyResurrector() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_tp_del_calls(ids) |
| self.assert_survivors(ids) |
| # weakrefs are cleared before tp_del is called. |
| self.assertIs(wr(), None) |
| self.clear_survivors() |
| gc.collect() |
| self.assert_del_calls(ids) |
| self.assert_tp_del_calls(ids * 2) |
| self.assert_survivors(ids) |
| self.assertIs(wr(), None) |
| |
| def test_legacy_self_cycle(self): |
| # Self-cycles with legacy finalizers end up in gc.garbage. |
| with SimpleBase.test(): |
| s = LegacySelfCycle() |
| ids = [id(s)] |
| wr = weakref.ref(s) |
| del s |
| gc.collect() |
| self.assert_del_calls([]) |
| self.assert_tp_del_calls([]) |
| self.assert_survivors([]) |
| self.assert_garbage(ids) |
| self.assertIsNot(wr(), None) |
| # Break the cycle to allow collection |
| gc.garbage[0].ref = None |
| self.assert_garbage([]) |
| self.assertIs(wr(), None) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |