bpo-29861: release references to multiprocessing Pool tasks (#743) (#803)

* bpo-29861: release references to multiprocessing Pool tasks (#743)

* bpo-29861: release references to multiprocessing Pool tasks

Release references to tasks, their arguments and their results as soon
as they are finished, instead of keeping them alive until another task
arrives.

* Comments in test

(cherry picked from commit 8988945cdc27ffa86ba8c624e095b51c459f5154)

* Fix Misc/NEWS ?
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index c66727c..163c42f 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -14,6 +14,7 @@
 import random
 import logging
 import errno
+import weakref
 import test.script_helper
 from test import test_support
 from StringIO import StringIO
@@ -1123,6 +1124,19 @@
     time.sleep(wait)
     return x*x
 
+def identity(x):  # mapped over the pool by test_release_task_refs
+    return x  # hand the argument back unchanged
+
+class CountedObject(object):  # tracks how many instances are currently alive
+    n_instances = 0  # creations minus finalizations so far
+
+    def __new__(cls):
+        cls.n_instances += 1  # counted at allocation time
+        return object.__new__(cls)
+
+    def __del__(self):
+        type(self).n_instances -= 1  # uncounted when the object is finalized
+
 class SayWhenError(ValueError): pass
 
 def exception_throwing_generator(total, when):
@@ -1268,6 +1282,20 @@
         p.close()
         p.join()
 
+    def test_release_task_refs(self):
+        # Issue #29861: task arguments and results should not be kept
+        # alive after we are done with them.
+        objs = list(CountedObject() for i in range(10))
+        refs = list(weakref.ref(o) for o in objs)  # weak, so objs can die
+        self.pool.map(identity, objs)  # returned list is discarded at once
+
+        del objs  # drop the test's own strong references
+        self.assertEqual(set(wr() for wr in refs), {None})  # all refs dead
+        # With a process pool, copies of the objects are returned; check
+        # that they were released too.
+        self.assertEqual(CountedObject.n_instances, 0)
+
+
 def unpickleable_result():
     return lambda: 42