bpo-32436: Implement PEP 567 (#5027)

diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index ca9eee7..e722cf2 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -489,7 +489,7 @@
         """
         return time.monotonic()
 
-    def call_later(self, delay, callback, *args):
+    def call_later(self, delay, callback, *args, context=None):
         """Arrange for a callback to be called at a given time.
 
         Return a Handle: an opaque object with a cancel() method that
@@ -505,12 +505,13 @@
         Any positional arguments after the callback will be passed to
         the callback when it is called.
         """
-        timer = self.call_at(self.time() + delay, callback, *args)
+        timer = self.call_at(self.time() + delay, callback, *args,
+                             context=context)
         if timer._source_traceback:
             del timer._source_traceback[-1]
         return timer
 
-    def call_at(self, when, callback, *args):
+    def call_at(self, when, callback, *args, context=None):
         """Like call_later(), but uses an absolute time.
 
         Absolute time corresponds to the event loop's time() method.
@@ -519,14 +520,14 @@
         if self._debug:
             self._check_thread()
             self._check_callback(callback, 'call_at')
-        timer = events.TimerHandle(when, callback, args, self)
+        timer = events.TimerHandle(when, callback, args, self, context)
         if timer._source_traceback:
             del timer._source_traceback[-1]
         heapq.heappush(self._scheduled, timer)
         timer._scheduled = True
         return timer
 
-    def call_soon(self, callback, *args):
+    def call_soon(self, callback, *args, context=None):
         """Arrange for a callback to be called as soon as possible.
 
         This operates as a FIFO queue: callbacks are called in the
@@ -540,7 +541,7 @@
         if self._debug:
             self._check_thread()
             self._check_callback(callback, 'call_soon')
-        handle = self._call_soon(callback, args)
+        handle = self._call_soon(callback, args, context)
         if handle._source_traceback:
             del handle._source_traceback[-1]
         return handle
@@ -555,8 +556,8 @@
                 f'a callable object was expected by {method}(), '
                 f'got {callback!r}')
 
-    def _call_soon(self, callback, args):
-        handle = events.Handle(callback, args, self)
+    def _call_soon(self, callback, args, context):
+        handle = events.Handle(callback, args, self, context)
         if handle._source_traceback:
             del handle._source_traceback[-1]
         self._ready.append(handle)
@@ -579,12 +580,12 @@
                 "Non-thread-safe operation invoked on an event loop other "
                 "than the current one")
 
-    def call_soon_threadsafe(self, callback, *args):
+    def call_soon_threadsafe(self, callback, *args, context=None):
         """Like call_soon(), but thread-safe."""
         self._check_closed()
         if self._debug:
             self._check_callback(callback, 'call_soon_threadsafe')
-        handle = self._call_soon(callback, args)
+        handle = self._call_soon(callback, args, context)
         if handle._source_traceback:
             del handle._source_traceback[-1]
         self._write_to_self()
diff --git a/Lib/asyncio/base_futures.py b/Lib/asyncio/base_futures.py
index 008812e..5182884 100644
--- a/Lib/asyncio/base_futures.py
+++ b/Lib/asyncio/base_futures.py
@@ -41,13 +41,13 @@
         return format_helpers._format_callback_source(callback, ())
 
     if size == 1:
-        cb = format_cb(cb[0])
+        cb = format_cb(cb[0][0])
     elif size == 2:
-        cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
+        cb = '{}, {}'.format(format_cb(cb[0][0]), format_cb(cb[1][0]))
     elif size > 2:
-        cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
+        cb = '{}, <{} more>, {}'.format(format_cb(cb[0][0]),
                                         size - 2,
-                                        format_cb(cb[-1]))
+                                        format_cb(cb[-1][0]))
     return f'cb=[{cb}]'
 
 
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index d5365dc..5c68d4c 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -11,6 +11,7 @@
     '_get_running_loop',
 )
 
+import contextvars
 import os
 import socket
 import subprocess
@@ -32,9 +33,13 @@
     """Object returned by callback registration methods."""
 
     __slots__ = ('_callback', '_args', '_cancelled', '_loop',
-                 '_source_traceback', '_repr', '__weakref__')
+                 '_source_traceback', '_repr', '__weakref__',
+                 '_context')
 
-    def __init__(self, callback, args, loop):
+    def __init__(self, callback, args, loop, context=None):
+        if context is None:
+            context = contextvars.copy_context()
+        self._context = context
         self._loop = loop
         self._callback = callback
         self._args = args
@@ -80,7 +85,7 @@
 
     def _run(self):
         try:
-            self._callback(*self._args)
+            self._context.run(self._callback, *self._args)
         except Exception as exc:
             cb = format_helpers._format_callback_source(
                 self._callback, self._args)
@@ -101,9 +106,9 @@
 
     __slots__ = ['_scheduled', '_when']
 
-    def __init__(self, when, callback, args, loop):
+    def __init__(self, when, callback, args, loop, context=None):
         assert when is not None
-        super().__init__(callback, args, loop)
+        super().__init__(callback, args, loop, context)
         if self._source_traceback:
             del self._source_traceback[-1]
         self._when = when
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index 1c05b22..59621ff 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -6,6 +6,7 @@
 )
 
 import concurrent.futures
+import contextvars
 import logging
 import sys
 
@@ -144,8 +145,8 @@
             return
 
         self._callbacks[:] = []
-        for callback in callbacks:
-            self._loop.call_soon(callback, self)
+        for callback, ctx in callbacks:
+            self._loop.call_soon(callback, self, context=ctx)
 
     def cancelled(self):
         """Return True if the future was cancelled."""
@@ -192,7 +193,7 @@
         self.__log_traceback = False
         return self._exception
 
-    def add_done_callback(self, fn):
+    def add_done_callback(self, fn, *, context=None):
         """Add a callback to be run when the future becomes done.
 
         The callback is called with a single argument - the future object. If
@@ -200,9 +201,11 @@
         scheduled with call_soon.
         """
         if self._state != _PENDING:
-            self._loop.call_soon(fn, self)
+            self._loop.call_soon(fn, self, context=context)
         else:
-            self._callbacks.append(fn)
+            if context is None:
+                context = contextvars.copy_context()
+            self._callbacks.append((fn, context))
 
     # New method not in PEP 3148.
 
@@ -211,7 +214,9 @@
 
         Returns the number of callbacks removed.
         """
-        filtered_callbacks = [f for f in self._callbacks if f != fn]
+        filtered_callbacks = [(f, ctx)
+                              for (f, ctx) in self._callbacks
+                              if f != fn]
         removed_count = len(self._callbacks) - len(filtered_callbacks)
         if removed_count:
             self._callbacks[:] = filtered_callbacks
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5692e38..9446ae6 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -256,7 +256,7 @@
 
     def _add_reader(self, fd, callback, *args):
         self._check_closed()
-        handle = events.Handle(callback, args, self)
+        handle = events.Handle(callback, args, self, None)
         try:
             key = self._selector.get_key(fd)
         except KeyError:
@@ -292,7 +292,7 @@
 
     def _add_writer(self, fd, callback, *args):
         self._check_closed()
-        handle = events.Handle(callback, args, self)
+        handle = events.Handle(callback, args, self, None)
         try:
             key = self._selector.get_key(fd)
         except KeyError:
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index b118088..609b8e8 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -10,6 +10,7 @@
 )
 
 import concurrent.futures
