blob: 35d7913e5b89b52d90494da41bdee6fc92de4607 [file] [log] [blame]
Antoine Pitrou796564c2013-07-30 19:59:21 +02001"""
2Tests for object finalization semantics, as outlined in PEP 442.
3"""
4
5import contextlib
6import gc
7import unittest
8import weakref
9
Serhiy Storchakaf28ba362014-02-07 10:10:55 +020010try:
11 from _testcapi import with_tp_del
12except ImportError:
13 def with_tp_del(cls):
14 class C(object):
15 def __new__(cls, *args, **kwargs):
16 raise TypeError('requires _testcapi.with_tp_del')
17 return C
18
Antoine Pitrou796564c2013-07-30 19:59:21 +020019from test import support
20
21
22class NonGCSimpleBase:
23 """
24 The base class for all the objects under test, equipped with various
25 testing features.
26 """
27
28 survivors = []
29 del_calls = []
30 tp_del_calls = []
31 errors = []
32
33 _cleaning = False
34
35 __slots__ = ()
36
37 @classmethod
38 def _cleanup(cls):
39 cls.survivors.clear()
40 cls.errors.clear()
41 gc.garbage.clear()
42 gc.collect()
43 cls.del_calls.clear()
44 cls.tp_del_calls.clear()
45
46 @classmethod
47 @contextlib.contextmanager
48 def test(cls):
49 """
50 A context manager to use around all finalization tests.
51 """
52 with support.disable_gc():
53 cls.del_calls.clear()
54 cls.tp_del_calls.clear()
55 NonGCSimpleBase._cleaning = False
56 try:
57 yield
58 if cls.errors:
59 raise cls.errors[0]
60 finally:
61 NonGCSimpleBase._cleaning = True
62 cls._cleanup()
63
64 def check_sanity(self):
65 """
66 Check the object is sane (non-broken).
67 """
68
69 def __del__(self):
70 """
71 PEP 442 finalizer. Record that this was called, check the
72 object is in a sane state, and invoke a side effect.
73 """
74 try:
75 if not self._cleaning:
76 self.del_calls.append(id(self))
77 self.check_sanity()
78 self.side_effect()
79 except Exception as e:
80 self.errors.append(e)
81
82 def side_effect(self):
83 """
84 A side effect called on destruction.
85 """
86
87
88class SimpleBase(NonGCSimpleBase):
89
90 def __init__(self):
91 self.id_ = id(self)
92
93 def check_sanity(self):
94 assert self.id_ == id(self)
95
96
97class NonGC(NonGCSimpleBase):
98 __slots__ = ()
99
100class NonGCResurrector(NonGCSimpleBase):
101 __slots__ = ()
102
103 def side_effect(self):
104 """
105 Resurrect self by storing self in a class-wide list.
106 """
107 self.survivors.append(self)
108
109class Simple(SimpleBase):
110 pass
111
112class SimpleResurrector(NonGCResurrector, SimpleBase):
113 pass
114
115
116class TestBase:
117
118 def setUp(self):
119 self.old_garbage = gc.garbage[:]
120 gc.garbage[:] = []
121
122 def tearDown(self):
123 # None of the tests here should put anything in gc.garbage
124 try:
125 self.assertEqual(gc.garbage, [])
126 finally:
127 del self.old_garbage
128 gc.collect()
129
130 def assert_del_calls(self, ids):
131 self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
132
133 def assert_tp_del_calls(self, ids):
134 self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
135
136 def assert_survivors(self, ids):
137 self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
138
139 def assert_garbage(self, ids):
140 self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
141
142 def clear_survivors(self):
143 SimpleBase.survivors.clear()
144
145
146class SimpleFinalizationTest(TestBase, unittest.TestCase):
147 """
148 Test finalization without refcycles.
149 """
150
151 def test_simple(self):
152 with SimpleBase.test():
153 s = Simple()
154 ids = [id(s)]
155 wr = weakref.ref(s)
156 del s
157 gc.collect()
158 self.assert_del_calls(ids)
159 self.assert_survivors([])
160 self.assertIs(wr(), None)
161 gc.collect()
162 self.assert_del_calls(ids)
163 self.assert_survivors([])
164
165 def test_simple_resurrect(self):
166 with SimpleBase.test():
167 s = SimpleResurrector()
168 ids = [id(s)]
169 wr = weakref.ref(s)
170 del s
171 gc.collect()
172 self.assert_del_calls(ids)
173 self.assert_survivors(ids)
174 self.assertIsNot(wr(), None)
175 self.clear_survivors()
176 gc.collect()
177 self.assert_del_calls(ids)
178 self.assert_survivors([])
179 self.assertIs(wr(), None)
180
181 def test_non_gc(self):
182 with SimpleBase.test():
183 s = NonGC()
184 self.assertFalse(gc.is_tracked(s))
185 ids = [id(s)]
186 del s
187 gc.collect()
188 self.assert_del_calls(ids)
189 self.assert_survivors([])
190 gc.collect()
191 self.assert_del_calls(ids)
192 self.assert_survivors([])
193
194 def test_non_gc_resurrect(self):
195 with SimpleBase.test():
196 s = NonGCResurrector()
197 self.assertFalse(gc.is_tracked(s))
198 ids = [id(s)]
199 del s
200 gc.collect()
201 self.assert_del_calls(ids)
202 self.assert_survivors(ids)
203 self.clear_survivors()
204 gc.collect()
205 self.assert_del_calls(ids * 2)
206 self.assert_survivors(ids)
207
208
209class SelfCycleBase:
210
211 def __init__(self):
212 super().__init__()
213 self.ref = self
214
215 def check_sanity(self):
216 super().check_sanity()
217 assert self.ref is self
218
219class SimpleSelfCycle(SelfCycleBase, Simple):
220 pass
221
222class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
223 pass
224
225class SuicidalSelfCycle(SelfCycleBase, Simple):
226
227 def side_effect(self):
228 """
229 Explicitly break the reference cycle.
230 """
231 self.ref = None
232
233
234class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
235 """
236 Test finalization of an object having a single cyclic reference to
237 itself.
238 """
239
240 def test_simple(self):
241 with SimpleBase.test():
242 s = SimpleSelfCycle()
243 ids = [id(s)]
244 wr = weakref.ref(s)
245 del s
246 gc.collect()
247 self.assert_del_calls(ids)
248 self.assert_survivors([])
249 self.assertIs(wr(), None)
250 gc.collect()
251 self.assert_del_calls(ids)
252 self.assert_survivors([])
253
254 def test_simple_resurrect(self):
255 # Test that __del__ can resurrect the object being finalized.
256 with SimpleBase.test():
257 s = SelfCycleResurrector()
258 ids = [id(s)]
259 wr = weakref.ref(s)
260 del s
261 gc.collect()
262 self.assert_del_calls(ids)
263 self.assert_survivors(ids)
264 # XXX is this desirable?
265 self.assertIs(wr(), None)
266 # When trying to destroy the object a second time, __del__
267 # isn't called anymore (and the object isn't resurrected).
268 self.clear_survivors()
269 gc.collect()
270 self.assert_del_calls(ids)
271 self.assert_survivors([])
272 self.assertIs(wr(), None)
273
274 def test_simple_suicide(self):
275 # Test the GC is able to deal with an object that kills its last
276 # reference during __del__.
277 with SimpleBase.test():
278 s = SuicidalSelfCycle()
279 ids = [id(s)]
280 wr = weakref.ref(s)
281 del s
282 gc.collect()
283 self.assert_del_calls(ids)
284 self.assert_survivors([])
285 self.assertIs(wr(), None)
286 gc.collect()
287 self.assert_del_calls(ids)
288 self.assert_survivors([])
289 self.assertIs(wr(), None)
290
291
292class ChainedBase:
293
294 def chain(self, left):
295 self.suicided = False
296 self.left = left
297 left.right = self
298
299 def check_sanity(self):
300 super().check_sanity()
301 if self.suicided:
302 assert self.left is None
303 assert self.right is None
304 else:
305 left = self.left
306 if left.suicided:
307 assert left.right is None
308 else:
309 assert left.right is self
310 right = self.right
311 if right.suicided:
312 assert right.left is None
313 else:
314 assert right.left is self
315
316class SimpleChained(ChainedBase, Simple):
317 pass
318
319class ChainedResurrector(ChainedBase, SimpleResurrector):
320 pass
321
322class SuicidalChained(ChainedBase, Simple):
323
324 def side_effect(self):
325 """
326 Explicitly break the reference cycle.
327 """
328 self.suicided = True
329 self.left = None
330 self.right = None
331
332
333class CycleChainFinalizationTest(TestBase, unittest.TestCase):
334 """
335 Test finalization of a cyclic chain. These tests are similar in
336 spirit to the self-cycle tests above, but the collectable object
337 graph isn't trivial anymore.
338 """
339
340 def build_chain(self, classes):
341 nodes = [cls() for cls in classes]
342 for i in range(len(nodes)):
343 nodes[i].chain(nodes[i-1])
344 return nodes
345
346 def check_non_resurrecting_chain(self, classes):
347 N = len(classes)
348 with SimpleBase.test():
349 nodes = self.build_chain(classes)
350 ids = [id(s) for s in nodes]
351 wrs = [weakref.ref(s) for s in nodes]
352 del nodes
353 gc.collect()
354 self.assert_del_calls(ids)
355 self.assert_survivors([])
356 self.assertEqual([wr() for wr in wrs], [None] * N)
357 gc.collect()
358 self.assert_del_calls(ids)
359
360 def check_resurrecting_chain(self, classes):
361 N = len(classes)
362 with SimpleBase.test():
363 nodes = self.build_chain(classes)
364 N = len(nodes)
365 ids = [id(s) for s in nodes]
366 survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
367 wrs = [weakref.ref(s) for s in nodes]
368 del nodes
369 gc.collect()
370 self.assert_del_calls(ids)
371 self.assert_survivors(survivor_ids)
372 # XXX desirable?
373 self.assertEqual([wr() for wr in wrs], [None] * N)
374 self.clear_survivors()
375 gc.collect()
376 self.assert_del_calls(ids)
377 self.assert_survivors([])
378
379 def test_homogenous(self):
380 self.check_non_resurrecting_chain([SimpleChained] * 3)
381
382 def test_homogenous_resurrect(self):
383 self.check_resurrecting_chain([ChainedResurrector] * 3)
384
385 def test_homogenous_suicidal(self):
386 self.check_non_resurrecting_chain([SuicidalChained] * 3)
387
388 def test_heterogenous_suicidal_one(self):
389 self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
390
391 def test_heterogenous_suicidal_two(self):
392 self.check_non_resurrecting_chain(
393 [SuicidalChained] * 2 + [SimpleChained] * 2)
394
395 def test_heterogenous_resurrect_one(self):
396 self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
397
398 def test_heterogenous_resurrect_two(self):
399 self.check_resurrecting_chain(
400 [ChainedResurrector, SimpleChained, SuicidalChained] * 2)
401
402 def test_heterogenous_resurrect_three(self):
403 self.check_resurrecting_chain(
404 [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
405
406
407# NOTE: the tp_del slot isn't automatically inherited, so we have to call
408# with_tp_del() for each instantiated class.
409
410class LegacyBase(SimpleBase):
411
412 def __del__(self):
413 try:
414 # Do not invoke side_effect here, since we are now exercising
415 # the tp_del slot.
416 if not self._cleaning:
417 self.del_calls.append(id(self))
418 self.check_sanity()
419 except Exception as e:
420 self.errors.append(e)
421
422 def __tp_del__(self):
423 """
424 Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
425 """
426 try:
427 if not self._cleaning:
428 self.tp_del_calls.append(id(self))
429 self.check_sanity()
430 self.side_effect()
431 except Exception as e:
432 self.errors.append(e)
433
Serhiy Storchakaf28ba362014-02-07 10:10:55 +0200434@with_tp_del
Antoine Pitrou796564c2013-07-30 19:59:21 +0200435class Legacy(LegacyBase):
436 pass
437
Serhiy Storchakaf28ba362014-02-07 10:10:55 +0200438@with_tp_del
Antoine Pitrou796564c2013-07-30 19:59:21 +0200439class LegacyResurrector(LegacyBase):
440
441 def side_effect(self):
442 """
443 Resurrect self by storing self in a class-wide list.
444 """
445 self.survivors.append(self)
446
Serhiy Storchakaf28ba362014-02-07 10:10:55 +0200447@with_tp_del
Antoine Pitrou796564c2013-07-30 19:59:21 +0200448class LegacySelfCycle(SelfCycleBase, LegacyBase):
449 pass
450
451
Serhiy Storchakaf28ba362014-02-07 10:10:55 +0200452@support.cpython_only
Antoine Pitrou796564c2013-07-30 19:59:21 +0200453class LegacyFinalizationTest(TestBase, unittest.TestCase):
454 """
455 Test finalization of objects with a tp_del.
456 """
457
458 def tearDown(self):
459 # These tests need to clean up a bit more, since they create
460 # uncollectable objects.
461 gc.garbage.clear()
462 gc.collect()
463 super().tearDown()
464
465 def test_legacy(self):
466 with SimpleBase.test():
467 s = Legacy()
468 ids = [id(s)]
469 wr = weakref.ref(s)
470 del s
471 gc.collect()
472 self.assert_del_calls(ids)
473 self.assert_tp_del_calls(ids)
474 self.assert_survivors([])
475 self.assertIs(wr(), None)
476 gc.collect()
477 self.assert_del_calls(ids)
478 self.assert_tp_del_calls(ids)
479
480 def test_legacy_resurrect(self):
481 with SimpleBase.test():
482 s = LegacyResurrector()
483 ids = [id(s)]
484 wr = weakref.ref(s)
485 del s
486 gc.collect()
487 self.assert_del_calls(ids)
488 self.assert_tp_del_calls(ids)
489 self.assert_survivors(ids)
490 # weakrefs are cleared before tp_del is called.
491 self.assertIs(wr(), None)
492 self.clear_survivors()
493 gc.collect()
494 self.assert_del_calls(ids)
495 self.assert_tp_del_calls(ids * 2)
496 self.assert_survivors(ids)
497 self.assertIs(wr(), None)
498
499 def test_legacy_self_cycle(self):
500 # Self-cycles with legacy finalizers end up in gc.garbage.
501 with SimpleBase.test():
502 s = LegacySelfCycle()
503 ids = [id(s)]
504 wr = weakref.ref(s)
505 del s
506 gc.collect()
507 self.assert_del_calls([])
508 self.assert_tp_del_calls([])
509 self.assert_survivors([])
510 self.assert_garbage(ids)
511 self.assertIsNot(wr(), None)
512 # Break the cycle to allow collection
513 gc.garbage[0].ref = None
514 self.assert_garbage([])
515 self.assertIs(wr(), None)
516
517
Antoine Pitrou796564c2013-07-30 19:59:21 +0200518if __name__ == "__main__":
Zachary Ware38c707e2015-04-13 15:00:43 -0500519 unittest.main()