blob: 80c9b87988781babe8088370b4cc8fd9d5363ce1 [file] [log] [blame]
"""
Tests for object finalization semantics, as outlined in PEP 442.
"""
4
5import contextlib
6import gc
7import unittest
8import weakref
9
10import _testcapi
11from test import support
12
13
class NonGCSimpleBase:
    """
    Common ancestor of every object exercised by these tests.

    It keeps class-wide logs of finalizer activity and provides the
    test() context manager that resets and tears down that shared state.
    """

    # Class-wide logs; subclasses reach them through attribute lookup.
    survivors = []      # objects stored by a resurrecting side effect
    del_calls = []      # id() of every object whose __del__ was recorded
    tp_del_calls = []   # id() of every object whose __tp_del__ was recorded
    errors = []         # exceptions caught inside finalizers

    # True while test() is tearing down; finalizers then record nothing.
    _cleaning = False

    __slots__ = ()

    @classmethod
    def _cleanup(cls):
        """
        Empty the shared logs and gc.garbage, with a collection in between.
        """
        for leftovers in (cls.survivors, cls.errors, gc.garbage):
            leftovers.clear()
        gc.collect()
        # The call logs are emptied only after the collection above.
        for call_log in (cls.del_calls, cls.tp_del_calls):
            call_log.clear()

    @classmethod
    @contextlib.contextmanager
    def test(cls):
        """
        Context manager to wrap around the body of every finalization test.

        The body runs inside support.disable_gc() with empty call logs.
        On normal completion the first exception recorded by a finalizer
        (if any) is re-raised; the shared state is cleaned up in all cases.
        """
        with support.disable_gc():
            NonGCSimpleBase._cleaning = False
            cls.tp_del_calls.clear()
            cls.del_calls.clear()
            try:
                yield
                recorded = cls.errors
                if recorded:
                    raise recorded[0]
            finally:
                # Silence finalizers that fire while tearing down.
                NonGCSimpleBase._cleaning = True
                cls._cleanup()

    def check_sanity(self):
        """
        Hook verifying the object is not broken; a no-op at this level.
        """

    def __del__(self):
        """
        PEP 442 finalizer.

        Unless cleanup is in progress, log the call, run the sanity
        check and then the side effect.  Any exception is stored in the
        errors list instead of propagating.
        """
        try:
            if self._cleaning:
                return
            self.del_calls.append(id(self))
            self.check_sanity()
            self.side_effect()
        except Exception as exc:
            self.errors.append(exc)

    def side_effect(self):
        """
        Hook invoked at destruction time; a no-op at this level.
        """
78
79
class SimpleBase(NonGCSimpleBase):
    """
    Base for tested objects that remember their own id() at creation.
    """

    def __init__(self):
        # Kept so check_sanity() can compare it with id(self) later on.
        self.id_ = id(self)

    def check_sanity(self):
        """
        Assert that the id recorded at construction still matches.
        """
        assert id(self) == self.id_
87
88
class NonGC(NonGCSimpleBase):
    """
    Plain object with empty __slots__; test_non_gc expects instances
    not to be tracked by the GC.
    """

    __slots__ = ()
91
class NonGCResurrector(NonGCSimpleBase):
    """
    Object with empty __slots__ whose finalizer brings it back to life.
    """

    __slots__ = ()

    def side_effect(self):
        """
        Keep self alive by appending it to the class-wide survivors list.
        """
        self.survivors.append(self)
100
class Simple(SimpleBase):
    """Concrete SimpleBase with no additional behaviour."""
103
class SimpleResurrector(NonGCResurrector, SimpleBase):
    """SimpleBase object combined with the resurrecting side effect."""
106
107
class TestBase:
    """
    Mixin with the fixtures and assertion helpers shared by the test cases.
    """

    def setUp(self):
        # Remember whatever was in gc.garbage, then start from empty.
        self.old_garbage = list(gc.garbage)
        gc.garbage.clear()

    def tearDown(self):
        try:
            # None of the tests here should put anything in gc.garbage
            self.assertEqual(gc.garbage, [])
        finally:
            del self.old_garbage
            gc.collect()

    def assert_del_calls(self, ids):
        """Check the logged __del__ calls against *ids*, ignoring order."""
        logged = sorted(SimpleBase.del_calls)
        self.assertEqual(logged, sorted(ids))

    def assert_tp_del_calls(self, ids):
        """Check the logged __tp_del__ calls against *ids*, ignoring order."""
        logged = sorted(SimpleBase.tp_del_calls)
        self.assertEqual(logged, sorted(ids))

    def assert_survivors(self, ids):
        """Check the ids of the current survivors against *ids*."""
        alive = sorted(map(id, SimpleBase.survivors))
        self.assertEqual(alive, sorted(ids))

    def assert_garbage(self, ids):
        """Check the ids of the objects in gc.garbage against *ids*."""
        uncollected = sorted(map(id, gc.garbage))
        self.assertEqual(uncollected, sorted(ids))

    def clear_survivors(self):
        """Drop every reference held by the survivors list."""
        SimpleBase.survivors.clear()
