bpo-31033: Add a msg argument to Future.cancel() and Task.cancel() (GH-19979)

diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index a3cf379..889f3e6 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -51,6 +51,7 @@
     _exception = None
     _loop = None
     _source_traceback = None
+    _cancel_message = None
 
     # This field is used for a dual purpose:
     # - Its presence is a marker to declare that a class implements
@@ -123,7 +124,7 @@
             raise RuntimeError("Future object is not initialized.")
         return loop
 
-    def cancel(self):
+    def cancel(self, msg=None):
         """Cancel the future and schedule callbacks.
 
         If the future is already done or cancelled, return False.  Otherwise,
@@ -134,6 +135,7 @@
         if self._state != _PENDING:
             return False
         self._state = _CANCELLED
+        self._cancel_message = msg
         self.__schedule_callbacks()
         return True
 
@@ -173,7 +175,9 @@
         the future is done and has an exception set, this exception is raised.
         """
         if self._state == _CANCELLED:
-            raise exceptions.CancelledError
+            raise exceptions.CancelledError(
+                '' if self._cancel_message is None else self._cancel_message)
+
         if self._state != _FINISHED:
             raise exceptions.InvalidStateError('Result is not ready.')
         self.__log_traceback = False
@@ -190,7 +194,8 @@
         InvalidStateError.
         """
         if self._state == _CANCELLED:
-            raise exceptions.CancelledError
+            raise exceptions.CancelledError(
+                '' if self._cancel_message is None else self._cancel_message)
         if self._state != _FINISHED:
             raise exceptions.InvalidStateError('Exception is not set.')
         self.__log_traceback = False
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index f5de1a2..a3a0a33 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -230,7 +230,7 @@
         """
         return base_tasks._task_print_stack(self, limit, file)
 