+import contextvars
 import functools
 import inspect
 import types
@@ -96,8 +97,9 @@
         self._must_cancel = False
         self._fut_waiter = None
         self._coro = coro
+        self._context = contextvars.copy_context()
 
-        self._loop.call_soon(self._step)
+        self._loop.call_soon(self._step, context=self._context)
         _register_task(self)
 
     def __del__(self):
@@ -229,15 +231,18 @@
                     new_exc = RuntimeError(
                         f'Task {self!r} got Future '
                         f'{result!r} attached to a different loop')
-                    self._loop.call_soon(self._step, new_exc)
+                    self._loop.call_soon(
+                        self._step, new_exc, context=self._context)
                 elif blocking:
                     if result is self:
                         new_exc = RuntimeError(
                             f'Task cannot await on itself: {self!r}')
-                        self._loop.call_soon(self._step, new_exc)
+                        self._loop.call_soon(
+                            self._step, new_exc, context=self._context)
                     else:
                         result._asyncio_future_blocking = False
-                        result.add_done_callback(self._wakeup)
+                        result.add_done_callback(
+                            self._wakeup, context=self._context)
                         self._fut_waiter = result
                         if self._must_cancel:
                             if self._fut_waiter.cancel():
@@ -246,21 +251,24 @@
                     new_exc = RuntimeError(
                         f'yield was used instead of yield from '
                         f'in task {self!r} with {result!r}')
-                    self._loop.call_soon(self._step, new_exc)
+                    self._loop.call_soon(
+                        self._step, new_exc, context=self._context)
 
             elif result is None:
                 # Bare yield relinquishes control for one event loop iteration.
-                self._loop.call_soon(self._step)
+                self._loop.call_soon(self._step, context=self._context)
             elif inspect.isgenerator(result):
                 # Yielding a generator is just wrong.
                 new_exc = RuntimeError(
                     f'yield was used instead of yield from for '
                     f'generator in task {self!r} with {result}')
-                self._loop.call_soon(self._step, new_exc)
+                self._loop.call_soon(
+                    self._step, new_exc, context=self._context)
             else:
                 # Yielding something else is an error.
                 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
-                self._loop.call_soon(self._step, new_exc)
+                self._loop.call_soon(
+                    self._step, new_exc, context=self._context)
         finally:
             _leave_task(self._loop, self)
             self = None  # Needed to break cycles when an exception occurs.
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 028a0ca..9b9d004 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -92,7 +92,7 @@
         except (ValueError, OSError) as exc:
             raise RuntimeError(str(exc))
 
-        handle = events.Handle(callback, args, self)
+        handle = events.Handle(callback, args, self, None)
         self._signal_handlers[sig] = handle
 
         try: