bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (#4241)

* bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor

* Fix docstring
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 0b5d537..2e7100b 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -41,6 +41,7 @@
 
 atexit.register(_python_exit)
 
+
 class _WorkItem(object):
     def __init__(self, future, fn, args, kwargs):
         self.future = future
@@ -61,7 +62,17 @@
         else:
             self.future.set_result(result)
 
-def _worker(executor_reference, work_queue):
+
+def _worker(executor_reference, work_queue, initializer, initargs):
+    if initializer is not None:
+        try:
+            initializer(*initargs)
+        except BaseException:
+            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+            executor = executor_reference()
+            if executor is not None:
+                executor._initializer_failed()
+            return
     try:
         while True:
             work_item = work_queue.get(block=True)
@@ -83,18 +94,28 @@
     except BaseException:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
 
+
+class BrokenThreadPool(_base.BrokenExecutor):
+    """
+    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
+    """
+
+
 class ThreadPoolExecutor(_base.Executor):
 
     # Used to assign unique thread names when thread_name_prefix is not supplied.
     _counter = itertools.count().__next__
 
-    def __init__(self, max_workers=None, thread_name_prefix=''):
+    def __init__(self, max_workers=None, thread_name_prefix='',
+                 initializer=None, initargs=()):
         """Initializes a new ThreadPoolExecutor instance.
 
         Args:
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
             thread_name_prefix: An optional name prefix to give our threads.
+            initializer: A callable used to initialize worker threads.
+            initargs: A tuple of arguments to pass to the initializer.
         """
         if max_workers is None:
             # Use this number because ThreadPoolExecutor is often
@@ -103,16 +124,25 @@
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")
 
+        if initializer is not None and not callable(initializer):
+            raise TypeError("initializer must be a callable")
+
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
+        self._broken = False
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
         self._thread_name_prefix = (thread_name_prefix or
                                     ("ThreadPoolExecutor-%d" % self._counter()))
+        self._initializer = initializer
+        self._initargs = initargs
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
+            if self._broken:
+                raise BrokenThreadPool(self._broken)
+
             if self._shutdown:
                 raise RuntimeError('cannot schedule new futures after shutdown')
 
@@ -137,12 +167,27 @@
                                      num_threads)
             t = threading.Thread(name=thread_name, target=_worker,
                                  args=(weakref.ref(self, weakref_cb),
-                                       self._work_queue))
+                                       self._work_queue,
+                                       self._initializer,
+                                       self._initargs))
             t.daemon = True
             t.start()
             self._threads.add(t)
             _threads_queues[t] = self._work_queue
 
+    def _initializer_failed(self):
+        with self._shutdown_lock:
+            self._broken = ('A thread initializer failed, the thread pool '
+                            'is not usable anymore')
+            # Drain work queue and mark pending futures failed
+            while True:
+                try:
+                    work_item = self._work_queue.get_nowait()
+                except queue.Empty:
+                    break
+                if work_item is not None:
+                    work_item.future.set_exception(BrokenThreadPool(self._broken))
+
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True