bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (#4241)

* bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor

* Fix docstring
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 67ebbf5..35af65d 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -131,6 +131,7 @@
         self.args = args
         self.kwargs = kwargs
 
+
 def _get_chunks(*iterables, chunksize):
     """ Iterates over zip()ed iterables in chunks. """
     it = zip(*iterables)
@@ -151,7 +152,7 @@
     """
     return [fn(*args) for args in chunk]
 
-def _process_worker(call_queue, result_queue):
+def _process_worker(call_queue, result_queue, initializer, initargs):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -161,7 +162,17 @@
             evaluated by the worker.
         result_queue: A ctx.Queue of _ResultItems that will written
             to by the worker.
+        initializer: A callable initializer, or None
+        initargs: A tuple of args for the initializer
     """
+    if initializer is not None:
+        try:
+            initializer(*initargs)
+        except BaseException:
+            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+            # The parent will notice that the process stopped and
+            # mark the pool broken
+            return
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
@@ -277,7 +288,9 @@
             # Mark the process pool broken so that submits fail right now.
             executor = executor_reference()
             if executor is not None:
-                executor._broken = True
+                executor._broken = ('A child process terminated '
+                                    'abruptly, the process pool is not '
+                                    'usable anymore')
                 executor._shutdown_thread = True
                 executor = None
             # All futures in flight must be marked failed
@@ -372,7 +385,7 @@
             yield element.pop()
 
 
-class BrokenProcessPool(RuntimeError):
+class BrokenProcessPool(_base.BrokenExecutor):
     """
     Raised when a process in a ProcessPoolExecutor terminated abruptly
     while a future was in the running state.
@@ -380,7 +393,8 @@
 
 
 class ProcessPoolExecutor(_base.Executor):
-    def __init__(self, max_workers=None, mp_context=None):
+    def __init__(self, max_workers=None, mp_context=None,
+                 initializer=None, initargs=()):
         """Initializes a new ProcessPoolExecutor instance.
 
         Args:
@@ -389,6 +403,8 @@
                 worker processes will be created as the machine has processors.
             mp_context: A multiprocessing context to launch the workers. This
                 object should provide SimpleQueue, Queue and Process.
+            initializer: An callable used to initialize worker processes.
+            initargs: A tuple of arguments to pass to the initializer.
         """
         _check_system_limits()
 
@@ -403,6 +419,11 @@
             mp_context = mp.get_context()
         self._mp_context = mp_context
 
+        if initializer is not None and not callable(initializer):
+            raise TypeError("initializer must be a callable")
+        self._initializer = initializer
+        self._initargs = initargs
+
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
@@ -450,15 +471,16 @@
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
-                      self._result_queue))
+                      self._result_queue,
+                      self._initializer,
+                      self._initargs))
             p.start()
             self._processes[p.pid] = p
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
             if self._broken:
-                raise BrokenProcessPool('A child process terminated '
-                    'abruptly, the process pool is not usable anymore')
+                raise BrokenProcessPool(self._broken)
             if self._shutdown_thread:
                 raise RuntimeError('cannot schedule new futures after shutdown')