bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9e2ab9d..fd9f572 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -435,6 +435,24 @@
                 # is not gc-ed yet.
                 if executor is not None:
                     executor._shutdown_thread = True
+                    # Unless there are pending work items, we have nothing to cancel.
+                    if pending_work_items and executor._cancel_pending_futures:
+                        # Cancel all pending futures and update pending_work_items
+                        # to only have futures that are currently running.
+                        new_pending_work_items = {}
+                        for work_id, work_item in pending_work_items.items():
+                            if not work_item.future.cancel():
+                                new_pending_work_items[work_id] = work_item
+
+                        pending_work_items = new_pending_work_items
+                        # Drain work_ids_queue since we no longer need to
+                        # add items to the call queue.
+                        while True:
+                            try:
+                                work_ids_queue.get_nowait()
+                            except queue.Empty:
+                                break
+
                 # Since no new work items can be added, it is safe to shutdown
                 # this thread if there are no pending work items.
                 if not pending_work_items:
@@ -546,6 +564,7 @@
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
+        self._cancel_pending_futures = False
 
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
@@ -660,9 +679,11 @@
                               timeout=timeout)
         return _chain_from_iterable_of_lists(results)
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait=True, *, cancel_futures=False):
         with self._shutdown_lock:
+            self._cancel_pending_futures = cancel_futures
             self._shutdown_thread = True
+
         if self._queue_management_thread:
             # Wake up queue management thread
             self._queue_management_thread_wakeup.wakeup()
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index b89f8f2..be79161 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -215,9 +215,22 @@
                 if work_item is not None:
                     work_item.future.set_exception(BrokenThreadPool(self._broken))
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait=True, *, cancel_futures=False):
         with self._shutdown_lock:
             self._shutdown = True
+            if cancel_futures:
+                # Drain all work items from the queue, and then cancel their
+                # associated futures.
+                while True:
+                    try:
+                        work_item = self._work_queue.get_nowait()
+                    except queue.Empty:
+                        break
+                    if work_item is not None:
+                        work_item.future.cancel()
+
+            # Send a wake-up to prevent threads calling
+            # _work_queue.get(block=True) from permanently blocking.
             self._work_queue.put(None)
         if wait:
             for t in self._threads: