bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216)
Currently, asyncio.wait_for(fut), upon reaching the timeout deadline,
cancels the future and returns immediately. This is problematic when
*fut* is a Task, because the task will be left running for an arbitrary
amount of time. This behavior is itself surprising and may lead to
related bugs such as the one described in bpo-33638:

    condition = asyncio.Condition()

    async with condition:
        await asyncio.wait_for(condition.wait(), timeout=0.5)

Currently, instead of raising a TimeoutError, the above code will fail
with `RuntimeError: cannot wait on un-acquired lock`, because
`__aexit__` is reached _before_ `condition.wait()` finishes its
cancellation and re-acquires the condition lock.
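
The following is a minimal, self-contained sketch for checking which of
the two outcomes an interpreter produces. It assumes Python 3.7 or later
(for `asyncio.run`); the names `main` and `outcome` are illustrative and
not part of this change. With this change applied, the expected result
is the TimeoutError branch.

```python
import asyncio


async def main():
    # Create the condition inside the running loop.
    condition = asyncio.Condition()
    try:
        async with condition:
            # Nobody ever notifies the condition, so the wait can only
            # end through the timeout.
            await asyncio.wait_for(condition.wait(), timeout=0.05)
    except asyncio.TimeoutError:
        return "TimeoutError"
    except RuntimeError as exc:
        # Outcome described above for interpreters without the fix.
        return "RuntimeError: {}".format(exc)
    return "no exception"


outcome = asyncio.run(main())
print(outcome)
```

On an interpreter without the change, the `RuntimeError` branch is the
one described above.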

To resolve this, make `wait_for` wait for the task's cancellation to
complete. The tradeoff is that the `timeout` promise may be broken if
the task handles its cancellation slowly. This is a behavior change and
should probably not be backported to 3.6 and earlier.
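
The tradeoff can be observed with a small sketch, assuming Python 3.7 or
later with this change applied. The coroutine `slow_to_cancel` and the
delay values are hypothetical, chosen only to make the effect visible:
the task spends 0.2 seconds cleaning up after it is cancelled, so
`wait_for` returns well after the 0.05 second timeout.

```python
import asyncio
import time


async def slow_to_cancel():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Hypothetical slow cleanup performed while handling cancellation.
        await asyncio.sleep(0.2)
        raise


async def main():
    start = time.monotonic()
    try:
        await asyncio.wait_for(slow_to_cancel(), timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    # Elapsed time includes the task's cleanup, not just the timeout.
    return timed_out, time.monotonic() - start


timed_out, elapsed = asyncio.run(main())
print(timed_out, round(elapsed, 2))
```

The timeout error is still raised, but only once the task has finished
handling its cancellation.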
(cherry picked from commit e2b340ab4196e1beb902327f503574b5d7369185)
Co-authored-by: Elvis Pranskevichus <elvis@magic.io>
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 6cef33d..72792a2 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -412,14 +412,17 @@
             return fut.result()
         else:
             fut.remove_done_callback(cb)
-            fut.cancel()
+            # We must ensure that the task is not running
+            # after wait_for() returns.
+            # See https://bugs.python.org/issue32751
+            await _cancel_and_wait(fut, loop=loop)
             raise futures.TimeoutError()
     finally:
         timeout_handle.cancel()


 async def _wait(fs, timeout, return_when, loop):
-    """Internal helper for wait() and wait_for().
+    """Internal helper for wait().

     The fs argument must be a collection of Futures.
     """
@@ -461,6 +464,22 @@
     return done, pending


+async def _cancel_and_wait(fut, loop):
+    """Cancel the *fut* future or task and wait until it completes."""
+
+    waiter = loop.create_future()
+    cb = functools.partial(_release_waiter, waiter)
+    fut.add_done_callback(cb)
+
+    try:
+        fut.cancel()
+        # We cannot wait on *fut* directly to make
+        # sure _cancel_and_wait itself is reliably cancellable.
+        await waiter
+    finally:
+        fut.remove_done_callback(cb)
+
+
 # This is *not* a @coroutine! It is just an iterator (yielding Futures).
 def as_completed(fs, *, loop=None, timeout=None):
     """Return an iterator whose values are coroutines.