Issue #12352: Fix a deadlock in multiprocessing.Heap when a block is freed by
the garbage collector while the Heap lock is held.
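For illustration only (this snippet is not part of the patch, and the Heap/Block names below are stand-ins rather than the real multiprocessing classes), a minimal sketch of the failure mode: a finalizer run by the cyclic garbage collector calls free() while the same thread already holds the heap lock inside malloc(). The sketch uses a non-blocking acquire so it reports the situation instead of hanging:

import gc
import threading

class Heap:
    def __init__(self):
        self._lock = threading.Lock()   # non-reentrant, like the real heap lock

    def malloc(self):
        with self._lock:
            # stands in for an allocation that happens to trigger a cyclic
            # garbage collection while the lock is held
            gc.collect()

    def free(self):
        # A blocking acquire() here would deadlock; acquire(False) is used only
        # so this sketch terminates and reports the problem instead.
        if not self._lock.acquire(False):
            print("free() called while the heap lock is held -> would deadlock")
        else:
            self._lock.release()

heap = Heap()

class Block:
    def __del__(self):
        heap.free()

# Put the block in a reference cycle so it is reclaimed by the garbage
# collector rather than by reference counting.
b = Block()
b.cycle = b
del b
heap.malloc()
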
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 52ee49d..a1f3711 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -101,6 +101,8 @@
         self._stop_to_block = {}
         self._allocated_blocks = set()
         self._arenas = []
+        # list of pending blocks to free - see free() comment below
+        self._pending_free_blocks = []
 
     @staticmethod
     def _roundup(n, alignment):
@@ -175,15 +177,39 @@
 
         return start, stop
 
-    def free(self, block):
-        # free a block returned by malloc()
-        assert os.getpid() == self._lastpid
-        self._lock.acquire()
-        try:
+    def _free_pending_blocks(self):
+        # Free all the blocks in the pending list - called with the lock held.
+        while True:
+            try:
+                block = self._pending_free_blocks.pop()
+            except IndexError:
+                break
             self._allocated_blocks.remove(block)
             self._free(block)
-        finally:
-            self._lock.release()
+
+    def free(self, block):
+        # free a block returned by malloc()
+        # Since free() can be called asynchronously by the GC, it could happen
+        # that it's called while self._lock is held: in that case,
+        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
+        # trylock is used instead, and if the lock can't be acquired
+        # immediately, the block is added to a list of blocks to be freed
+        # synchronously sometime later from malloc() or free(), by calling
+        # _free_pending_blocks() (appending and retrieving from a list is not
+        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
+        assert os.getpid() == self._lastpid
+        if not self._lock.acquire(False):
+            # can't acquire the lock right now, add the block to the list of
+            # pending blocks to free
+            self._pending_free_blocks.append(block)
+        else:
+            # we hold the lock
+            try:
+                self._free_pending_blocks()
+                self._allocated_blocks.remove(block)
+                self._free(block)
+            finally:
+                self._lock.release()
 
     def malloc(self, size):
         # return a block of right size (possibly rounded up)
@@ -191,6 +217,7 @@
         if os.getpid() != self._lastpid:
             self.__init__()                     # reinitialize after fork
         self._lock.acquire()
+        self._free_pending_blocks()
         try:
             size = self._roundup(max(size,1), self._alignment)
             (arena, start, stop) = self._malloc(size)
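
A rough way to exercise the change (a sketch assuming multiprocessing.heap.BufferWrapper, not the regression test that accompanies this fix): lower the GC thresholds so collections run frequently, then allocate blocks that are only reachable through reference cycles, so Heap.free() is invoked from the garbage collector, occasionally while malloc() already holds the heap lock:

import gc
import multiprocessing.heap

def stress(iterations=5000):
    # Collect very frequently to raise the odds that a collection (and hence a
    # block finalizer calling Heap.free()) runs while malloc() holds the lock.
    gc.enable()
    old_thresholds = gc.get_threshold()
    gc.set_threshold(10)
    try:
        for i in range(iterations):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references keep the wrappers alive until a GC pass
            a.buddy = b
            b.buddy = a
    finally:
        gc.set_threshold(*old_thresholds)

if __name__ == '__main__':
    stress()
    print("completed without deadlocking")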