Issue #11635: Don't use polling in worker threads and processes launched by
concurrent.futures.
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 15736da..93b495f 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -25,28 +25,17 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 atexit.register(_python_exit)
 
@@ -72,18 +61,23 @@
     try:
         while True:
             try:
-                work_item = work_queue.get(block=True, timeout=0.1)
+                work_item = work_queue.get(block=True)
             except queue.Empty:
-                executor = executor_reference()
-                # Exit if:
-                #   - The interpreter is shutting down OR
-                #   - The executor that owns the worker has been collected OR
-                #   - The executor that owns the worker has been shutdown.
-                if _shutdown or executor is None or executor._shutdown:
-                    return
-                del executor
+                pass
             else:
-                work_item.run()
+                if work_item is not None:
+                    work_item.run()
+                    continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notify other workers
+                work_queue.put(None)
+                return
+            del executor
     except BaseException as e:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
 
@@ -95,8 +89,6 @@
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
         """
-        _remove_dead_thread_references()
-
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
@@ -117,19 +109,25 @@
     submit.__doc__ = _base.Executor.submit.__doc__
 
     def _adjust_thread_count(self):
+        # When the executor is garbage collected, the weakref callback
+        # wakes up the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self), self._work_queue))
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
             t.daemon = True
             t.start()
             self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
+            _threads_queues[t] = self._work_queue
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
+            self._work_queue.put(None)
         if wait:
             for t in self._threads:
                 t.join()