Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation.  Previously it
would reliably freeze/deadlock.
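
For illustration only (not part of this patch), a minimal driver showing
the new failure mode.  The helper name `work` is hypothetical, os.kill()
restricts the sketch to POSIX, and the private `_processes` mapping is
poked purely to find a worker pid:

    import os
    import signal
    import time
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures.process import BrokenProcessPool

    def work():
        # The body is irrelevant; it only keeps the worker busy.
        time.sleep(60)

    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_workers=1)
        future = executor.submit(work)
        # Kill the worker behind the executor's back (POSIX only).
        victim_pid = next(iter(executor._processes))
        os.kill(victim_pid, signal.SIGKILL)
        try:
            future.result()   # with this patch: raises instead of hanging
        except BrokenProcessPool:
            print('killed worker detected, pool marked broken')
        executor.shutdown()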
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index f0bf6d5..c2331e7 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -46,10 +46,11 @@
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
 import atexit
+import os
 from concurrent.futures import _base
 import queue
 import multiprocessing
-from multiprocessing.queues import SimpleQueue
+from multiprocessing.queues import SimpleQueue, SentinelReady
 import threading
 import weakref
 
@@ -122,7 +123,7 @@
         call_item = call_queue.get(block=True)
         if call_item is None:
             # Wake up queue management thread
-            result_queue.put(None)
+            result_queue.put(os.getpid())
             return
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
@@ -194,29 +195,63 @@
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
     """
-    nb_shutdown_processes = 0
-    def shutdown_one_process():
-        """Tell a worker to terminate, which will in turn wake us again"""
-        nonlocal nb_shutdown_processes
-        call_queue.put(None)
-        nb_shutdown_processes += 1
+
+    def shutdown_worker():
+        # This is an upper bound on the number of children alive.
+        nb_children_alive = sum(p.is_alive() for p in processes.values())
+        for _ in range(nb_children_alive):
+            call_queue.put(None)
+        # If .join() is not called on the created processes then
+        # some multiprocessing.Queue methods may deadlock on
+        # Mac OS X.
+        for p in processes.values():
+            p.join()
+
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
 
-        result_item = result_queue.get()
-        if result_item is not None:
-            work_item = pending_work_items[result_item.work_id]
-            del pending_work_items[result_item.work_id]
-
-            if result_item.exception:
-                work_item.future.set_exception(result_item.exception)
-            else:
-                work_item.future.set_result(result_item.result)
-            continue
-        # If we come here, we either got a timeout or were explicitly woken up.
-        # In either case, check whether we should start shutting down.
+        sentinels = [p.sentinel for p in processes.values()]
+        assert sentinels
+        try:
+            result_item = result_queue.get(sentinels=sentinels)
+        except SentinelReady:
+            # Mark the process pool broken so that submits fail right now.
+            executor = executor_reference()
+            if executor is not None:
+                executor._broken = True
+                executor._shutdown_thread = True
+                del executor
+            # All futures in flight must be marked failed
+            for work_item in pending_work_items.values():
+                work_item.future.set_exception(
+                    BrokenProcessPool(
+                        "A process in the process pool was "
+                        "terminated abruptly while the future was "
+                        "running or pending."
+                    ))
+            pending_work_items.clear()
+            # Terminate remaining workers forcibly: the queues or their
+            # locks may be in a dirty state and block forever.
+            for p in processes.values():
+                p.terminate()
+            for p in processes.values():
+                p.join()
+            return
+        if isinstance(result_item, int):
+            # Clean shutdown of a worker using its PID
+            # (avoids marking the executor broken)
+            del processes[result_item]
+        elif result_item is not None:
+            work_item = pending_work_items.pop(result_item.work_id, None)
+            # work_item can be None if another process terminated (see above)
+            if work_item is not None:
+                if result_item.exception:
+                    work_item.future.set_exception(result_item.exception)
+                else:
+                    work_item.future.set_result(result_item.result)
+        # Check whether we should start shutting down.
         executor = executor_reference()
         # No more work items can be added if:
         #   - The interpreter is shutting down OR
@@ -226,17 +261,11 @@
             # Since no new work items can be added, it is safe to shutdown
             # this thread if there are no pending work items.
             if not pending_work_items:
-                while nb_shutdown_processes < len(processes):
-                    shutdown_one_process()
-                # If .join() is not called on the created processes then
-                # some multiprocessing.Queue methods may deadlock on Mac OS
-                # X.
-                for p in processes:
-                    p.join()
+                shutdown_worker()
                 return
             else:
                 # Start shutting down by telling a process it can exit.
-                shutdown_one_process()
+                call_queue.put(None)
         del executor
 
 _system_limits_checked = False
@@ -264,6 +293,14 @@
     _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
     raise NotImplementedError(_system_limited)
 
+
+class BrokenProcessPool(RuntimeError):
+    """
+    Raised when a process in a ProcessPoolExecutor terminated abruptly
+    while a future was running or pending.
+    """
+
+
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
         """Initializes a new ProcessPoolExecutor instance.
@@ -288,11 +325,13 @@
         self._result_queue = SimpleQueue()
         self._work_ids = queue.Queue()
         self._queue_management_thread = None
-        self._processes = set()
+        # Map of pids to processes
+        self._processes = {}
 
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
+        self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
 
@@ -302,6 +341,8 @@
         def weakref_cb(_, q=self._result_queue):
             q.put(None)
         if self._queue_management_thread is None:
+            # Start the processes so that their sentinels are known.
+            self._adjust_process_count()
             self._queue_management_thread = threading.Thread(
                     target=_queue_management_worker,
                     args=(weakref.ref(self, weakref_cb),
@@ -321,10 +362,13 @@
                     args=(self._call_queue,
                           self._result_queue))
             p.start()
-            self._processes.add(p)
+            self._processes[p.pid] = p
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
+            if self._broken:
+                raise BrokenProcessPool('A child process terminated '
+                    'abruptly; the process pool is not usable anymore')
             if self._shutdown_thread:
                 raise RuntimeError('cannot schedule new futures after shutdown')
 
@@ -338,6 +382,5 @@
             self._result_queue.put(None)
 
             self._start_queue_management_thread()
-            self._adjust_process_count()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
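
A closing note on the submit() hunk: the queue management thread sets
_broken before failing the pending futures, so by the time a caller
catches BrokenProcessPool from future.result(), new submissions are
already guaranteed to fail fast.  Continuing the hypothetical driver
sketched at the top of this patch, inside its except branch:

    try:
        executor.submit(work)
    except BrokenProcessPool:
        print('new submissions now fail fast')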