bpo-41891: ensure asyncio.wait_for waits for task completion (#22461)

diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 0d3a24b..52f1e66 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -437,7 +437,10 @@ async def wait_for(fut, timeout):
                 return fut.result()
             else:
                 fut.remove_done_callback(cb)
-                fut.cancel()
+                # We must ensure that the task is not running
+                # after wait_for() returns.
+                # See https://bugs.python.org/issue32751
+                await _cancel_and_wait(fut, loop=loop)
                 raise
 
         if fut.done():
diff --git a/Lib/test/test_asyncio/test_asyncio_waitfor.py b/Lib/test/test_asyncio/test_asyncio_waitfor.py
new file mode 100644
index 0000000..2ca64ab
--- /dev/null
+++ b/Lib/test/test_asyncio/test_asyncio_waitfor.py
@@ -0,0 +1,61 @@
+import asyncio
+import unittest
+import time
+
+def tearDownModule():
+    asyncio.set_event_loop_policy(None)
+
+
+class SlowTask:
+    """ Task will run for this defined time, ignoring cancel requests """
+    TASK_TIMEOUT = 0.2
+
+    def __init__(self):
+        self.exited = False
+
+    async def run(self):
+        exitat = time.monotonic() + self.TASK_TIMEOUT
+
+        while True:
+            tosleep = exitat - time.monotonic()
+            if tosleep <= 0:
+                break
+
+            try:
+                await asyncio.sleep(tosleep)
+            except asyncio.CancelledError:
+                pass
+
+        self.exited = True
+
+class AsyncioWaitForTest(unittest.TestCase):
+
+    async def atest_asyncio_wait_for_cancelled(self):
+        t  = SlowTask()
+
+        waitfortask = asyncio.create_task(asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
+        await asyncio.sleep(0)
+        waitfortask.cancel()
+        await asyncio.wait({waitfortask})
+
+        self.assertTrue(t.exited)
+
+    def test_asyncio_wait_for_cancelled(self):
+        asyncio.run(self.atest_asyncio_wait_for_cancelled())
+
+    async def atest_asyncio_wait_for_timeout(self):
+        t  = SlowTask()
+
+        try:
+            await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
+        except asyncio.TimeoutError:
+            pass
+
+        self.assertTrue(t.exited)
+
+    def test_asyncio_wait_for_timeout(self):
+        asyncio.run(self.atest_asyncio_wait_for_timeout())
+
+
+if __name__ == '__main__':
+    unittest.main()