bpo-29861: release references to multiprocessing Pool tasks (#743) (#803)
* bpo-29861: release references to multiprocessing Pool tasks (#743)
* bpo-29861: release references to multiprocessing Pool tasks
Release references to tasks, their arguments and their results as soon
as they are finished, instead of keeping them alive until another task
arrives.
* Comments in test
(cherry picked from commit 8988945cdc27ffa86ba8c624e095b51c459f5154)
* Fix Misc/NEWS ?
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index ceb93aa..a47cd0f 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -120,6 +120,8 @@
debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))
+
+ task = job = result = func = args = kwds = None
completed += 1
debug('worker exiting after %d tasks' % completed)
@@ -362,10 +364,11 @@
if set_length:
debug('doing set_length()')
set_length(i+1)
+ finally:
+ task = taskseq = job = None
else:
debug('task handler got sentinel')
-
try:
# tell result handler to finish when cache is empty
debug('task handler sending sentinel to result handler')
@@ -405,6 +408,7 @@
cache[job]._set(i, obj)
except KeyError:
pass
+ task = job = obj = None
while cache and thread._state != TERMINATE:
try:
@@ -421,6 +425,7 @@
cache[job]._set(i, obj)
except KeyError:
pass
+ task = job = obj = None
if hasattr(outqueue, '_reader'):
debug('ensuring that outqueue is not full')
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index c66727c..163c42f 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -14,6 +14,7 @@
import random
import logging
import errno
+import weakref
import test.script_helper
from test import test_support
from StringIO import StringIO
@@ -1123,6 +1124,19 @@
time.sleep(wait)
return x*x
+def identity(x):
+ return x
+
+class CountedObject(object):
+ n_instances = 0
+
+ def __new__(cls):
+ cls.n_instances += 1
+ return object.__new__(cls)
+
+ def __del__(self):
+ type(self).n_instances -= 1
+
class SayWhenError(ValueError): pass
def exception_throwing_generator(total, when):
@@ -1268,6 +1282,20 @@
p.close()
p.join()
+ def test_release_task_refs(self):
+ # Issue #29861: task arguments and results should not be kept
+ # alive after we are done with them.
+ objs = list(CountedObject() for i in range(10))
+ refs = list(weakref.ref(o) for o in objs)
+ self.pool.map(identity, objs)
+
+ del objs
+ self.assertEqual(set(wr() for wr in refs), {None})
+ # With a process pool, copies of the objects are returned, check
+ # they were released too.
+ self.assertEqual(CountedObject.n_instances, 0)
+
+
def unpickleable_result():
return lambda: 42
diff --git a/Misc/NEWS b/Misc/NEWS
index 2b492f4..de5ef7d 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -39,6 +39,9 @@
Library
-------
+- bpo-29861: Release references to tasks, their arguments and their results
+ as soon as they are finished in multiprocessing.Pool.
+
- bpo-27880: Fixed integer overflow in cPickle when pickle large strings or
too many objects.