bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216)


Currently, asyncio.wait_for(fut), upon reaching the timeout deadline,
cancels the future and returns immediately.  This is problematic for
when *fut* is a Task, because it will be left running for an arbitrary
amount of time.  This behavior is iself surprising and may lead to
related bugs such as the one described in bpo-33638:

    condition = asyncio.Condition()
    async with condition:
        await asyncio.wait_for(condition.wait(), timeout=0.5)

Currently, instead of raising a TimeoutError, the above code will fail
with `RuntimeError: cannot wait on un-acquired lock`, because
`__aexit__` is reached _before_ `condition.wait()` finishes its
cancellation and re-acquires the condition lock.

To resolve this, make `wait_for` await for the task cancellation.
The tradeoff here is that the `timeout` promise may be broken if the
task decides to handle its cancellation in a slow way.  This represents
a behavior change and should probably not be back-patched to 3.6 and
earlier.
(cherry picked from commit e2b340ab4196e1beb902327f503574b5d7369185)

Co-authored-by: Elvis Pranskevichus <elvis@magic.io>
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 6cef33d..72792a2 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -412,14 +412,17 @@
             return fut.result()
         else:
             fut.remove_done_callback(cb)
-            fut.cancel()
+            # We must ensure that the task is not running
+            # after wait_for() returns.
+            # See https://bugs.python.org/issue32751
+            await _cancel_and_wait(fut, loop=loop)
             raise futures.TimeoutError()
     finally:
         timeout_handle.cancel()
 
 
 async def _wait(fs, timeout, return_when, loop):
-    """Internal helper for wait() and wait_for().
+    """Internal helper for wait().
 
     The fs argument must be a collection of Futures.
     """
@@ -461,6 +464,22 @@
     return done, pending
 
 
+async def _cancel_and_wait(fut, loop):
+    """Cancel the *fut* future or task and wait until it completes."""
+
+    waiter = loop.create_future()
+    cb = functools.partial(_release_waiter, waiter)
+    fut.add_done_callback(cb)
+
+    try:
+        fut.cancel()
+        # We cannot wait on *fut* directly to make
+        # sure _cancel_and_wait itself is reliably cancellable.
+        await waiter
+    finally:
+        fut.remove_done_callback(cb)
+
+
 # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
 def as_completed(fs, *, loop=None, timeout=None):
     """Return an iterator whose values are coroutines.