136
137
class SimpleFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of objects that are not part of any reference cycle.
    """

    def test_simple(self):
        with SimpleBase.test():
            obj = Simple()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIsNone(ref())
            # A further collection must not add any finalizer call.
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        with SimpleBase.test():
            obj = SimpleResurrector()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(expected)
            self.assertIsNotNone(ref())
            # Once the survivor is dropped, no new __del__ call is logged.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
        self.assertIsNone(ref())

    def test_non_gc(self):
        with SimpleBase.test():
            obj = NonGC()
            self.assertFalse(gc.is_tracked(obj))
            expected = [id(obj)]
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    def test_non_gc_resurrect(self):
        with SimpleBase.test():
            obj = NonGCResurrector()
            self.assertFalse(gc.is_tracked(obj))
            expected = [id(obj)]
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(expected)
            # Dropping the survivor is expected to log a second __del__
            # call and to leave the object in the survivors list again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected * 2)
            self.assert_survivors(expected)
199
200
class SelfCycleBase:
    """
    Mixin giving each instance a reference to itself.
    """

    def __init__(self):
        super().__init__()
        # The object points at itself, forming a one-element cycle.
        self.ref = self

    def check_sanity(self):
        """
        Run the inherited check, then assert the self-reference is intact.
        """
        super().check_sanity()
        assert self is self.ref
210
class SimpleSelfCycle(SelfCycleBase, Simple):
    """Simple object that references itself."""
213
class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
    """Self-referencing object with the resurrecting side effect."""
216
class SuicidalSelfCycle(SelfCycleBase, Simple):
    """
    Self-referencing object whose side effect removes the self-reference.
    """

    def side_effect(self):
        """
        Break the reference cycle explicitly by clearing self.ref.
        """
        self.ref = None
224
225
class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of an object whose only cyclic reference points back
    at itself.
    """

    def test_simple(self):
        with SimpleBase.test():
            obj = SimpleSelfCycle()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIsNone(ref())
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # Test that __del__ can resurrect the object being finalized.
        with SimpleBase.test():
            obj = SelfCycleResurrector()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(expected)
            # XXX is this desirable?
            self.assertIsNone(ref())
            # When trying to destroy the object a second time, __del__
            # isn't called anymore (and the object isn't resurrected).
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIsNone(ref())

    def test_simple_suicide(self):
        # Test the GC is able to deal with an object that kills its last
        # reference during __del__.
        with SimpleBase.test():
            obj = SuicidalSelfCycle()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIsNone(ref())
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIsNone(ref())
282
283
class ChainedBase:
    """
    Mixin for nodes linked to a left and a right neighbour.
    """

    def chain(self, left):
        """
        Link self after *left*: set self.left and left.right, and mark
        self as not suicided.
        """
        self.suicided = False
        self.left = left
        left.right = self

    def check_sanity(self):
        """
        Run the inherited check, then assert the links are consistent.

        A suicided node must have both links cleared.  Otherwise each
        neighbour must point back at self, or at None if that neighbour
        has suicided.
        """
        super().check_sanity()
        if self.suicided:
            assert self.left is None
            assert self.right is None
            return
        # Left neighbour first, then right, each read just before its check.
        neighbour = self.left
        assert neighbour.right is (None if neighbour.suicided else self)
        neighbour = self.right
        assert neighbour.left is (None if neighbour.suicided else self)
307
class SimpleChained(ChainedBase, Simple):
    """Chain node with no side effect."""
310
class ChainedResurrector(ChainedBase, SimpleResurrector):
    """Chain node with the resurrecting side effect."""
313
class SuicidalChained(ChainedBase, Simple):
    """
    Chain node whose side effect detaches it from both neighbours.
    """

    def side_effect(self):
        """
        Break the reference cycle explicitly: flag the node as suicided
        and clear both of its links.
        """
        self.suicided = True
        self.left = None
        self.right = None
