bpo-39529: Deprecate creating new event loop in asyncio.get_event_loop() (GH-23554)

asyncio.get_event_loop() now emits a deprecation warning when it creates a new event loop.
In future releases it will become an alias of asyncio.get_running_loop().
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 1a20f36..b966ad2 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -759,9 +759,16 @@ def get_event_loop():
     the result of `get_event_loop_policy().get_event_loop()` call.
     """
     # NOTE: this function is implemented in C (see _asynciomodule.c)
+    return _py__get_event_loop()
+
+
+def _get_event_loop(stacklevel=3):
     current_loop = _get_running_loop()
     if current_loop is not None:
         return current_loop
+    import warnings
+    warnings.warn('There is no current event loop',
+                  DeprecationWarning, stacklevel=stacklevel)
     return get_event_loop_policy().get_event_loop()
 
 
@@ -791,6 +798,7 @@ def set_child_watcher(watcher):
 _py__set_running_loop = _set_running_loop
 _py_get_running_loop = get_running_loop
 _py_get_event_loop = get_event_loop
+_py__get_event_loop = _get_event_loop
 
 
 try:
@@ -798,7 +806,7 @@ def set_child_watcher(watcher):
     # functions in asyncio.  Pure Python implementation is
     # about 4 times slower than C-accelerated.
     from _asyncio import (_get_running_loop, _set_running_loop,
-                          get_running_loop, get_event_loop)
+                          get_running_loop, get_event_loop, _get_event_loop)
 except ImportError:
     pass
 else:
@@ -807,3 +815,4 @@ def set_child_watcher(watcher):
     _c__set_running_loop = _set_running_loop
     _c_get_running_loop = get_running_loop
     _c_get_event_loop = get_event_loop
+    _c__get_event_loop = _get_event_loop
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index 2d22ef6..10f8f05 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -76,7 +76,7 @@ def __init__(self, *, loop=None):
         the default event loop.
         """
         if loop is None:
-            self._loop = events.get_event_loop()
+            self._loop = events._get_event_loop()
         else:
             self._loop = loop
         self._callbacks = []
@@ -408,7 +408,7 @@ def wrap_future(future, *, loop=None):
     assert isinstance(future, concurrent.futures.Future), \
         f'concurrent.futures.Future is expected, got {future!r}'
     if loop is None:
-        loop = events.get_event_loop()
+        loop = events._get_event_loop()
     new_future = loop.create_future()
     _chain_future(future, new_future)
     return new_future
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 96a9f97..080d8a6 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -125,7 +125,7 @@ class FlowControlMixin(protocols.Protocol):
 
     def __init__(self, loop=None):
         if loop is None:
-            self._loop = events.get_event_loop()
+            self._loop = events._get_event_loop(stacklevel=4)
         else:
             self._loop = loop
         self._paused = False
@@ -283,9 +283,13 @@ def _get_close_waiter(self, stream):
     def __del__(self):
         # Prevent reports about unhandled exceptions.
         # Better than self._closed._log_traceback = False hack
-        closed = self._closed
-        if closed.done() and not closed.cancelled():
-            closed.exception()
+        try:
+            closed = self._closed
+        except AttributeError:
+            pass  # failed constructor
+        else:
+            if closed.done() and not closed.cancelled():
+                closed.exception()
 
 
 class StreamWriter:
@@ -381,7 +385,7 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
 
         self._limit = limit
         if loop is None:
-            self._loop = events.get_event_loop()
+            self._loop = events._get_event_loop()
         else:
             self._loop = loop
         self._buffer = bytearray()
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 52f1e66..9a9d0d6 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -549,7 +549,7 @@ def as_completed(fs, *, timeout=None):
     from .queues import Queue  # Import here to avoid circular import problem.
     done = Queue()
 
-    loop = events.get_event_loop()
+    loop = events._get_event_loop()
     todo = {ensure_future(f, loop=loop) for f in set(fs)}
     timeout_handle = None
 
@@ -616,23 +616,26 @@ def ensure_future(coro_or_future, *, loop=None):
 
     If the argument is a Future, it is returned directly.
     """
-    if coroutines.iscoroutine(coro_or_future):
-        if loop is None:
-            loop = events.get_event_loop()
-        task = loop.create_task(coro_or_future)
-        if task._source_traceback:
-            del task._source_traceback[-1]
-        return task
-    elif futures.isfuture(coro_or_future):
+    return _ensure_future(coro_or_future, loop=loop)
+
+
+def _ensure_future(coro_or_future, *, loop=None):
+    if futures.isfuture(coro_or_future):
         if loop is not None and loop is not futures._get_loop(coro_or_future):
             raise ValueError('The future belongs to a different loop than '
-                             'the one specified as the loop argument')
+                            'the one specified as the loop argument')
         return coro_or_future
-    elif inspect.isawaitable(coro_or_future):
-        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
-    else:
-        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
-                        'required')
+
+    if not coroutines.iscoroutine(coro_or_future):
+        if inspect.isawaitable(coro_or_future):
+            coro_or_future = _wrap_awaitable(coro_or_future)
+        else:
+            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
+                            'is required')
+
+    if loop is None:
+        loop = events._get_event_loop(stacklevel=4)
+    return loop.create_task(coro_or_future)
 
 
 @types.coroutine
@@ -655,7 +658,8 @@ class _GatheringFuture(futures.Future):
     cancelled.
     """
 
-    def __init__(self, children, *, loop=None):
+    def __init__(self, children, *, loop):
+        assert loop is not None
         super().__init__(loop=loop)
         self._children = children
         self._cancel_requested = False
@@ -706,7 +710,7 @@ def gather(*coros_or_futures, return_exceptions=False):
     gather won't cancel any other awaitables.
     """
     if not coros_or_futures:
-        loop = events.get_event_loop()
+        loop = events._get_event_loop()
         outer = loop.create_future()
         outer.set_result([])
         return outer
@@ -773,7 +777,7 @@ def _done_callback(fut):
     loop = None
     for arg in coros_or_futures:
         if arg not in arg_to_fut:
-            fut = ensure_future(arg, loop=loop)
+            fut = _ensure_future(arg, loop=loop)
             if loop is None:
                 loop = futures._get_loop(fut)
             if fut is not arg:
@@ -823,7 +827,7 @@ def shield(arg):
         except CancelledError:
             res = None
     """
-    inner = ensure_future(arg)
+    inner = _ensure_future(arg)
     if inner.done():
         # Shortcut.
         return inner
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 5511407..55fc266 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -2702,14 +2702,18 @@ def get_event_loop(self):
             asyncio.set_event_loop_policy(Policy())
             loop = asyncio.new_event_loop()
 
-            with self.assertRaises(TestError):
-                asyncio.get_event_loop()
+            with self.assertWarns(DeprecationWarning) as cm:
+                with self.assertRaises(TestError):
+                    asyncio.get_event_loop()
+            self.assertEqual(cm.warnings[0].filename, __file__)
             asyncio.set_event_loop(None)
-            with self.assertRaises(TestError):
-                asyncio.get_event_loop()
+            with self.assertWarns(DeprecationWarning) as cm:
+                with self.assertRaises(TestError):
+                    asyncio.get_event_loop()
+            self.assertEqual(cm.warnings[0].filename, __file__)
 
             with self.assertRaisesRegex(RuntimeError, 'no running'):
-                self.assertIs(asyncio.get_running_loop(), None)
+                asyncio.get_running_loop()
             self.assertIs(asyncio._get_running_loop(), None)
 
             async def func():
@@ -2720,12 +2724,16 @@ async def func():
             loop.run_until_complete(func())
 
             asyncio.set_event_loop(loop)
-            with self.assertRaises(TestError):
-                asyncio.get_event_loop()
+            with self.assertWarns(DeprecationWarning) as cm:
+                with self.assertRaises(TestError):
+                    asyncio.get_event_loop()
+            self.assertEqual(cm.warnings[0].filename, __file__)
 
             asyncio.set_event_loop(None)
-            with self.assertRaises(TestError):
-                asyncio.get_event_loop()
+            with self.assertWarns(DeprecationWarning) as cm:
+                with self.assertRaises(TestError):
+                    asyncio.get_event_loop()
+            self.assertEqual(cm.warnings[0].filename, __file__)
 
         finally:
             asyncio.set_event_loop_policy(old_policy)
@@ -2733,7 +2741,56 @@ async def func():
                 loop.close()
 
         with self.assertRaisesRegex(RuntimeError, 'no running'):
