Issue #19542: Fix bugs in WeakValueDictionary.setdefault() and WeakValueDictionary.pop()
when a GC collection happens in another thread.
Original patch and report by Armin Rigo.
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 871f427..e339da5 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -1710,3 +1710,13 @@
# The sequence should be deallocated just after the end of iterating
gc_collect()
test.assertTrue(done[0])
+
+@contextlib.contextmanager
+def disable_gc():  # context manager: keeps the garbage collector disabled while the with-body runs
+ have_gc = gc.isenabled()  # remember whether the collector was enabled on entry
+ gc.disable()
+ try:
+ yield
+ finally:
+ if have_gc:  # re-enable only if it was enabled on entry, so an already-disabled GC stays disabled
+ gc.enable()
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index 4073d49..779a9b3 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -6,6 +6,7 @@
import operator
import contextlib
import copy
+import time
from test import test_support
@@ -56,6 +57,32 @@
self.cycle = self
+@contextlib.contextmanager
+def collect_in_thread(period=0.0001):  # period: value passed to time.sleep() between gc.collect() calls in the helper thread
+ """
+ Ensure GC collections happen in a different thread, at a high frequency.
+ """
+ threading = test_support.import_module('threading')  # presumably skips the test when threading is unavailable; confirm against test_support
+ please_stop = False  # flag polled by collect(); set to True in the finally clause to end the thread
+
+ def collect():  # thread target: alternate sleeping and forcing a collection until asked to stop
+ while not please_stop:
+ time.sleep(period)
+ gc.collect()
+
+ with test_support.disable_gc():  # collector is disabled for the body; the helper thread calls gc.collect() explicitly
+ old_interval = sys.getcheckinterval()  # saved so it can be restored on exit
+ sys.setcheckinterval(20)  # presumably makes thread switches more frequent; verify against sys.setcheckinterval docs
+ t = threading.Thread(target=collect)
+ t.start()  # NOTE(review): runs before the try, so a failure here leaves the check interval at 20
+ try:
+ yield
+ finally:
+ please_stop = True  # tell collect() to leave its loop
+ t.join()  # wait for the collector thread to finish before restoring state
+ sys.setcheckinterval(old_interval)  # restore the interval saved above
+
+
class TestBase(unittest.TestCase):
def setUp(self):
@@ -1394,6 +1421,23 @@
self.assertEqual(len(d), 0)
self.assertEqual(count, 2)
+ def test_threaded_weak_valued_setdefault(self):  # calls setdefault() repeatedly while another thread runs gc.collect()
+ d = weakref.WeakValueDictionary()
+ with collect_in_thread():
+ for i in range(50000):
+ x = d.setdefault(10, RefCycle())  # a fresh RefCycle is offered as the default on every iteration
+ self.assertIsNot(x, None) # we never put None in there!
+ del x  # drop the local reference; presumably so the value can be collected before the next iteration
+
+ def test_threaded_weak_valued_pop(self):  # calls pop() repeatedly while another thread runs gc.collect()
+ d = weakref.WeakValueDictionary()
+ with collect_in_thread():
+ for i in range(50000):
+ d[10] = RefCycle()  # stored without binding the new object to any local name
+ x = d.pop(10, 10)  # the second argument (10) is the default returned when key 10 is absent
+ self.assertIsNot(x, None) # we never put None in there!
+
+
from test import mapping_tests
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):