323
324
class CycleChainFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of a cyclic chain.  These tests are similar in
    spirit to the self-cycle tests above, but the collectable object
    graph isn't trivial anymore.
    """

    def build_chain(self, classes):
        """
        Instantiate one node per entry of *classes* and link them in a ring.

        Every node is chained to its predecessor; the first node is
        chained to the last one (index -1), which closes the cycle.
        Returns the list of nodes, in the order of *classes*.
        """
        nodes = [cls() for cls in classes]
        for index, node in enumerate(nodes):
            node.chain(nodes[index - 1])
        return nodes

    def check_non_resurrecting_chain(self, classes):
        """
        Build a ring from *classes* and check every node is finalized
        exactly once, with no survivor and all weakrefs dead.
        """
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertEqual([wr() for wr in wrs], [None] * len(wrs))
            # A second collection must not log any further __del__ call.
            gc.collect()
            self.assert_del_calls(ids)

    def check_resurrecting_chain(self, classes):
        """
        Build a ring from *classes* and check every node is finalized
        exactly once, and that only SimpleResurrector nodes survive.
        """
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            survivor_ids = [id(s) for s in nodes
                            if isinstance(s, SimpleResurrector)]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(survivor_ids)
            # XXX desirable?
            self.assertEqual([wr() for wr in wrs], [None] * len(wrs))
            # Dropping the survivors must not log any further __del__
            # call, and nothing may be resurrected a second time.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_homogenous(self):
        self.check_non_resurrecting_chain([SimpleChained] * 3)

    def test_homogenous_resurrect(self):
        self.check_resurrecting_chain([ChainedResurrector] * 3)

    def test_homogenous_suicidal(self):
        self.check_non_resurrecting_chain([SuicidalChained] * 3)

    def test_heterogenous_suicidal_one(self):
        self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)

    def test_heterogenous_suicidal_two(self):
        self.check_non_resurrecting_chain(
            [SuicidalChained] * 2 + [SimpleChained] * 2)

    def test_heterogenous_resurrect_one(self):
        self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)

    def test_heterogenous_resurrect_two(self):
        self.check_resurrecting_chain(
            [ChainedResurrector, SimpleChained, SuicidalChained] * 2)

    def test_heterogenous_resurrect_three(self):
        self.check_resurrecting_chain(
            [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
397
398
# NOTE: the tp_del slot isn't automatically inherited, so we have to call
# with_tp_del() for each instantiated class.
401
class LegacyBase(SimpleBase):
    """
    Base for objects carrying both a __del__ and a legacy __tp_del__.
    """

    def __del__(self):
        """
        Unless cleanup is in progress, log the call and check sanity.
        Any exception is stored in the errors list.
        """
        try:
            if self._cleaning:
                return
            # Do not invoke side_effect here, since we are now exercising
            # the tp_del slot.
            self.del_calls.append(id(self))
            self.check_sanity()
        except Exception as exc:
            self.errors.append(exc)

    def __tp_del__(self):
        """
        Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.

        Unless cleanup is in progress, log the call, check sanity and
        run the side effect.  Any exception is stored in the errors list.
        """
        try:
            if self._cleaning:
                return
            self.tp_del_calls.append(id(self))
            self.check_sanity()
            self.side_effect()
        except Exception as exc:
            self.errors.append(exc)
425
@_testcapi.with_tp_del
class Legacy(LegacyBase):
    """LegacyBase object with no side effect."""
429
@_testcapi.with_tp_del
class LegacyResurrector(LegacyBase):
    """
    LegacyBase object whose side effect brings it back to life.
    """

    def side_effect(self):
        """
        Keep self alive by appending it to the class-wide survivors list.
        """
        self.survivors.append(self)
438
@_testcapi.with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
    """LegacyBase object that references itself."""
442
443
class LegacyFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of objects that have a tp_del slot.
    """

    def tearDown(self):
        # These tests need to clean up a bit more, since they create
        # uncollectable objects.
        gc.garbage.clear()
        gc.collect()
        super().tearDown()

    def test_legacy(self):
        with SimpleBase.test():
            obj = Legacy()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected)
            self.assert_survivors([])
            self.assertIsNone(ref())
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected)

    def test_legacy_resurrect(self):
        with SimpleBase.test():
            obj = LegacyResurrector()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected)
            self.assert_survivors(expected)
            # weakrefs are cleared before tp_del is called.
            self.assertIsNone(ref())
            # Dropping the survivor is expected to log a second tp_del
            # call (but no second __del__) and to resurrect it again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected * 2)
            self.assert_survivors(expected)
        self.assertIsNone(ref())

    def test_legacy_self_cycle(self):
        # Self-cycles with legacy finalizers end up in gc.garbage.
        with SimpleBase.test():
            obj = LegacySelfCycle()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls([])
            self.assert_tp_del_calls([])
            self.assert_survivors([])
            self.assert_garbage(expected)
            self.assertIsNotNone(ref())
            # Break the cycle to allow collection
            gc.garbage[0].ref = None
        # Checked after the context manager's cleanup emptied gc.garbage.
        self.assert_garbage([])
        self.assertIsNone(ref())
507
508
def test_main():
    """Run every test case defined in this module."""
    support.run_unittest(__name__)


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()