bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (#4241)

* bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor

* Fix docstring
diff --git a/Lib/concurrent/futures/__init__.py b/Lib/concurrent/futures/__init__.py
index b5231f8..ba8de16 100644
--- a/Lib/concurrent/futures/__init__.py
+++ b/Lib/concurrent/futures/__init__.py
@@ -10,6 +10,7 @@
                                       ALL_COMPLETED,
                                       CancelledError,
                                       TimeoutError,
+                                      BrokenExecutor,
                                       Future,
                                       Executor,
                                       wait,
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 6bace6c..4f22f7e 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -610,3 +610,9 @@
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.shutdown(wait=True)
         return False
+
+
+class BrokenExecutor(RuntimeError):
+    """
+    Raised when an executor has become non-functional after a severe failure.
+    """
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 67ebbf5..35af65d 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -131,6 +131,7 @@
         self.args = args
         self.kwargs = kwargs
 
+
 def _get_chunks(*iterables, chunksize):
     """ Iterates over zip()ed iterables in chunks. """
     it = zip(*iterables)
@@ -151,7 +152,7 @@
     """
     return [fn(*args) for args in chunk]
 
-def _process_worker(call_queue, result_queue):
+def _process_worker(call_queue, result_queue, initializer, initargs):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -161,7 +162,17 @@
             evaluated by the worker.
         result_queue: A ctx.Queue of _ResultItems that will written
             to by the worker.
+        initializer: A callable initializer, or None
+        initargs: A tuple of args for the initializer
     """
+    if initializer is not None:
+        try:
+            initializer(*initargs)
+        except BaseException:
+            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+            # The parent will notice that the process stopped and
+            # mark the pool broken
+            return
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
@@ -277,7 +288,9 @@
             # Mark the process pool broken so that submits fail right now.
             executor = executor_reference()
             if executor is not None:
-                executor._broken = True
+                executor._broken = ('A child process terminated '
+                                    'abruptly, the process pool is not '
+                                    'usable anymore')
                 executor._shutdown_thread = True
                 executor = None
             # All futures in flight must be marked failed
@@ -372,7 +385,7 @@
             yield element.pop()
 
 
-class BrokenProcessPool(RuntimeError):
+class BrokenProcessPool(_base.BrokenExecutor):
     """
     Raised when a process in a ProcessPoolExecutor terminated abruptly
     while a future was in the running state.
@@ -380,7 +393,8 @@
 
 
 class ProcessPoolExecutor(_base.Executor):
-    def __init__(self, max_workers=None, mp_context=None):
+    def __init__(self, max_workers=None, mp_context=None,
+                 initializer=None, initargs=()):
         """Initializes a new ProcessPoolExecutor instance.
 
         Args:
@@ -389,6 +403,8 @@
                 worker processes will be created as the machine has processors.
             mp_context: A multiprocessing context to launch the workers. This
                 object should provide SimpleQueue, Queue and Process.
+            initializer: A callable used to initialize worker processes.
+            initargs: A tuple of arguments to pass to the initializer.
         """
         _check_system_limits()
 
@@ -403,6 +419,11 @@
             mp_context = mp.get_context()
         self._mp_context = mp_context
 
+        if initializer is not None and not callable(initializer):
+            raise TypeError("initializer must be a callable")
+        self._initializer = initializer
+        self._initargs = initargs
+
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
@@ -450,15 +471,16 @@
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
-                      self._result_queue))
+                      self._result_queue,
+                      self._initializer,
+                      self._initargs))
             p.start()
             self._processes[p.pid] = p
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
             if self._broken:
-                raise BrokenProcessPool('A child process terminated '
-                    'abruptly, the process pool is not usable anymore')
+                raise BrokenProcessPool(self._broken)
             if self._shutdown_thread:
                 raise RuntimeError('cannot schedule new futures after shutdown')
 
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 0b5d537..2e7100b 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -41,6 +41,7 @@
 
 atexit.register(_python_exit)
 
+
 class _WorkItem(object):
     def __init__(self, future, fn, args, kwargs):
         self.future = future
@@ -61,7 +62,17 @@
         else:
             self.future.set_result(result)
 
-def _worker(executor_reference, work_queue):
+
+def _worker(executor_reference, work_queue, initializer, initargs):
+    if initializer is not None:
+        try:
+            initializer(*initargs)
+        except BaseException:
+            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+            executor = executor_reference()
+            if executor is not None:
+                executor._initializer_failed()
+            return
     try:
         while True:
             work_item = work_queue.get(block=True)
@@ -83,18 +94,28 @@
     except BaseException:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
 
+
+class BrokenThreadPool(_base.BrokenExecutor):
+    """
+    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
+    """
+
+
 class ThreadPoolExecutor(_base.Executor):
 
     # Used to assign unique thread names when thread_name_prefix is not supplied.
     _counter = itertools.count().__next__
 
-    def __init__(self, max_workers=None, thread_name_prefix=''):
+    def __init__(self, max_workers=None, thread_name_prefix='',
+                 initializer=None, initargs=()):
         """Initializes a new ThreadPoolExecutor instance.
 
         Args:
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
             thread_name_prefix: An optional name prefix to give our threads.
+            initializer: A callable used to initialize worker threads.
+            initargs: A tuple of arguments to pass to the initializer.
         """
         if max_workers is None:
             # Use this number because ThreadPoolExecutor is often
@@ -103,16 +124,25 @@
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")
 
+        if initializer is not None and not callable(initializer):
+            raise TypeError("initializer must be a callable")
+
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
+        self._broken = False
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
         self._thread_name_prefix = (thread_name_prefix or
                                     ("ThreadPoolExecutor-%d" % self._counter()))
+        self._initializer = initializer
+        self._initargs = initargs
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
+            if self._broken:
+                raise BrokenThreadPool(self._broken)
+
             if self._shutdown:
                 raise RuntimeError('cannot schedule new futures after shutdown')
 
@@ -137,12 +167,27 @@
                                      num_threads)
             t = threading.Thread(name=thread_name, target=_worker,
                                  args=(weakref.ref(self, weakref_cb),
-                                       self._work_queue))
+                                       self._work_queue,
+                                       self._initializer,
+                                       self._initargs))
             t.daemon = True
             t.start()
             self._threads.add(t)
             _threads_queues[t] = self._work_queue
 
+    def _initializer_failed(self):
+        with self._shutdown_lock:
+            self._broken = ('A thread initializer failed, the thread pool '
+                            'is not usable anymore')
+            # Drain work queue and mark pending futures failed
+            while True:
+                try:
+                    work_item = self._work_queue.get_nowait()
+                except queue.Empty:
+                    break
+                if work_item is not None:
+                    work_item.future.set_exception(BrokenThreadPool(self._broken))
+
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True