bpo-39529: Deprecate creating new event loop in asyncio.get_event_loop() (GH-23554)

asyncio.get_event_loop() now emits a deprecation warning when it creates a new event loop.
In future releases, it will become an alias of asyncio.get_running_loop().
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 1a20f36..b966ad2 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -759,9 +759,16 @@ def get_event_loop():
     the result of `get_event_loop_policy().get_event_loop()` call.
     """
     # NOTE: this function is implemented in C (see _asynciomodule.c)
+    return _py__get_event_loop()
+
+
+def _get_event_loop(stacklevel=3):
     current_loop = _get_running_loop()
     if current_loop is not None:
         return current_loop
+    import warnings
+    warnings.warn('There is no current event loop',
+                  DeprecationWarning, stacklevel=stacklevel)
     return get_event_loop_policy().get_event_loop()
 
 
@@ -791,6 +798,7 @@ def set_child_watcher(watcher):
 _py__set_running_loop = _set_running_loop
 _py_get_running_loop = get_running_loop
 _py_get_event_loop = get_event_loop
+_py__get_event_loop = _get_event_loop
 
 
 try:
@@ -798,7 +806,7 @@ def set_child_watcher(watcher):
     # functions in asyncio.  Pure Python implementation is
     # about 4 times slower than C-accelerated.
     from _asyncio import (_get_running_loop, _set_running_loop,
-                          get_running_loop, get_event_loop)
+                          get_running_loop, get_event_loop, _get_event_loop)
 except ImportError:
     pass
 else:
@@ -807,3 +815,4 @@ def set_child_watcher(watcher):
     _c__set_running_loop = _set_running_loop
     _c_get_running_loop = get_running_loop
     _c_get_event_loop = get_event_loop
+    _c__get_event_loop = _get_event_loop
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index 2d22ef6..10f8f05 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -76,7 +76,7 @@ def __init__(self, *, loop=None):
         the default event loop.
         """
         if loop is None:
-            self._loop = events.get_event_loop()
+            self._loop = events._get_event_loop()
         else:
             self._loop = loop
         self._callbacks = []
@@ -408,7 +408,7 @@ def wrap_future(future, *, loop=None):
     assert isinstance(future, concurrent.futures.Future), \
         f'concurrent.futures.Future is expected, got {future!r}'
     if loop is None:
-        loop = events.get_event_loop()
+        loop = events._get_event_loop()
     new_future = loop.create_future()
     _chain_future(future, new_future)
     return new_future
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 96a9f97..080d8a6 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -125,7 +125,7 @@ class FlowControlMixin(protocols.Protocol):
 
     def __init__(self, loop=None):
         if loop is None:
-            self._loop = events.get_event_loop()
+            self._loop = events._get_event_loop(stacklevel=4)
         else:
             self._loop = loop
         self._paused = False
@@ -283,9 +283,13 @@ def _get_close_waiter(self, stream):
     def __del__(self):
         # Prevent reports about unhandled exceptions.
         # Better than self._closed._log_traceback = False hack
-        closed = self._closed
-        if closed.done() and not closed.cancelled():
-            closed.exception()
+        try:
+            closed = self._closed
+        except AttributeError:
+            pass  # failed constructor
+        else:
+            if closed.done() and not closed.cancelled():
+                closed.exception()
 
 
 class StreamWriter:
@@ -381,7 +385,7 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
 
         self._limit = limit
         if loop is None:
-            self._loop = events.get_event_loop()
+            self._loop = events._get_event_loop()
         else:
             self._loop = loop
         self._buffer = bytearray()
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 52f1e66..9a9d0d6 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -549,7 +549,7 @@ def as_completed(fs, *, timeout=None):
     from .queues import Queue  # Import here to avoid circular import problem.
     done = Queue()
 
-    loop = events.get_event_loop()
+    loop = events._get_event_loop()
     todo = {ensure_future(f, loop=loop) for f in set(fs)}
     timeout_handle = None
 
@@ -616,23 +616,26 @@ def ensure_future(coro_or_future, *, loop=None):
 
     If the argument is a Future, it is returned directly.
     """
-    if coroutines.iscoroutine(coro_or_future):
-        if loop is None:
-            loop = events.get_event_loop()
-        task = loop.create_task(coro_or_future)
-        if task._source_traceback:
-            del task._source_traceback[-1]
-        return task
-    elif futures.isfuture(coro_or_future):
+    return _ensure_future(coro_or_future, loop=loop)
+
+
+def _ensure_future(coro_or_future, *, loop=None):
+    if futures.isfuture(coro_or_future):
         if loop is not None and loop is not futures._get_loop(coro_or_future):
             raise ValueError('The future belongs to a different loop than '
-                             'the one specified as the loop argument')
+                            'the one specified as the loop argument')
         return coro_or_future
-    elif inspect.isawaitable(coro_or_future):
-        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
-    else:
-        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
-                        'required')
+
+    if not coroutines.iscoroutine(coro_or_future):
+        if inspect.isawaitable(coro_or_future):
+            coro_or_future = _wrap_awaitable(coro_or_future)
+        else:
+            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
+                            'is required')
+
+    if loop is None:
+        loop = events._get_event_loop(stacklevel=4)
+    return loop.create_task(coro_or_future)
 
 
 @types.coroutine
@@ -655,7 +658,8 @@ class _GatheringFuture(futures.Future):
     cancelled.
     """
 
-    def __init__(self, children, *, loop=None):
+    def __init__(self, children, *, loop):
+        assert loop is not None
         super().__init__(loop=loop)
         self._children = children
         self._cancel_requested = False
@@ -706,7 +710,7 @@ def gather(*coros_or_futures, return_exceptions=False):
     gather won't cancel any other awaitables.
     """
     if not coros_or_futures:
-        loop = events.get_event_loop()
+        loop = events._get_event_loop()
         outer = loop.create_future()
         outer.set_result([])
         return outer
@@ -773,7 +777,7 @@ def _done_callback(fut):
     loop = None
     for arg in coros_or_futures:
         if arg not in arg_to_fut:
-            fut = ensure_future(arg, loop=loop)
+            fut = _ensure_future(arg, loop=loop)
             if loop is None:
                 loop = futures._get_loop(fut)
             if fut is not arg:
@@ -823,7 +827,7 @@ def shield(arg):
         except CancelledError:
             res = None
     """
-    inner = ensure_future(arg)
+    inner = _ensure_future(arg)
     if inner.done():
         # Shortcut.
         return inner