bpo-29861: release references to multiprocessing Pool tasks (#743)

* bpo-29861: release references to multiprocessing Pool tasks

Release references to tasks, their arguments and their results as soon
as they are finished, instead of keeping them alive until another task
arrives.

* Comments in test
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index ffdf426..ae8cec4 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -128,6 +128,8 @@
             util.debug("Possible encoding error while sending result: %s" % (
                 wrapped))
             put((job, i, (False, wrapped)))
+
+        task = job = result = func = args = kwds = None
         completed += 1
     util.debug('worker exiting after %d tasks' % completed)
 
@@ -402,10 +404,11 @@
                 if set_length:
                     util.debug('doing set_length()')
                     set_length(i+1)
+            finally:
+                task = taskseq = job = None
         else:
             util.debug('task handler got sentinel')
 
-
         try:
             # tell result handler to finish when cache is empty
             util.debug('task handler sending sentinel to result handler')
@@ -445,6 +448,7 @@
                 cache[job]._set(i, obj)
             except KeyError:
                 pass
+            task = job = obj = None
 
         while cache and thread._state != TERMINATE:
             try:
@@ -461,6 +465,7 @@
                 cache[job]._set(i, obj)
             except KeyError:
                 pass
+            task = job = obj = None
 
         if hasattr(outqueue, '_reader'):
             util.debug('ensuring that outqueue is not full')
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index b5f4782..1d3bb0f 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -18,6 +18,7 @@
 import logging
 import struct
 import operator
+import weakref
 import test.support
 import test.support.script_helper
 
@@ -1738,6 +1739,19 @@
     time.sleep(wait)
     raise ValueError("x" * 1024**2)
 
+def identity(x):
+    return x
+
+class CountedObject(object):
+    n_instances = 0
+
+    def __new__(cls):
+        cls.n_instances += 1
+        return object.__new__(cls)
+
+    def __del__(self):
+        type(self).n_instances -= 1
+
 class SayWhenError(ValueError): pass
 
 def exception_throwing_generator(total, when):
@@ -1746,6 +1760,7 @@
             raise SayWhenError("Somebody said when")
         yield i
 
+
 class _TestPool(BaseTestCase):
 
     @classmethod
@@ -2000,6 +2015,19 @@
         # check that we indeed waited for all jobs
         self.assertGreater(time.time() - t_start, 0.9)
 
+    def test_release_task_refs(self):
+        # Issue #29861: task arguments and results should not be kept
+        # alive after we are done with them.
+        objs = [CountedObject() for i in range(10)]
+        refs = [weakref.ref(o) for o in objs]
+        self.pool.map(identity, objs)
+
+        del objs
+        self.assertEqual(set(wr() for wr in refs), {None})
+        # With a process pool, copies of the objects are returned, check
+        # they were released too.
+        self.assertEqual(CountedObject.n_instances, 0)
+
 
 def raising():
     raise KeyError("key")
diff --git a/Misc/NEWS b/Misc/NEWS
index 8003fac..bd43227 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -287,6 +287,9 @@
 Library
 -------
 
+- bpo-29861: Release references to tasks, their arguments and their results
+  as soon as they are finished in multiprocessing.Pool.
+
 - bpo-19930: The mode argument of os.makedirs() no longer affects the file
   permission bits of newly-created intermediate-level directories.