bpo-32574: Fix leaks in asyncio.Queue.put() and .get() (#5208)

diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index 512ea60..2a8d3e7 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -121,6 +121,13 @@
                 await putter
             except:
                 putter.cancel()  # Just in case putter is not done yet.
+                try:
+                    # Clean self._putters from canceled putters.
+                    self._putters.remove(putter)
+                except ValueError:
+                    # The putter could be removed from self._putters by a
+                    # previous get_nowait call.
+                    pass
                 if not self.full() and not putter.cancelled():
                     # We were woken up by get_nowait(), but can't take
                     # the call.  Wake up the next in line.
@@ -152,12 +159,13 @@
                 await getter
             except:
                 getter.cancel()  # Just in case getter is not done yet.
-
                 try:
+                    # Clean self._getters from canceled getters.
                     self._getters.remove(getter)
                 except ValueError:
+                    # The getter could be removed from self._getters by a
+                    # previous put_nowait call.
                     pass
-
                 if not self.empty() and not getter.cancelled():
                     # We were woken up by put_nowait(), but can't take
                     # the call.  Wake up the next in line.
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
index 8d78546..efe719e 100644
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -520,6 +520,56 @@
         self.loop.run_until_complete(
             asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop))
 
+    def test_cancelled_puts_not_being_held_in_self_putters(self):
+        def a_generator():
+            yield 0.01
+            yield 0.1
+
+        loop = self.new_test_loop(a_generator)
+
+        # Full queue.
+        queue = asyncio.Queue(loop=loop, maxsize=1)
+        queue.put_nowait(1)
+
+        # Task waiting for space to put an item in the queue.
+        put_task = loop.create_task(queue.put(1))
+        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+        # Check that the putter is correctly removed from queue._putters when
+        # the task is canceled.
+        self.assertEqual(len(queue._putters), 1)
+        put_task.cancel()
+        with self.assertRaises(asyncio.CancelledError):
+            loop.run_until_complete(put_task)
+        self.assertEqual(len(queue._putters), 0)
+
+    def test_cancelled_put_silence_value_error_exception(self):
+        def gen():
+            yield 0.01
+            yield 0.1
+
+        loop = self.new_test_loop(gen)
+
+        # Full Queue.
+        queue = asyncio.Queue(1, loop=loop)
+        queue.put_nowait(1)
+
+        # Task waiting for space to put a item in the queue.
+        put_task = loop.create_task(queue.put(1))
+        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+        # get_nowait() remove the future of put_task from queue._putters.
+        queue.get_nowait()
+        # When canceled, queue.put is going to remove its future from
+        # self._putters but it was removed previously by queue.get_nowait().
+        put_task.cancel()
+
+        # The ValueError exception triggered by queue._putters.remove(putter)
+        # inside queue.put should be silenced.
+        # If the ValueError is silenced we should catch a CancelledError.
+        with self.assertRaises(asyncio.CancelledError):
+            loop.run_until_complete(put_task)
+
 
 class LifoQueueTests(_QueueTestBase):
 
diff --git a/Misc/NEWS.d/next/Library/2018-01-16-20-37-28.bpo-32574.ru8eZ9.rst b/Misc/NEWS.d/next/Library/2018-01-16-20-37-28.bpo-32574.ru8eZ9.rst
new file mode 100644
index 0000000..00650d4
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-01-16-20-37-28.bpo-32574.ru8eZ9.rst
@@ -0,0 +1,3 @@
+Fix memory leak in asyncio.Queue, when the queue has limited size and it is
+full, the cancelation of queue.put() can cause a memory leak. Patch by: José
+Melero.