asyncio: Change as_completed() to use a Queue, to avoid O(N**2) behavior. Fixes issue #20566.
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 81a125f..b7ee758 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -463,7 +463,11 @@
 
 # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
 def as_completed(fs, *, loop=None, timeout=None):
-    """Return an iterator whose values, when waited for, are Futures.
+    """Return an iterator whose values are coroutines.
+
+    When you wait on each yielded coroutine you get the result (or
+    exception!) of one of the original Futures (or coroutines), in
+    completion order: each is delivered as soon as it completes.
 
     This differs from PEP 3148; the proper way to use this is:
 
@@ -471,8 +475,8 @@
             result = yield from f  # The 'yield from' may raise.
             # Use result.
 
-    Raises TimeoutError if the timeout occurs before all Futures are
-    done.
+    If a timeout is specified, the 'yield from' will raise
+    TimeoutError when the timeout occurs before all Futures are done.
 
     Note: The futures 'f' are not necessarily members of fs.
     """
@@ -481,27 +485,36 @@
     loop = loop if loop is not None else events.get_event_loop()
     deadline = None if timeout is None else loop.time() + timeout
     todo = {async(f, loop=loop) for f in set(fs)}
-    completed = collections.deque()
+    from .queues import Queue  # Import here to avoid circular import problem.
+    done = Queue(loop=loop)
+    timeout_handle = None
+
+    def _on_timeout():
+        for f in todo:
+            f.remove_done_callback(_on_completion)
+            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
+        todo.clear()  # Can't do todo.remove(f) in the loop.
+
+    def _on_completion(f):
+        if not todo:
+            return  # _on_timeout() was here first.
+        todo.remove(f)
+        done.put_nowait(f)
+        if not todo and timeout_handle is not None:
+            timeout_handle.cancel()
 
     @coroutine
     def _wait_for_one():
-        while not completed:
-            timeout = None
-            if deadline is not None:
-                timeout = deadline - loop.time()
-                if timeout < 0:
-                    raise futures.TimeoutError()
-            done, pending = yield from _wait(
-                todo, timeout, FIRST_COMPLETED, loop)
-            # Multiple callers might be waiting for the same events
-            # and getting the same outcome.  Dedupe by updating todo.
-            for f in done:
-                if f in todo:
-                    todo.remove(f)
-                    completed.append(f)
-        f = completed.popleft()
-        return f.result()  # May raise.
+        f = yield from done.get()
+        if f is None:
+            # Dummy value from _on_timeout().
+            raise futures.TimeoutError
+        return f.result()  # May raise f.exception().
 
+    for f in todo:
+        f.add_done_callback(_on_completion)
+    if todo and timeout is not None:
+        timeout_handle = loop.call_later(timeout, _on_timeout)
     for _ in range(len(todo)):
         yield _wait_for_one()