bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure (GH-17670)

As initially reported by @rad-pat in #6084, the following script causes a deadlock.

```python
from concurrent.futures import ProcessPoolExecutor


class ObjectWithPickleError:
    """Triggers a RuntimeError when sending the job to the workers."""

    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    e = ProcessPoolExecutor()
    f = e.submit(id, ObjectWithPickleError())
    e.shutdown(wait=False)
    f.result()  # Deadlock on get
```

The deadlock occurs because the main process closes communication channels that the `queue_management_thread` may still need later. To avoid this, this PR lets the `queue_management_thread` handle all the closing.
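
With the fix, the same script is expected to terminate promptly: the pickling failure wakes the `queue_management_thread`, which fails the pending work item, so the error propagates to the caller. A sketch of the expected post-fix behavior (that the surfaced exception is the `RuntimeError` raised in `__reduce__` is an assumption based on `_on_queue_feeder_error` setting the pickling error on the future):

```python
from concurrent.futures import ProcessPoolExecutor


class ObjectWithPickleError:
    """Triggers a RuntimeError when sending the job to the workers."""

    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    e = ProcessPoolExecutor()
    f = e.submit(id, ObjectWithPickleError())
    e.shutdown(wait=False)
    try:
        f.result()              # with the fix: raises instead of hanging
    except RuntimeError as exc:
        print("pickling error propagated:", repr(exc))
```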

https://bugs.python.org/issue39104

Automerge-Triggered-By: @pitrou
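
For context on the `_SafeQueue` hunk in the diff below: it overrides `multiprocessing.queues.Queue._on_queue_feeder_error`, the internal callback the queue's feeder thread invokes when an item cannot be serialized, and now also pings the `thread_wakeup` channel from there. A minimal standalone sketch of that hook, not part of the patch (the `NotifyingQueue` class and `on_error` callback are hypothetical names for illustration):

```python
import multiprocessing as mp
from multiprocessing.queues import Queue


class NotifyingQueue(Queue):
    """Queue that reports feeder-thread serialization failures."""

    def __init__(self, maxsize=0, *, ctx, on_error):
        self._on_error = on_error
        super().__init__(maxsize, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        # Runs in the queue's feeder thread when pickling `obj` fails,
        # instead of the default behavior of printing the traceback.
        self._on_error(e, obj)


class Unpicklable:
    def __reduce__(self):
        raise RuntimeError("cannot pickle")


if __name__ == "__main__":
    q = NotifyingQueue(ctx=mp.get_context(),
                       on_error=lambda e, obj: print("feeder error:", repr(e)))
    q.put(Unpicklable())
    q.close()
    q.join_thread()
```
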
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index fd9f572..d773228 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -80,18 +80,23 @@
 
 class _ThreadWakeup:
     def __init__(self):
+        self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)
 
     def close(self):
-        self._writer.close()
-        self._reader.close()
+        if not self._closed:
+            self._closed = True
+            self._writer.close()
+            self._reader.close()
 
     def wakeup(self):
-        self._writer.send_bytes(b"")
+        if not self._closed:
+            self._writer.send_bytes(b"")
 
     def clear(self):
-        while self._reader.poll():
-            self._reader.recv_bytes()
+        if not self._closed:
+            while self._reader.poll():
+                self._reader.recv_bytes()
 
 
 def _python_exit():
@@ -160,8 +165,9 @@
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
     def _on_queue_feeder_error(self, e, obj):
@@ -169,6 +175,7 @@
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this case,
             # the queue_manager_thread fails all work_items with BrokenProcessPool
             if work_item is not None:
@@ -339,6 +346,8 @@
 
         # Release the queue's resources as soon as possible.
         call_queue.close()
+        call_queue.join_thread()
+        thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
@@ -566,21 +575,6 @@
         self._pending_work_items = {}
         self._cancel_pending_futures = False
 
-        # Create communication channels for the executor
-        # Make the call queue slightly larger than the number of processes to
-        # prevent the worker processes from idling. But don't make it too big
-        # because futures in the call queue cannot be cancelled.
-        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
-        self._call_queue = _SafeQueue(
-            max_size=queue_size, ctx=self._mp_context,
-            pending_work_items=self._pending_work_items)
-        # Killed worker processes can produce spurious "broken pipe"
-        # tracebacks in the queue's own worker thread. But we detect killed
-        # processes anyway, so silence the tracebacks.
-        self._call_queue._ignore_epipe = True
-        self._result_queue = mp_context.SimpleQueue()
-        self._work_ids = queue.Queue()
-
         # _ThreadWakeup is a communication channel used to interrupt the wait
         # of the main loop of queue_manager_thread from another thread (e.g.
         # when calling executor.submit or executor.shutdown). We do not use the
@@ -589,6 +583,22 @@
         # _result_queue write lock still acquired.
         self._queue_management_thread_wakeup = _ThreadWakeup()
 
+        # Create communication channels for the executor
+        # Make the call queue slightly larger than the number of processes to
+        # prevent the worker processes from idling. But don't make it too big
+        # because futures in the call queue cannot be cancelled.
+        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
+        self._call_queue = _SafeQueue(
+            max_size=queue_size, ctx=self._mp_context,
+            pending_work_items=self._pending_work_items,
+            thread_wakeup=self._queue_management_thread_wakeup)
+        # Killed worker processes can produce spurious "broken pipe"
+        # tracebacks in the queue's own worker thread. But we detect killed
+        # processes anyway, so silence the tracebacks.
+        self._call_queue._ignore_epipe = True
+        self._result_queue = mp_context.SimpleQueue()
+        self._work_ids = queue.Queue()
+
     def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             # When the executor gets garbarge collected, the weakref callback
@@ -692,16 +702,11 @@
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
-        if self._call_queue is not None:
-            self._call_queue.close()
-            if wait:
-                self._call_queue.join_thread()
-            self._call_queue = None
+        self._call_queue = None
         self._result_queue = None
         self._processes = None
 
         if self._queue_management_thread_wakeup:
-            self._queue_management_thread_wakeup.close()
             self._queue_management_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
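
The shutdown-side changes above follow a single ownership rule: the `queue_management_thread`, as the last user of the call queue and the wakeup channel, is now the only place that closes them, while `shutdown()` merely drops its references. A standalone, hypothetical sketch of that rule (none of these names come from the patch):

```python
import multiprocessing as mp
import threading


def manager(reader, done):
    # The manager thread owns the read end: it drains the channel and is
    # the only place that closes it, so the main thread can never close
    # it out from under a pending read.
    while not done.is_set():
        if reader.poll(0.1):
            reader.recv_bytes()
    reader.close()                  # closed by its owner, after the last read


if __name__ == "__main__":
    reader, writer = mp.Pipe(duplex=False)
    done = threading.Event()
    t = threading.Thread(target=manager, args=(reader, done))
    t.start()
    writer.send_bytes(b"wake")      # signal the manager
    done.set()                      # request shutdown; do NOT close reader here
    t.join()                        # the channel outlives every reader of it
    writer.close()                  # the write end is owned by the main thread
```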