-            self.assertIs(asyncio.get_running_loop(), None)
+            asyncio.get_running_loop()
+
+        self.assertIs(asyncio._get_running_loop(), None)
+
+    def test_get_event_loop_returns_running_loop2(self):
+        old_policy = asyncio.get_event_loop_policy()
+        try:
+            asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+            loop = asyncio.new_event_loop()
+            self.addCleanup(loop.close)
+
+            with self.assertWarns(DeprecationWarning) as cm:
+                loop2 = asyncio.get_event_loop()
+            self.addCleanup(loop2.close)
+            self.assertEqual(cm.warnings[0].filename, __file__)
+            asyncio.set_event_loop(None)
+            with self.assertWarns(DeprecationWarning) as cm:
+                with self.assertRaisesRegex(RuntimeError, 'no current'):
+                    asyncio.get_event_loop()
+            self.assertEqual(cm.warnings[0].filename, __file__)
+
+            with self.assertRaisesRegex(RuntimeError, 'no running'):
+                asyncio.get_running_loop()
+            self.assertIs(asyncio._get_running_loop(), None)
+
+            async def func():
+                self.assertIs(asyncio.get_event_loop(), loop)
+                self.assertIs(asyncio.get_running_loop(), loop)
+                self.assertIs(asyncio._get_running_loop(), loop)
+
+            loop.run_until_complete(func())
+
+            asyncio.set_event_loop(loop)
+            with self.assertWarns(DeprecationWarning) as cm:
+                self.assertIs(asyncio.get_event_loop(), loop)
+            self.assertEqual(cm.warnings[0].filename, __file__)
+
+            asyncio.set_event_loop(None)
+            with self.assertWarns(DeprecationWarning) as cm:
+                with self.assertRaisesRegex(RuntimeError, 'no current'):
+                    asyncio.get_event_loop()
+            self.assertEqual(cm.warnings[0].filename, __file__)
+
+        finally:
+            asyncio.set_event_loop_policy(old_policy)
+            if loop is not None:
+                loop.close()
+
+        with self.assertRaisesRegex(RuntimeError, 'no running'):
+            asyncio.get_running_loop()
 
         self.assertIs(asyncio._get_running_loop(), None)
 
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index ec00896..fe3d442 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -139,9 +139,26 @@ def test_initial_state(self):
         f.cancel()
         self.assertTrue(f.cancelled())
 
-    def test_init_constructor_default_loop(self):
+    def test_constructor_without_loop(self):
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                self._new_future()
+        self.assertEqual(cm.warnings[0].filename, __file__)
+
+    def test_constructor_use_running_loop(self):
+        async def test():
+            return self._new_future()
+        f = self.loop.run_until_complete(test())
+        self.assertIs(f._loop, self.loop)
+        self.assertIs(f.get_loop(), self.loop)
+
+    def test_constructor_use_global_loop(self):
+        # Deprecated in 3.10
         asyncio.set_event_loop(self.loop)
-        f = self._new_future()
+        self.addCleanup(asyncio.set_event_loop, None)
+        with self.assertWarns(DeprecationWarning) as cm:
+            f = self._new_future()
+        self.assertEqual(cm.warnings[0].filename, __file__)
         self.assertIs(f._loop, self.loop)
         self.assertIs(f.get_loop(), self.loop)
 
@@ -472,16 +489,41 @@ def test_wrap_future_future(self):
         f2 = asyncio.wrap_future(f1)
         self.assertIs(f1, f2)
 
+    def test_wrap_future_without_loop(self):
+        def run(arg):
+            return (arg, threading.get_ident())
+        ex = concurrent.futures.ThreadPoolExecutor(1)
+        f1 = ex.submit(run, 'oi')
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaises(RuntimeError):
+                asyncio.wrap_future(f1)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        ex.shutdown(wait=True)
+
+    def test_wrap_future_use_running_loop(self):
+        def run(arg):
+            return (arg, threading.get_ident())
+        ex = concurrent.futures.ThreadPoolExecutor(1)
+        f1 = ex.submit(run, 'oi')
+        async def test():
+            return asyncio.wrap_future(f1)
+        f2 = self.loop.run_until_complete(test())
+        self.assertIs(self.loop, f2._loop)
+        ex.shutdown(wait=True)
+
     def test_wrap_future_use_global_loop(self):
