Issue #25233: Rewrite the guts of Queue to be more understandable and correct.
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index 021043d..e3a1d5e 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -47,7 +47,7 @@
 
         # Futures.
         self._getters = collections.deque()
-        # Futures
+        # Futures.
         self._putters = collections.deque()
         self._unfinished_tasks = 0
         self._finished = locks.Event(loop=self._loop)
@@ -67,10 +67,13 @@
 
     # End of the overridable methods.
 
-    def __put_internal(self, item):
-        self._put(item)
-        self._unfinished_tasks += 1
-        self._finished.clear()
+    def _wakeup_next(self, waiters):
+        # Wake up the next waiter (if any) that isn't cancelled.
+        while waiters:
+            waiter = waiters.popleft()
+            if not waiter.done():
+                waiter.set_result(None)
+                break
 
     def __repr__(self):
         return '<{} at {:#x} {}>'.format(
@@ -91,16 +94,6 @@
             result += ' tasks={}'.format(self._unfinished_tasks)
         return result
 
-    def _consume_done_getters(self):
-        # Delete waiters at the head of the get() queue who've timed out.
-        while self._getters and self._getters[0].done():
-            self._getters.popleft()
-
-    def _consume_done_putters(self):
-        # Delete waiters at the head of the put() queue who've timed out.
-        while self._putters and self._putters[0].done():
-            self._putters.popleft()
-
     def qsize(self):
         """Number of items in the queue."""
         return len(self._queue)
@@ -134,47 +127,31 @@
 
         This method is a coroutine.
         """
-        self._consume_done_getters()
-        if self._getters:
-            assert not self._queue, (
-                'queue non-empty, why are getters waiting?')
-
-            getter = self._getters.popleft()
-            self.__put_internal(item)
-
-            # getter cannot be cancelled, we just removed done getters
-            getter.set_result(self._get())
-
-        elif self._maxsize > 0 and self._maxsize <= self.qsize():
-            waiter = futures.Future(loop=self._loop)
-
-            self._putters.append(waiter)
-            yield from waiter
-            self._put(item)
-
-        else:
-            self.__put_internal(item)
+        while self.full():
+            putter = futures.Future(loop=self._loop)
+            self._putters.append(putter)
+            try:
+                yield from putter
+            except:
+                putter.cancel()  # Just in case putter is not done yet.
+                if not self.full() and not putter.cancelled():
+                    # We were woken up by get_nowait(), but can't take
+                    # the call.  Wake up the next in line.
+                    self._wakeup_next(self._putters)
+                raise
+        return self.put_nowait(item)
 
     def put_nowait(self, item):
         """Put an item into the queue without blocking.
 
         If no free slot is immediately available, raise QueueFull.
         """
-        self._consume_done_getters()
-        if self._getters:
-            assert not self._queue, (
-                'queue non-empty, why are getters waiting?')
-
-            getter = self._getters.popleft()
-            self.__put_internal(item)
-
-            # getter cannot be cancelled, we just removed done getters
-            getter.set_result(self._get())
-
-        elif self._maxsize > 0 and self._maxsize <= self.qsize():
+        if self.full():
             raise QueueFull
-        else:
-            self.__put_internal(item)
+        self._put(item)
+        self._unfinished_tasks += 1
+        self._finished.clear()
+        self._wakeup_next(self._getters)
 
     @coroutine
     def get(self):
@@ -184,77 +161,30 @@
 
         This method is a coroutine.
         """
-        self._consume_done_putters()
-        if self._putters:
-            assert self.full(), 'queue not full, why are putters waiting?'
-            putter = self._putters.popleft()
-
-            # When a getter runs and frees up a slot so this putter can
-            # run, we need to defer the put for a tick to ensure that
-            # getters and putters alternate perfectly. See
-            # ChannelTest.test_wait.
-            self._loop.call_soon(putter._set_result_unless_cancelled, None)
-
-            return self._get()
-
-        elif self.qsize():
-            return self._get()
-        else:
-            waiter = futures.Future(loop=self._loop)
-            self._getters.append(waiter)
+        while self.empty():
+            getter = futures.Future(loop=self._loop)
+            self._getters.append(getter)
             try:
-                return (yield from waiter)
-            except futures.CancelledError:
-                # if we get CancelledError, it means someone cancelled this
-                # get() coroutine.  But there is a chance that the waiter
-                # already is ready and contains an item that has just been
-                # removed from the queue.  In this case, we need to put the item
-                # back into the front of the queue.  This get() must either
-                # succeed without fault or, if it gets cancelled, it must be as
-                # if it never happened.
-                if waiter.done():
-                    self._put_it_back(waiter.result())
+                yield from getter
+            except:
+                getter.cancel()  # Just in case getter is not done yet.
+                if not self.empty() and not getter.cancelled():
+                    # We were woken up by put_nowait(), but can't take
+                    # the call.  Wake up the next in line.
+                    self._wakeup_next(self._getters)
                 raise
-
-    def _put_it_back(self, item):
-        """
-        This is called when we have a waiter to get() an item and this waiter
-        gets cancelled.  In this case, we put the item back: wake up another
-        waiter or put it in the _queue.
-        """
-        self._consume_done_getters()
-        if self._getters:
-            assert not self._queue, (
-                'queue non-empty, why are getters waiting?')
-
-            getter = self._getters.popleft()
-            self.__put_internal(item)
-
-            # getter cannot be cancelled, we just removed done getters
-            getter.set_result(item)
-        else:
-            self._queue.appendleft(item)
+        return self.get_nowait()
 
     def get_nowait(self):
         """Remove and return an item from the queue.
 
         Return an item if one is immediately available, else raise QueueEmpty.
         """
-        self._consume_done_putters()
-        if self._putters:
-            assert self.full(), 'queue not full, why are putters waiting?'
-            putter = self._putters.popleft()
-            # Wake putter on next tick.
-
-            # getter cannot be cancelled, we just removed done putters
-            putter.set_result(None)
-
-            return self._get()
-
-        elif self.qsize():
-            return self._get()
-        else:
+        if self.empty():
             raise QueueEmpty
+        item = self._get()
+        self._wakeup_next(self._putters)
+        return item
 
     def task_done(self):
         """Indicate that a formerly enqueued task is complete.