-    def cancel(self):
+    def cancel(self, msg=None):
         """Request that this task cancel itself.
 
         This arranges for a CancelledError to be thrown into the
@@ -254,13 +254,14 @@
         if self.done():
             return False
         if self._fut_waiter is not None:
-            if self._fut_waiter.cancel():
+            if self._fut_waiter.cancel(msg=msg):
                 # Leave self._fut_waiter; it may be a Task that
                 # catches and ignores the cancellation so we may have
                 # to cancel it again later.
                 return True
         # It must be the case that self.__step is already scheduled.
         self._must_cancel = True
+        self._cancel_message = msg
         return True
 
     def __step(self, exc=None):
@@ -269,7 +270,8 @@
                 f'_step(): already done: {self!r}, {exc!r}')
         if self._must_cancel:
             if not isinstance(exc, exceptions.CancelledError):
-                exc = exceptions.CancelledError()
+                exc = exceptions.CancelledError(''
+                    if self._cancel_message is None else self._cancel_message)
             self._must_cancel = False
         coro = self._coro
         self._fut_waiter = None
@@ -287,11 +289,15 @@
             if self._must_cancel:
                 # Task is cancelled right before coro stops.
                 self._must_cancel = False
-                super().cancel()
+                super().cancel(msg=self._cancel_message)
             else:
                 super().set_result(exc.value)
-        except exceptions.CancelledError:
-            super().cancel()  # I.e., Future.cancel(self).
+        except exceptions.CancelledError as exc:
+            if exc.args:
+                cancel_msg = exc.args[0]
+            else:
+                cancel_msg = None
+            super().cancel(msg=cancel_msg)  # I.e., Future.cancel(self).
         except (KeyboardInterrupt, SystemExit) as exc:
             super().set_exception(exc)
             raise
@@ -319,7 +325,8 @@
                             self.__wakeup, context=self._context)
                         self._fut_waiter = result
                         if self._must_cancel:
-                            if self._fut_waiter.cancel():
+                            if self._fut_waiter.cancel(
+                                    msg=self._cancel_message):
                                 self._must_cancel = False
                 else:
                     new_exc = RuntimeError(
@@ -716,12 +723,12 @@
         self._children = children
         self._cancel_requested = False
 
-    def cancel(self):
+    def cancel(self, msg=None):
         if self.done():
             return False
         ret = False
         for child in self._children:
-            if child.cancel():
+            if child.cancel(msg=msg):
                 ret = True
         if ret:
             # If any child tasks were actually cancelled, we should
@@ -780,7 +787,8 @@
                 # Check if 'fut' is cancelled first, as
                 # 'fut.exception()' will *raise* a CancelledError
                 # instead of returning it.
-                exc = exceptions.CancelledError()
+                exc = exceptions.CancelledError(''
+                    if fut._cancel_message is None else fut._cancel_message)
                 outer.set_exception(exc)
                 return
             else:
@@ -799,7 +807,9 @@
                     # Check if 'fut' is cancelled first, as
                     # 'fut.exception()' will *raise* a CancelledError
                     # instead of returning it.
-                    res = exceptions.CancelledError()
+                    res = exceptions.CancelledError(
+                        '' if fut._cancel_message is None else
+                        fut._cancel_message)
                 else:
                     res = fut.exception()
                     if res is None:
@@ -810,7 +820,9 @@
                 # If gather is being cancelled we must propagate the
                 # cancellation regardless of *return_exceptions* argument.
                 # See issue 32684.
-                outer.set_exception(exceptions.CancelledError())
+                exc = exceptions.CancelledError(''
+                    if fut._cancel_message is None else fut._cancel_message)
+                outer.set_exception(exc)
             else:
                 outer.set_result(results)
 
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index ac51109..c07fe32 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -75,9 +75,9 @@
             self._loop.call_exception_handler(context)
         self._ov = None
 
-    def cancel(self):
+    def cancel(self, msg=None):  # msg: optional cancellation message
         self._cancel_overlapped()
-        return super().cancel()
+        return super().cancel(msg=msg)  # hand msg on to the parent cancel()
 
     def set_exception(self, exception):
         super().set_exception(exception)
@@ -149,9 +149,9 @@
 
         self._unregister_wait_cb(None)
 
-    def cancel(self):
+    def cancel(self, msg=None):  # msg: optional cancellation message
         self._unregister_wait()
-        return super().cancel()
+        return super().cancel(msg=msg)  # hand msg on to the parent cancel()
 
     def set_exception(self, exception):
         self._unregister_wait()
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index ee5edd5..ec00896 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -201,6 +201,27 @@
         self.assertFalse(fut.cancelled())
         self.assertFalse(fut.done())
 
+    def test_future_cancel_message_getter(self):  # _cancel_message read path
+        f = self._new_future(loop=self.loop)
+        self.assertTrue(hasattr(f, '_cancel_message'))
+        self.assertEqual(f._cancel_message, None)  # no message before cancel()
+
+        f.cancel('my message')  # msg is recorded on the future
+        with self.assertRaises(asyncio.CancelledError):
+            self.loop.run_until_complete(f)
+        self.assertEqual(f._cancel_message, 'my message')
+
+    def test_future_cancel_message_setter(self):  # _cancel_message is writable
+        f = self._new_future(loop=self.loop)
+        f.cancel('my message')
+        f._cancel_message = 'my new message'  # overwrite after cancel()
+        self.assertEqual(f._cancel_message, 'my new message')
+
+        # The overwritten value must survive awaiting the cancelled future.
+        with self.assertRaises(asyncio.CancelledError):
+            self.loop.run_until_complete(f)
+        self.assertEqual(f._cancel_message, 'my new message')
+
     def test_cancel(self):
         f = self._new_future(loop=self.loop)
         self.assertTrue(f.cancel())
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 0f8d921..65bee52 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -103,6 +103,31 @@
         self.loop.set_task_factory(self.new_task)
         self.loop.create_future = lambda: self.new_future(self.loop)
 
+    def test_task_cancel_message_getter(self):  # _cancel_message read path
+        async def coro():
+            pass
+        t = self.new_task(self.loop, coro())
+        self.assertTrue(hasattr(t, '_cancel_message'))
+        self.assertEqual(t._cancel_message, None)  # no message before cancel()
+
+        t.cancel('my message')  # the loop has not been run yet at this point
+        with self.assertRaises(asyncio.CancelledError):
+            self.loop.run_until_complete(t)
+        self.assertEqual(t._cancel_message, 'my message')
+
+    def test_task_cancel_message_setter(self):  # _cancel_message is writable
+        async def coro():
+            pass
+        t = self.new_task(self.loop, coro())
+        t.cancel('my message')
+        t._cancel_message = 'my new message'  # overwrite after cancel()
+        self.assertEqual(t._cancel_message, 'my new message')
+
+        # The overwritten value must survive awaiting the cancelled task.
+        with self.assertRaises(asyncio.CancelledError):
+            self.loop.run_until_complete(t)
+        self.assertEqual(t._cancel_message, 'my new message')
+
     def test_task_del_collect(self):
         class Evil:
             def __del__(self):
@@ -520,6 +545,86 @@
         self.assertTrue(t.cancelled())
         self.assertFalse(t.cancel())
 
+    def test_cancel_with_message_then_future_result(self):
+        # Test Future.result() after calling cancel() with a message.
+        cases = [  # (cancel() args, expected CancelledError.args) pairs
+            ((), ('',)),  # no msg -> empty-string argument
+            ((None,), ('',)),  # explicit None is treated like no msg
+            (('my message',), ('my message',)),
+            # Non-string values should roundtrip.
+            ((5,), (5,)),
+        ]
+        for cancel_args, expected_args in cases:
+            with self.subTest(cancel_args=cancel_args):
+                loop = asyncio.new_event_loop()  # fresh loop per sub-test
+                self.set_event_loop(loop)
+
+                async def sleep():
+                    await asyncio.sleep(10)
+
+                async def coro():
+                    task = self.new_task(loop, sleep())
+                    await asyncio.sleep(0)  # yield once so the task can start
+                    task.cancel(*cancel_args)
+                    done, pending = await asyncio.wait([task])
+                    task.result()  # expected to raise CancelledError
+
+                task = self.new_task(loop, coro())
+                with self.assertRaises(asyncio.CancelledError) as cm:
+                    loop.run_until_complete(task)
+                exc = cm.exception
+                self.assertEqual(exc.args, expected_args)
+
+    def test_cancel_with_message_then_future_exception(self):
+        # Test Future.exception() after calling cancel() with a message.
+        cases = [  # (cancel() args, expected CancelledError.args) pairs
+            ((), ('',)),  # no msg -> empty-string argument
+            ((None,), ('',)),  # explicit None is treated like no msg
+            (('my message',), ('my message',)),
+            # Non-string values should roundtrip.
+            ((5,), (5,)),
+        ]
+        for cancel_args, expected_args in cases:
+            with self.subTest(cancel_args=cancel_args):
+                loop = asyncio.new_event_loop()  # fresh loop per sub-test
+                self.set_event_loop(loop)
+
+                async def sleep():
+                    await asyncio.sleep(10)
+
+                async def coro():
+                    task = self.new_task(loop, sleep())
+                    await asyncio.sleep(0)  # yield once so the task can start
+                    task.cancel(*cancel_args)
+                    done, pending = await asyncio.wait([task])
+                    task.exception()  # raises (not returns) CancelledError
+
+                task = self.new_task(loop, coro())
+                with self.assertRaises(asyncio.CancelledError) as cm:
+                    loop.run_until_complete(task)
+                exc = cm.exception
+                self.assertEqual(exc.args, expected_args)
+
+    def test_cancel_with_message_before_starting_task(self):
+        loop = asyncio.new_event_loop()
+        self.set_event_loop(loop)
+
+        async def sleep():
+            await asyncio.sleep(10)
+
+        async def coro():
+            task = self.new_task(loop, sleep())
+            # No sleep here on purpose: cancel before the task gets to run.
+            task.cancel('my message')
+            done, pending = await asyncio.wait([task])
+            task.exception()  # raises (not returns) CancelledError
+
+        task = self.new_task(loop, coro())
+        with self.assertRaises(asyncio.CancelledError) as cm:
+            loop.run_until_complete(task)
+        exc = cm.exception
+        self.assertEqual(exc.args, ('my message',))  # message still propagated
+
     def test_cancel_yield(self):
         with self.assertWarns(DeprecationWarning):
             @asyncio.coroutine
@@ -2285,31 +2390,42 @@
         self.assertEqual(gather_task.result(), [42])
 
     def test_cancel_gather_2(self):
-        loop = asyncio.new_event_loop()
-        self.addCleanup(loop.close)
+        cases = [  # (cancel() args, expected CancelledError.args) pairs
+            ((), ('',)),  # no msg -> empty-string argument
+            ((None,), ('',)),  # explicit None is treated like no msg
+            (('my message',), ('my message',)),
+            # Non-string values should roundtrip.
+            ((5,), (5,)),
+        ]
+        for cancel_args, expected_args in cases:
+            with self.subTest(cancel_args=cancel_args):
 
-        async def test():
-            time = 0
-            while True:
-                time += 0.05
-                await asyncio.gather(asyncio.sleep(0.05),
-                                     return_exceptions=True,
-                                     loop=loop)
-                if time > 1:
-                    return
+                loop = asyncio.new_event_loop()  # fresh loop per sub-test
+                self.addCleanup(loop.close)
 
-        async def main():
-            qwe = self.new_task(loop, test())
-            await asyncio.sleep(0.2)
-            qwe.cancel()
-            try:
-                await qwe
-            except asyncio.CancelledError:
-                pass
-            else:
-                self.fail('gather did not propagate the cancellation request')
+                async def test():
+                    time = 0
+                    while True:
+                        time += 0.05
+                        await asyncio.gather(asyncio.sleep(0.05),
+                                             return_exceptions=True,
+                                             loop=loop)
+                        if time > 1:
+                            return
 
-        loop.run_until_complete(main())
+                async def main():
+                    qwe = self.new_task(loop, test())
+                    await asyncio.sleep(0.2)
+                    qwe.cancel(*cancel_args)  # cancel the task running test()
+                    try:
+                        await qwe
+                    except asyncio.CancelledError as exc:
+                        self.assertEqual(exc.args, expected_args)
+                    else:
+                        self.fail('gather did not propagate the cancellation '
+                                  'request')
+
+                loop.run_until_complete(main())
 
     def test_exception_traceback(self):
         # See http://bugs.python.org/issue28843