-        with mock.patch('asyncio.futures.events') as events:
-            events.get_event_loop = lambda: self.loop
-            def run(arg):
-                return (arg, threading.get_ident())
-            ex = concurrent.futures.ThreadPoolExecutor(1)
-            f1 = ex.submit(run, 'oi')
+        # Deprecated in 3.10
+        asyncio.set_event_loop(self.loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        def run(arg):
+            return (arg, threading.get_ident())
+        ex = concurrent.futures.ThreadPoolExecutor(1)
+        f1 = ex.submit(run, 'oi')
+        with self.assertWarns(DeprecationWarning) as cm:
             f2 = asyncio.wrap_future(f1)
-            self.assertIs(self.loop, f2._loop)
-            ex.shutdown(wait=True)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertIs(self.loop, f2._loop)
+        ex.shutdown(wait=True)
 
     def test_wrap_future_cancel(self):
         f1 = concurrent.futures.Future()
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
index 0a0b529..63a9a5f 100644
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -273,12 +273,12 @@ async def create_queue():
             queue._get_loop()
             return queue
 
-        q = self.loop.run_until_complete(create_queue())
+        async def test():
+            q = await create_queue()
+            await asyncio.gather(producer(q, producer_num_items),
+                                 consumer(q, producer_num_items))
 
-        self.loop.run_until_complete(
-            asyncio.gather(producer(q, producer_num_items),
-                           consumer(q, producer_num_items)),
-            )
+        self.loop.run_until_complete(test())
 
     def test_cancelled_getters_not_being_held_in_self_getters(self):
         def a_generator():
@@ -516,11 +516,14 @@ async def getter():
             for _ in range(num):
                 item = queue.get_nowait()
 
-        t0 = putter(0)
-        t1 = putter(1)
-        t2 = putter(2)
-        t3 = putter(3)
-        self.loop.run_until_complete(asyncio.gather(getter(), t0, t1, t2, t3))
+        async def test():
+            t0 = putter(0)
+            t1 = putter(1)
+            t2 = putter(2)
+            t3 = putter(3)
+            await asyncio.gather(getter(), t0, t1, t2, t3)
+
+        self.loop.run_until_complete(test())
 
     def test_cancelled_puts_not_being_held_in_self_putters(self):
         def a_generator():
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index a075358..6eaa289 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -40,11 +40,6 @@ def tearDown(self):
         gc.collect()
         super().tearDown()
 
-    @mock.patch('asyncio.streams.events')
-    def test_ctor_global_loop(self, m_events):
-        stream = asyncio.StreamReader()
-        self.assertIs(stream._loop, m_events.get_event_loop.return_value)
-
     def _basetest_open_connection(self, open_connection_fut):
         messages = []
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
@@ -751,23 +746,59 @@ def test_read_all_from_pipe_reader(self):
         data = self.loop.run_until_complete(reader.read(-1))
         self.assertEqual(data, b'data')
 
-    def test_streamreader_constructor(self):
-        self.addCleanup(asyncio.set_event_loop, None)
-        asyncio.set_event_loop(self.loop)
+    def test_streamreader_constructor_without_loop(self):
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                asyncio.StreamReader()
+        self.assertEqual(cm.warnings[0].filename, __file__)
 
+    def test_streamreader_constructor_use_running_loop(self):
         # asyncio issue #184: Ensure that StreamReaderProtocol constructor
         # retrieves the current loop if the loop parameter is not set
-        reader = asyncio.StreamReader()
+        async def test():
+            return asyncio.StreamReader()
+
+        reader = self.loop.run_until_complete(test())
         self.assertIs(reader._loop, self.loop)
 
-    def test_streamreaderprotocol_constructor(self):
+    def test_streamreader_constructor_use_global_loop(self):
+        # asyncio issue #184: Ensure that StreamReaderProtocol constructor
+        # retrieves the current loop if the loop parameter is not set
+        # Deprecated in 3.10
         self.addCleanup(asyncio.set_event_loop, None)
         asyncio.set_event_loop(self.loop)
+        with self.assertWarns(DeprecationWarning) as cm:
+            reader = asyncio.StreamReader()
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertIs(reader._loop, self.loop)
 
+
+    def test_streamreaderprotocol_constructor_without_loop(self):
+        reader = mock.Mock()
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                asyncio.StreamReaderProtocol(reader)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+
+    def test_streamreaderprotocol_constructor_use_running_loop(self):
         # asyncio issue #184: Ensure that StreamReaderProtocol constructor
         # retrieves the current loop if the loop parameter is not set
         reader = mock.Mock()
-        protocol = asyncio.StreamReaderProtocol(reader)
+        async def test():
+            return asyncio.StreamReaderProtocol(reader)
+        protocol = self.loop.run_until_complete(test())
+        self.assertIs(protocol._loop, self.loop)
+
+    def test_streamreaderprotocol_constructor_use_global_loop(self):
+        # asyncio issue #184: Ensure that StreamReaderProtocol constructor
+        # retrieves the current loop if the loop parameter is not set
+        # Deprecated in 3.10
+        self.addCleanup(asyncio.set_event_loop, None)
+        asyncio.set_event_loop(self.loop)
+        reader = mock.Mock()
+        with self.assertWarns(DeprecationWarning) as cm:
+            protocol = asyncio.StreamReaderProtocol(reader)
+        self.assertEqual(cm.warnings[0].filename, __file__)
         self.assertIs(protocol._loop, self.loop)
 
     def test_drain_raises(self):
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 7c2e85c..a9e4cf5 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -200,22 +200,76 @@ async def notmuch():
         loop.close()
 
     def test_ensure_future_coroutine(self):
+        async def notmuch():
+            return 'ok'
+        t = asyncio.ensure_future(notmuch(), loop=self.loop)
+        self.assertIs(t._loop, self.loop)
+        self.loop.run_until_complete(t)
+        self.assertTrue(t.done())
+        self.assertEqual(t.result(), 'ok')
+
+        a = notmuch()
+        self.addCleanup(a.close)
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                asyncio.ensure_future(a)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+
+        async def test():
+            return asyncio.ensure_future(notmuch())
+        t = self.loop.run_until_complete(test())
+        self.assertIs(t._loop, self.loop)
+        self.loop.run_until_complete(t)
+        self.assertTrue(t.done())
+        self.assertEqual(t.result(), 'ok')
+
+        # Deprecated in 3.10
+        asyncio.set_event_loop(self.loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        with self.assertWarns(DeprecationWarning) as cm:
+            t = asyncio.ensure_future(notmuch())
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertIs(t._loop, self.loop)
+        self.loop.run_until_complete(t)
+        self.assertTrue(t.done())
+        self.assertEqual(t.result(), 'ok')
+
+    def test_ensure_future_coroutine_2(self):
         with self.assertWarns(DeprecationWarning):
             @asyncio.coroutine
             def notmuch():
                 return 'ok'
         t = asyncio.ensure_future(notmuch(), loop=self.loop)
+        self.assertIs(t._loop, self.loop)
         self.loop.run_until_complete(t)
         self.assertTrue(t.done())
         self.assertEqual(t.result(), 'ok')
-        self.assertIs(t._loop, self.loop)
 
-        loop = asyncio.new_event_loop()
-        self.set_event_loop(loop)
-        t = asyncio.ensure_future(notmuch(), loop=loop)
-        self.assertIs(t._loop, loop)
-        loop.run_until_complete(t)
-        loop.close()
+        a = notmuch()
+        self.addCleanup(a.close)
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                asyncio.ensure_future(a)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+
+        async def test():
+            return asyncio.ensure_future(notmuch())
+        t = self.loop.run_until_complete(test())
+        self.assertIs(t._loop, self.loop)
+        self.loop.run_until_complete(t)
+        self.assertTrue(t.done())
+        self.assertEqual(t.result(), 'ok')
+
+        # Deprecated in 3.10
+        asyncio.set_event_loop(self.loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        with self.assertWarns(DeprecationWarning) as cm:
+            t = asyncio.ensure_future(notmuch())
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertIs(t._loop, self.loop)
+        self.loop.run_until_complete(t)
+        self.assertTrue(t.done())
+        self.assertEqual(t.result(), 'ok')
 
     def test_ensure_future_future(self):
         f_orig = self.new_future(self.loop)
@@ -1078,33 +1132,6 @@ async def coro():
         res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
         self.assertEqual(res, 'done')
 
-    def test_wait_for_with_global_loop(self):
-
-        def gen():
-            when = yield
-            self.assertAlmostEqual(0.2, when)
-            when = yield 0
-            self.assertAlmostEqual(0.01, when)
-            yield 0.01
-
-        loop = self.new_test_loop(gen)
-
-        async def foo():
-            await asyncio.sleep(0.2)
-            return 'done'
-
-        asyncio.set_event_loop(loop)
-        try:
-            fut = self.new_task(loop, foo())
-            with self.assertRaises(asyncio.TimeoutError):
-                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
-        finally:
-            asyncio.set_event_loop(None)
-
-        self.assertAlmostEqual(0.01, loop.time())
-        self.assertTrue(fut.done())
-        self.assertTrue(fut.cancelled())
-
     def test_wait_for_race_condition(self):
 
         def gen():
@@ -1293,32 +1320,6 @@ async def foo():
         self.assertAlmostEqual(0.15, loop.time())
         self.assertEqual(res, 42)
 
-    def test_wait_with_global_loop(self):
-
-        def gen():
-            when = yield
-            self.assertAlmostEqual(0.01, when)
-            when = yield 0
-            self.assertAlmostEqual(0.015, when)
-            yield 0.015
-
-        loop = self.new_test_loop(gen)
-
-        a = self.new_task(loop, asyncio.sleep(0.01))
-        b = self.new_task(loop, asyncio.sleep(0.015))
-
-        async def foo():
-            done, pending = await asyncio.wait([b, a])
-            self.assertEqual(done, set([a, b]))
-            self.assertEqual(pending, set())
-            return 42
-
-        asyncio.set_event_loop(loop)
-        res = loop.run_until_complete(
-            self.new_task(loop, foo()))
-
-        self.assertEqual(res, 42)
-
     def test_wait_duplicate_coroutines(self):
 
         with self.assertWarns(DeprecationWarning):
@@ -1679,22 +1680,24 @@ def gen():
             yield 0
 
         loop = self.new_test_loop(gen)
-        asyncio.set_event_loop(loop)
 
         a = asyncio.sleep(0.05, 'a')
         b = asyncio.sleep(0.10, 'b')
         fs = {a, b}
 
-        futs = list(asyncio.as_completed(fs))
-        self.assertEqual(len(futs), 2)
+        async def test():
+            futs = list(asyncio.as_completed(fs))
+            self.assertEqual(len(futs), 2)
 
-        x = loop.run_until_complete(futs[1])
-        self.assertEqual(x, 'a')
-        self.assertAlmostEqual(0.05, loop.time())
-        loop.advance_time(0.05)
-        y = loop.run_until_complete(futs[0])
-        self.assertEqual(y, 'b')
-        self.assertAlmostEqual(0.10, loop.time())
+            x = await futs[1]
+            self.assertEqual(x, 'a')
+            self.assertAlmostEqual(0.05, loop.time())
+            loop.advance_time(0.05)
+            y = await futs[0]
+            self.assertEqual(y, 'b')
+            self.assertAlmostEqual(0.10, loop.time())
+
+        loop.run_until_complete(test())
 
     def test_as_completed_concurrent(self):
 
@@ -1705,20 +1708,22 @@ def gen():
             self.assertAlmostEqual(0.05, when)
             yield 0.05
 
-        loop = self.new_test_loop(gen)
-        asyncio.set_event_loop(loop)
-
         a = asyncio.sleep(0.05, 'a')
         b = asyncio.sleep(0.05, 'b')
         fs = {a, b}
 
-        futs = list(asyncio.as_completed(fs))
-        self.assertEqual(len(futs), 2)
-        waiter = asyncio.wait(futs)
-        # Deprecation from passing coros in futs to asyncio.wait()
-        with self.assertWarns(DeprecationWarning):
-            done, pending = loop.run_until_complete(waiter)
-        self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+        async def test():
+            futs = list(asyncio.as_completed(fs))
+            self.assertEqual(len(futs), 2)
+            waiter = asyncio.wait(futs)
+            # Deprecation from passing coros in futs to asyncio.wait()
+            with self.assertWarns(DeprecationWarning) as cm:
+                done, pending = await waiter
+            self.assertEqual(cm.warnings[0].filename, __file__)
+            self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+
+        loop = self.new_test_loop(gen)
+        loop.run_until_complete(test())
 
     def test_as_completed_duplicate_coroutines(self):
 
@@ -1742,6 +1747,47 @@ def runner():
         self.assertEqual(set(result), {'ham', 'spam'})
         self.assertEqual(len(result), 2)
 
+    def test_as_completed_coroutine_without_loop(self):
+        async def coro():
+            return 42
+
+        a = coro()
+        self.addCleanup(a.close)
+
+        futs = asyncio.as_completed([a])
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                list(futs)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+
+    def test_as_completed_coroutine_use_running_loop(self):
+        loop = self.new_test_loop()
+
+        async def coro():
+            return 42
+
+        async def test():
+            futs = list(asyncio.as_completed([coro()]))
+            self.assertEqual(len(futs), 1)
+            self.assertEqual(await futs[0], 42)
+
+        loop.run_until_complete(test())
+
+    def test_as_completed_coroutine_use_global_loop(self):
+        # Deprecated in 3.10
+        async def coro():
+            return 42
+
+        loop = self.new_test_loop()
+        asyncio.set_event_loop(loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        futs = asyncio.as_completed([coro()])
+        with self.assertWarns(DeprecationWarning) as cm:
+            futs = list(futs)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertEqual(len(futs), 1)
+        self.assertEqual(loop.run_until_complete(futs[0]), 42)
+
     def test_sleep(self):
 
         def gen():
@@ -2201,6 +2247,42 @@ def test_gather_shield(self):
         child2.set_result(2)
         test_utils.run_briefly(self.loop)
 
+    def test_shield_coroutine_without_loop(self):
+        async def coro():
+            return 42
+
+        inner = coro()
+        self.addCleanup(inner.close)
+        with self.assertWarns(DeprecationWarning) as cm:
+            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+                asyncio.shield(inner)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+
+    def test_shield_coroutine_use_running_loop(self):
+        async def coro():
+            return 42
+
+        async def test():
+            return asyncio.shield(coro())
+        outer = self.loop.run_until_complete(test())
+        self.assertEqual(outer._loop, self.loop)
+        res = self.loop.run_until_complete(outer)
+        self.assertEqual(res, 42)
+
+    def test_shield_coroutine_use_global_loop(self):
+        # Deprecated in 3.10
+        async def coro():
+            return 42
+
+        asyncio.set_event_loop(self.loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        with self.assertWarns(DeprecationWarning) as cm:
+            outer = asyncio.shield(coro())
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertEqual(outer._loop, self.loop)
+        res = self.loop.run_until_complete(outer)
+        self.assertEqual(res, 42)
+
     def test_as_completed_invalid_args(self):
         fut = self.new_future(self.loop)
 
@@ -2507,16 +2589,17 @@ def test_cancel_gather_1(self):
         """Ensure that a gathering future refuses to be cancelled once all
         children are done"""
         loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
         self.addCleanup(loop.close)
 
         fut = self.new_future(loop)
-        # The indirection fut->child_coro is needed since otherwise the
-        # gathering task is done at the same time as the child future
-        def child_coro():
-            return (yield from fut)
-        gather_future = asyncio.gather(child_coro())
-        gather_task = asyncio.ensure_future(gather_future, loop=loop)
+        async def create():
+            # The indirection fut->child_coro is needed since otherwise the
+            # gathering task is done at the same time as the child future
+            def child_coro():
+                return (yield from fut)
+            gather_future = asyncio.gather(child_coro())
+            return asyncio.ensure_future(gather_future)
+        gather_task = loop.run_until_complete(create())
 
         cancel_result = None
         def cancelling_callback(_):
@@ -3222,7 +3305,7 @@ def _run_loop(self, loop):
 
     def _check_success(self, **kwargs):
         a, b, c = [self.one_loop.create_future() for i in range(3)]
-        fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
+        fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
         cb = test_utils.MockCallback()
         fut.add_done_callback(cb)
         b.set_result(1)
@@ -3244,7 +3327,7 @@ def test_result_exception_success(self):
 
     def test_one_exception(self):
         a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
-        fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
+        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
         cb = test_utils.MockCallback()
         fut.add_done_callback(cb)
         exc = ZeroDivisionError()
@@ -3262,8 +3345,8 @@ def test_one_exception(self):
 
     def test_return_exceptions(self):
         a, b, c, d = [self.one_loop.create_future() for i in range(4)]
-        fut = asyncio.gather(*self.wrap_futures(a, b, c, d),
-                             return_exceptions=True)
+        fut = self._gather(*self.wrap_futures(a, b, c, d),
+                           return_exceptions=True)
         cb = test_utils.MockCallback()
         fut.add_done_callback(cb)
         exc = ZeroDivisionError()
@@ -3315,21 +3398,37 @@ class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
     def wrap_futures(self, *futures):
         return futures
 
-    def _check_empty_sequence(self, seq_or_iter):
-        asyncio.set_event_loop(self.one_loop)
-        self.addCleanup(asyncio.set_event_loop, None)
-        fut = asyncio.gather(*seq_or_iter)
+    def _gather(self, *args, **kwargs):
+        return asyncio.gather(*args, **kwargs)  # not inside a coroutine
+
+    def test_constructor_empty_sequence_without_loop(self):
+        with self.assertWarns(DeprecationWarning) as caught, \
+                self.assertRaises(RuntimeError):
+            asyncio.gather()  # direct call: the warning must name this file
+        self.assertEqual(caught.warnings[0].filename, __file__)
+
+    def test_constructor_empty_sequence_use_running_loop(self):
+        async def gather():
+            return asyncio.gather()  # one_loop is running at this point
+        fut = self.one_loop.run_until_complete(gather())
         self.assertIsInstance(fut, asyncio.Future)
         self.assertIs(fut._loop, self.one_loop)
         self._run_loop(self.one_loop)
         self.assertTrue(fut.done())
         self.assertEqual(fut.result(), [])
 
-    def test_constructor_empty_sequence(self):
-        self._check_empty_sequence([])
-        self._check_empty_sequence(())
-        self._check_empty_sequence(set())
-        self._check_empty_sequence(iter(""))
+    def test_constructor_empty_sequence_use_global_loop(self):
+        # Deprecated in 3.10: gather() falls back to the global loop and warns
+        asyncio.set_event_loop(self.one_loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        with self.assertWarns(DeprecationWarning) as caught:
+            empty = asyncio.gather()
+        self.assertEqual(caught.warnings[0].filename, __file__)
+        self.assertIsInstance(empty, asyncio.Future)
+        self.assertIs(empty._loop, self.one_loop)
+        self._run_loop(self.one_loop)
+        self.assertTrue(empty.done())
+        self.assertEqual(empty.result(), [])
 
     def test_constructor_heterogenous_futures(self):
         fut1 = self.one_loop.create_future()
@@ -3392,10 +3491,6 @@ def test_result_exception_one_cancellation(self):
 
 class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
 
-    def setUp(self):
-        super().setUp()
-        asyncio.set_event_loop(self.one_loop)
-
     def wrap_futures(self, *futures):
         coros = []
         for fut in futures:
@@ -3404,22 +3499,47 @@ async def coro(fut=fut):
             coros.append(coro())
         return coros
 
-    def test_constructor_loop_selection(self):
+    def _gather(self, *args, **kwargs):
+        async def in_loop():  # so gather() sees one_loop as the running loop
+            return asyncio.gather(*args, **kwargs)
+        return self.one_loop.run_until_complete(in_loop())
+
+    def test_constructor_without_loop(self):
+        async def coro():
+            return 'abc'
+        # gather() is expected to raise, so close the coroutines explicitly.
+        first, second = coro(), coro()
+        self.addCleanup(first.close)
+        self.addCleanup(second.close)
+        with self.assertWarns(DeprecationWarning) as caught, \
+                self.assertRaises(RuntimeError):
+            asyncio.gather(first, second)
+        self.assertEqual(caught.warnings[0].filename, __file__)
+
+    def test_constructor_use_running_loop(self):
         async def coro():
             return 'abc'
         gen1 = coro()
         gen2 = coro()
-        fut = asyncio.gather(gen1, gen2)
+        async def gather():
+            return asyncio.gather(gen1, gen2)  # one_loop is running here
+        fut = self.one_loop.run_until_complete(gather())
         self.assertIs(fut._loop, self.one_loop)
         self.one_loop.run_until_complete(fut)
 
-        self.set_event_loop(self.other_loop, cleanup=False)
+    def test_constructor_use_global_loop(self):
+        # Deprecated in 3.10: gather() falls back to the global loop and warns
+        async def coro():
+            return 'abc'
         asyncio.set_event_loop(self.other_loop)
-        gen3 = coro()
-        gen4 = coro()
-        fut2 = asyncio.gather(gen3, gen4)
-        self.assertIs(fut2._loop, self.other_loop)
-        self.other_loop.run_until_complete(fut2)
+        self.addCleanup(asyncio.set_event_loop, None)  # undo the global loop
+        gen1 = coro()
+        gen2 = coro()
+        with self.assertWarns(DeprecationWarning) as cm:
+            fut = asyncio.gather(gen1, gen2)
+        self.assertEqual(cm.warnings[0].filename, __file__)
+        self.assertIs(fut._loop, self.other_loop)
+        self.other_loop.run_until_complete(fut)
 
     def test_duplicate_coroutines(self):
         with self.assertWarns(DeprecationWarning):
@@ -3427,7 +3547,7 @@ def test_duplicate_coroutines(self):
             def coro(s):
                 return s
         c = coro('abc')
-        fut = asyncio.gather(c, c, coro('def'), c)
+        fut = self._gather(c, c, coro('def'), c)
         self._run_loop(self.one_loop)
         self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])