bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)



Roughly based on https://github.com/python/cpython/commit/904e34d4e6b6007986dcc585d5c553ee8ae06f95, but with a few substantial differences.

/cc @pitrou @brianquinlan
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 4c39500..36355ae 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -318,6 +318,12 @@
                 # while waiting on new results.
                 del result_item
 
+                # attempt to increment idle process count
+                executor = self.executor_reference()
+                if executor is not None:
+                    executor._idle_worker_semaphore.release()
+                del executor
+
             if self.is_shutting_down():
                 self.flag_executor_shutting_down()
 
@@ -601,6 +607,7 @@
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
+        self._idle_worker_semaphore = threading.Semaphore(0)
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
@@ -633,14 +640,18 @@
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
-            self._adjust_process_count()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
                 self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
-        for _ in range(len(self._processes), self._max_workers):
+        # if there's an idle process, we don't need to spawn a new one.
+        if self._idle_worker_semaphore.acquire(blocking=False):
+            return
+
+        process_count = len(self._processes)
+        if process_count < self._max_workers:
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
@@ -669,6 +680,7 @@
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()
 
+            self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__