bpo-34037: test_asyncio uses shutdown_default_executor() (GH-16284)

diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index ccdd3ba..66191d9 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -216,6 +216,9 @@
                 raise NotImplementedError(
                     'cannot submit into a dummy executor')
 
+        self.loop._process_events = mock.Mock()
+        self.loop._write_to_self = mock.Mock()
+
         executor = DummyExecutor()
         self.loop.set_default_executor(executor)
         self.assertIs(executor, self.loop._default_executor)
@@ -226,6 +229,9 @@
         with self.assertWarns(DeprecationWarning):
             self.loop.set_default_executor(executor)
 
+        # Avoid cleaning up the executor mock
+        self.loop._default_executor = None
+
     def test_call_soon(self):
         def cb():
             pass
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 6e832ea..576714f 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -3232,6 +3232,8 @@
         self.loop.set_exception_handler(callback)
 
         # Set corrupted task factory
+        self.addCleanup(self.loop.set_task_factory,
+                        self.loop.get_task_factory())
         self.loop.set_task_factory(task_factory)
 
         # Run event loop
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index 5b4bb12..ea608f4 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -509,9 +509,11 @@
 class TestCase(unittest.TestCase):
     @staticmethod
     def close_loop(loop):
-        executor = loop._default_executor
-        if executor is not None:
-            executor.shutdown(wait=True)
+        if loop._default_executor is not None:
+            if not loop.is_closed():
+                loop.run_until_complete(loop.shutdown_default_executor())
+            else:
+                loop._default_executor.shutdown(wait=True)
         loop.close()
         policy = support.maybe_get_event_loop_policy()
         if policy is not None: