[2.7] bpo-24484: Avoid race condition in multiprocessing cleanup (GH-2159) (#2168)
* bpo-24484: Avoid race condition in multiprocessing cleanup
The finalizer registry can be mutated while inspected by multiprocessing
at process exit.
* Use test.support.start_threads()
* Add Misc/NEWS.
(cherry picked from commit 1eb6c0074d17f4fd425cacfdda893d65f5f77f0a)
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 394cc44..2920f24 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -265,6 +265,9 @@
else:
f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
+ # Careful: _finalizer_registry may be mutated while this function
+ # is running (either by a GC run or by another thread).
+
items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 8a6bb78..9fcd3bd 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -2088,6 +2088,14 @@
ALLOWED_TYPES = ('processes',)
+ def setUp(self):
+ self.registry_backup = util._finalizer_registry.copy()
+ util._finalizer_registry.clear()
+
+ def tearDown(self):
+ self.assertFalse(util._finalizer_registry)
+ util._finalizer_registry.update(self.registry_backup)
+
@classmethod
def _test_finalize(cls, conn):
class Foo(object):
@@ -2137,6 +2145,59 @@
result = [obj for obj in iter(conn.recv, 'STOP')]
self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
+ def test_thread_safety(self):
+ # bpo-24484: _run_finalizers() should be thread-safe
+ def cb():
+ pass
+
+ class Foo(object):
+ def __init__(self):
+ self.ref = self # create reference cycle
+ # insert finalizer at random key
+ util.Finalize(self, cb, exitpriority=random.randint(1, 100))
+
+ finish = False
+ exc = []
+
+ def run_finalizers():
+ while not finish:
+ time.sleep(random.random() * 1e-1)
+ try:
+ # A GC run will eventually happen during this,
+ # collecting stale Foo's and mutating the registry
+ util._run_finalizers()
+ except Exception as e:
+ exc.append(e)
+
+ def make_finalizers():
+ d = {}
+ while not finish:
+ try:
+ # Old Foo's get gradually replaced and later
+ # collected by the GC (because of the cyclic ref)
+ d[random.getrandbits(5)] = {Foo() for i in range(10)}
+ except Exception as e:
+ exc.append(e)
+ d.clear()
+
+ old_interval = sys.getcheckinterval()
+ old_threshold = gc.get_threshold()
+ try:
+ sys.setcheckinterval(10)
+ gc.set_threshold(5, 5, 5)
+ threads = [threading.Thread(target=run_finalizers),
+ threading.Thread(target=make_finalizers)]
+ with test_support.start_threads(threads):
+ time.sleep(4.0) # Wait a bit to trigger race condition
+ finish = True
+ if exc:
+ raise exc[0]
+ finally:
+ sys.setcheckinterval(old_interval)
+ gc.set_threshold(*old_threshold)
+ gc.collect() # Collect remaining Foo's
+
+
#
# Test that from ... import * works for each module
#