bpo-34037: Fix test_asyncio failure and add loop.shutdown_default_executor() (GH-15735)

diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 14b80bd..0310712 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -406,6 +406,8 @@
         self._asyncgens = weakref.WeakSet()
         # Set to True when `loop.shutdown_asyncgens` is called.
         self._asyncgens_shutdown_called = False
+        # Set to True when `loop.shutdown_default_executor` is called.
+        self._executor_shutdown_called = False
 
     def __repr__(self):
         return (
@@ -503,6 +505,10 @@
         if self._closed:
             raise RuntimeError('Event loop is closed')
 
+    def _check_default_executor(self):  # raises once the shutdown flag is set
+        if self._executor_shutdown_called:
+            raise RuntimeError('Executor shutdown has been called')
+
     def _asyncgen_finalizer_hook(self, agen):
         self._asyncgens.discard(agen)
         if not self.is_closed():
@@ -543,6 +549,26 @@
                     'asyncgen': agen
                 })
 
+    async def shutdown_default_executor(self):
+        """Shut down the default executor and wait for it to finish."""
+        self._executor_shutdown_called = True  # read by _check_default_executor
+        if self._default_executor is None:
+            return  # no default executor to shut down
+        future = self.create_future()
+        thread = threading.Thread(target=self._do_shutdown, args=(future,))
+        thread.start()  # _do_shutdown runs in this separate thread
+        try:
+            await future  # completed by _do_shutdown via call_soon_threadsafe
+        finally:
+            thread.join()  # always reap the helper thread, even on error
+
+    def _do_shutdown(self, future):  # Thread target for the shutdown coroutine
+        try:  # NOTE(review): future may already be cancelled here - confirm
+            self._default_executor.shutdown(wait=True)
+            self.call_soon_threadsafe(future.set_result, None)
+        except Exception as ex:  # report the failure through the future
+            self.call_soon_threadsafe(future.set_exception, ex)
+
     def run_forever(self):
         """Run until stop() is called."""
         self._check_closed()
@@ -632,6 +658,7 @@
         self._closed = True
         self._ready.clear()
         self._scheduled.clear()
+        self._executor_shutdown_called = True
         executor = self._default_executor
         if executor is not None:
             self._default_executor = None
@@ -768,6 +795,8 @@
             self._check_callback(func, 'run_in_executor')
         if executor is None:
             executor = self._default_executor
+            # Only check when the default executor is being used
+            self._check_default_executor()
             if executor is None:
                 executor = concurrent.futures.ThreadPoolExecutor()
                 self._default_executor = executor
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 5fb5464..2f06c4a 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -249,6 +249,10 @@
         """Shutdown all active asynchronous generators."""
         raise NotImplementedError
 
+    async def shutdown_default_executor(self):  # abstract stub: always raises
+        """Schedule the shutdown of the default executor."""
+        raise NotImplementedError
+
     # Methods scheduling callbacks.  All these return Handles.
 
     def _timer_handle_cancelled(self, handle):
diff --git a/Lib/asyncio/runners.py b/Lib/asyncio/runners.py
index 5fbab03..6c87747 100644
--- a/Lib/asyncio/runners.py
+++ b/Lib/asyncio/runners.py
@@ -45,6 +45,7 @@
         try:
             _cancel_all_tasks(loop)
             loop.run_until_complete(loop.shutdown_asyncgens())
+            loop.run_until_complete(loop.shutdown_default_executor())
         finally:
             events.set_event_loop(None)
             loop.close()