bpo-42392: Remove loop parameter form asyncio locks and Queue (#23420)

Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index f1ce732..6f322c2 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -3,10 +3,9 @@
 __all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
 
 import collections
-import warnings
 
-from . import events
 from . import exceptions
+from . import mixins
 
 
 class _ContextManagerMixin:
@@ -20,7 +19,7 @@ async def __aexit__(self, exc_type, exc, tb):
         self.release()
 
 
-class Lock(_ContextManagerMixin):
+class Lock(_ContextManagerMixin, mixins._LoopBoundedMixin):
     """Primitive lock objects.
 
     A primitive lock is a synchronization primitive that is not owned
@@ -74,16 +73,9 @@ class Lock(_ContextManagerMixin):
 
     """
 
-    def __init__(self, *, loop=None):
+    def __init__(self):
         self._waiters = None
         self._locked = False
-        if loop is None:
-            self._loop = events.get_event_loop()
-        else:
-            self._loop = loop
-            warnings.warn("The loop argument is deprecated since Python 3.8, "
-                          "and scheduled for removal in Python 3.10.",
-                          DeprecationWarning, stacklevel=2)
 
     def __repr__(self):
         res = super().__repr__()
@@ -109,7 +101,7 @@ async def acquire(self):
 
         if self._waiters is None:
             self._waiters = collections.deque()
-        fut = self._loop.create_future()
+        fut = self._get_loop().create_future()
         self._waiters.append(fut)
 
         # Finally block should be called before the CancelledError
@@ -161,7 +153,7 @@ def _wake_up_first(self):
             fut.set_result(True)
 
 
-class Event:
+class Event(mixins._LoopBoundedMixin):
     """Asynchronous equivalent to threading.Event.
 
     Class implementing event objects. An event manages a flag that can be set
@@ -170,16 +162,9 @@ class Event:
     false.
     """
 
-    def __init__(self, *, loop=None):
+    def __init__(self):
         self._waiters = collections.deque()
         self._value = False
-        if loop is None:
-            self._loop = events.get_event_loop()
-        else:
-            self._loop = loop
-            warnings.warn("The loop argument is deprecated since Python 3.8, "
-                          "and scheduled for removal in Python 3.10.",
-                          DeprecationWarning, stacklevel=2)
 
     def __repr__(self):
         res = super().__repr__()
@@ -220,7 +205,7 @@ async def wait(self):
         if self._value:
             return True
 
-        fut = self._loop.create_future()
+        fut = self._get_loop().create_future()
         self._waiters.append(fut)
         try:
             await fut
@@ -229,7 +214,7 @@ async def wait(self):
             self._waiters.remove(fut)
 
 
-class Condition(_ContextManagerMixin):
+class Condition(_ContextManagerMixin, mixins._LoopBoundedMixin):
     """Asynchronous equivalent to threading.Condition.
 
     This class implements condition variable objects. A condition variable
@@ -239,18 +224,10 @@ class Condition(_ContextManagerMixin):
     A new Lock object is created and used as the underlying lock.
     """
 
-    def __init__(self, lock=None, *, loop=None):
-        if loop is None:
-            self._loop = events.get_event_loop()
-        else:
-            self._loop = loop
-            warnings.warn("The loop argument is deprecated since Python 3.8, "
-                          "and scheduled for removal in Python 3.10.",
-                          DeprecationWarning, stacklevel=2)
-
+    def __init__(self, lock=None):
         if lock is None:
-            lock = Lock(loop=loop)
-        elif lock._loop is not self._loop:
+            lock = Lock()
+        elif lock._loop is not self._get_loop():
             raise ValueError("loop argument must agree with lock")
 
         self._lock = lock
@@ -284,7 +261,7 @@ async def wait(self):
 
         self.release()
         try:
-            fut = self._loop.create_future()
+            fut = self._get_loop().create_future()
             self._waiters.append(fut)
             try:
                 await fut
@@ -351,7 +328,7 @@ def notify_all(self):
         self.notify(len(self._waiters))
 
 
-class Semaphore(_ContextManagerMixin):
+class Semaphore(_ContextManagerMixin, mixins._LoopBoundedMixin):
     """A Semaphore implementation.
 
     A semaphore manages an internal counter which is decremented by each
@@ -366,18 +343,11 @@ class Semaphore(_ContextManagerMixin):
     ValueError is raised.
     """
 
-    def __init__(self, value=1, *, loop=None):
+    def __init__(self, value=1):
         if value < 0:
             raise ValueError("Semaphore initial value must be >= 0")
         self._value = value
         self._waiters = collections.deque()
-        if loop is None:
-            self._loop = events.get_event_loop()
-        else:
-            self._loop = loop
-            warnings.warn("The loop argument is deprecated since Python 3.8, "
-                          "and scheduled for removal in Python 3.10.",
-                          DeprecationWarning, stacklevel=2)
 
     def __repr__(self):
         res = super().__repr__()
@@ -407,7 +377,7 @@ async def acquire(self):
         True.
         """
         while self._value <= 0:
-            fut = self._loop.create_future()
+            fut = self._get_loop().create_future()
             self._waiters.append(fut)
             try:
                 await fut
@@ -436,14 +406,9 @@ class BoundedSemaphore(Semaphore):
     above the initial value.
     """
 
-    def __init__(self, value=1, *, loop=None):
-        if loop:
-            warnings.warn("The loop argument is deprecated since Python 3.8, "
-                          "and scheduled for removal in Python 3.10.",
-                          DeprecationWarning, stacklevel=2)
-
+    def __init__(self, value=1):
         self._bound_value = value
-        super().__init__(value, loop=loop)
+        super().__init__(value)
 
     def release(self):
         if self._value >= self._bound_value:
diff --git a/Lib/asyncio/mixins.py b/Lib/asyncio/mixins.py
new file mode 100644
index 0000000..dbc4b5f
--- /dev/null
+++ b/Lib/asyncio/mixins.py
@@ -0,0 +1,21 @@
+"""Event loop mixins."""
+
+import threading
+from . import events
+
+_global_lock = threading.Lock()
+
+
+class _LoopBoundedMixin:
+    _loop = None
+
+    def _get_loop(self):
+        loop = events._get_running_loop()
+
+        if self._loop is None:
+            with _global_lock:
+                if self._loop is None:
+                    self._loop = loop
+        if loop is not self._loop:
+            raise RuntimeError(f'{type(self).__name__} have already bounded to another loop')
+        return loop
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index cd3f7c6..78ae9e9 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -2,10 +2,9 @@
 
 import collections
 import heapq
-import warnings
 
-from . import events
 from . import locks
+from . import mixins
 
 
 class QueueEmpty(Exception):
@@ -18,7 +17,7 @@ class QueueFull(Exception):
     pass
 
 
-class Queue:
+class Queue(mixins._LoopBoundedMixin):
     """A queue, useful for coordinating producer and consumer coroutines.
 
     If maxsize is less than or equal to zero, the queue size is infinite. If it
@@ -30,14 +29,7 @@ class Queue:
     interrupted between calling qsize() and doing an operation on the Queue.
     """
 
-    def __init__(self, maxsize=0, *, loop=None):
-        if loop is None:
-            self._loop = events.get_event_loop()
-        else:
-            self._loop = loop
-            warnings.warn("The loop argument is deprecated since Python 3.8, "
-                          "and scheduled for removal in Python 3.10.",
-                          DeprecationWarning, stacklevel=2)
+    def __init__(self, maxsize=0):
         self._maxsize = maxsize
 
         # Futures.
@@ -45,7 +37,7 @@ def __init__(self, maxsize=0, *, loop=None):
         # Futures.
         self._putters = collections.deque()
         self._unfinished_tasks = 0
-        self._finished = locks.Event(loop=loop)
+        self._finished = locks.Event()
         self._finished.set()
         self._init(maxsize)
 
@@ -122,7 +114,7 @@ async def put(self, item):
         slot is available before adding item.
         """
         while self.full():
-            putter = self._loop.create_future()
+            putter = self._get_loop().create_future()
             self._putters.append(putter)
             try:
                 await putter
@@ -160,7 +152,7 @@ async def get(self):
         If queue is empty, wait until an item is available.
         """
         while self.empty():
-            getter = self._loop.create_future()
+            getter = self._get_loop().create_future()
             self._getters.append(getter)
             try:
                 await getter
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index f486b67..03d8451 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -578,7 +578,7 @@ def as_completed(fs, *, loop=None, timeout=None):
         raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
 
     from .queues import Queue  # Import here to avoid circular import problem.
-    done = Queue(loop=loop)
+    done = Queue()
 
     if loop is None:
         loop = events.get_event_loop()