Issue #28544: Implement asyncio.Task in C.

This implementation provides additional 10-20% speed boost for
asyncio programs.

The patch also fixes _asynciomodule.c to use Arguments Clinic, and
makes '_schedule_callbacks' an overridable method (as it was in 3.5).
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 5880061..cc9994d 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -57,7 +57,7 @@
 
 def _format_handle(handle):
     cb = handle._callback
-    if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
+    if isinstance(getattr(cb, '__self__', None), tasks.Task):
         # format the task
         return repr(cb.__self__)
     else:
diff --git a/Lib/asyncio/base_futures.py b/Lib/asyncio/base_futures.py
new file mode 100644
index 0000000..64f7845
--- /dev/null
+++ b/Lib/asyncio/base_futures.py
@@ -0,0 +1,70 @@
+__all__ = []
+
+import concurrent.futures._base
+import reprlib
+
+from . import events
+
+Error = concurrent.futures._base.Error
+CancelledError = concurrent.futures.CancelledError
+TimeoutError = concurrent.futures.TimeoutError
+
+
+class InvalidStateError(Error):
+    """The operation is not allowed in this state."""
+
+
+# States for Future.
+_PENDING = 'PENDING'
+_CANCELLED = 'CANCELLED'
+_FINISHED = 'FINISHED'
+
+
+def isfuture(obj):
+    """Check for a Future.
+
+    This returns True when obj is a Future instance or is advertising
+    itself as duck-type compatible by setting _asyncio_future_blocking.
+    See comment in Future for more details.
+    """
+    return getattr(obj, '_asyncio_future_blocking', None) is not None
+
+
+def _format_callbacks(cb):
+    """helper function for Future.__repr__"""
+    size = len(cb)
+    if not size:
+        cb = ''
+
+    def format_cb(callback):
+        return events._format_callback_source(callback, ())
+
+    if size == 1:
+        cb = format_cb(cb[0])
+    elif size == 2:
+        cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
+    elif size > 2:
+        cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
+                                        size - 2,
+                                        format_cb(cb[-1]))
+    return 'cb=[%s]' % cb
+
+
+def _future_repr_info(future):
+    # (Future) -> str
+    """helper function for Future.__repr__"""
+    info = [future._state.lower()]
+    if future._state == _FINISHED:
+        if future._exception is not None:
+            info.append('exception={!r}'.format(future._exception))
+        else:
+            # use reprlib to limit the length of the output, especially
+            # for very long strings
+            result = reprlib.repr(future._result)
+            info.append('result={}'.format(result))
+    if future._callbacks:
+        info.append(_format_callbacks(future._callbacks))
+    if future._source_traceback:
+        frame = future._source_traceback[-1]
+        info.append('created at %s:%s' % (frame[0], frame[1]))
+    return info
diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py
new file mode 100644
index 0000000..5f34434
--- /dev/null
+++ b/Lib/asyncio/base_tasks.py
@@ -0,0 +1,76 @@
+import linecache
+import traceback
+
+from . import base_futures
+from . import coroutines
+
+
+def _task_repr_info(task):
+    info = base_futures._future_repr_info(task)
+
+    if task._must_cancel:
+        # replace status
+        info[0] = 'cancelling'
+
+    coro = coroutines._format_coroutine(task._coro)
+    info.insert(1, 'coro=<%s>' % coro)
+
+    if task._fut_waiter is not None:
+        info.insert(2, 'wait_for=%r' % task._fut_waiter)
+    return info
+
+
+def _task_get_stack(task, limit):
+    frames = []
+    try:
+        # 'async def' coroutines
+        f = task._coro.cr_frame
+    except AttributeError:
+        f = task._coro.gi_frame
+    if f is not None:
+        while f is not None:
+            if limit is not None:
+                if limit <= 0:
+                    break
+                limit -= 1
+            frames.append(f)
+            f = f.f_back
+        frames.reverse()
+    elif task._exception is not None:
+        tb = task._exception.__traceback__
+        while tb is not None:
+            if limit is not None:
+                if limit <= 0:
+                    break
+                limit -= 1
+            frames.append(tb.tb_frame)
+            tb = tb.tb_next
+    return frames
+
+
+def _task_print_stack(task, limit, file):
+    extracted_list = []
+    checked = set()
+    for f in task.get_stack(limit=limit):
+        lineno = f.f_lineno
+        co = f.f_code
+        filename = co.co_filename
+        name = co.co_name
+        if filename not in checked:
+            checked.add(filename)
+            linecache.checkcache(filename)
+        line = linecache.getline(filename, lineno, f.f_globals)
+        extracted_list.append((filename, lineno, name, line))
+    exc = task._exception
+    if not extracted_list:
+        print('No stack for %r' % task, file=file)
+    elif exc is not None:
+        print('Traceback for %r (most recent call last):' % task,
+              file=file)
+    else:
+        print('Stack for %r (most recent call last):' % task,
+              file=file)
+    traceback.print_list(extracted_list, file=file)
+    if exc is not None:
+        for line in traceback.format_exception_only(exc.__class__, exc):
+            print(line, file=file, end='')
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
index 1db7030..167c1e4 100644
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -11,7 +11,7 @@
 
 from . import compat
 from . import events
-from . import futures
+from . import base_futures
 from .log import logger
 
 
@@ -204,7 +204,7 @@
         @functools.wraps(func)
         def coro(*args, **kw):
             res = func(*args, **kw)
-            if (futures.isfuture(res) or inspect.isgenerator(res) or
+            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                 isinstance(res, CoroWrapper)):
                 res = yield from res
             elif _AwaitableABC is not None:
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index b571130..d11d289 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -1,35 +1,32 @@
 """A Future class similar to the one in PEP 3148."""
 
-__all__ = ['CancelledError', 'TimeoutError',
-           'InvalidStateError',
-           'Future', 'wrap_future', 'isfuture'
-           ]
+__all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError',
+           'Future', 'wrap_future', 'isfuture']
 
-import concurrent.futures._base
+import concurrent.futures
 import logging
-import reprlib
 import sys
 import traceback
 
+from . import base_futures
 from . import compat
 from . import events
 
-# States for Future.
-_PENDING = 'PENDING'
-_CANCELLED = 'CANCELLED'
-_FINISHED = 'FINISHED'
 
-Error = concurrent.futures._base.Error
-CancelledError = concurrent.futures.CancelledError
-TimeoutError = concurrent.futures.TimeoutError
+CancelledError = base_futures.CancelledError
+InvalidStateError = base_futures.InvalidStateError
+TimeoutError = base_futures.TimeoutError
+isfuture = base_futures.isfuture
+
+
+_PENDING = base_futures._PENDING
+_CANCELLED = base_futures._CANCELLED
+_FINISHED = base_futures._FINISHED
+
 
 STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
 
 
-class InvalidStateError(Error):
-    """The operation is not allowed in this state."""
-
-
 class _TracebackLogger:
     """Helper to log a traceback upon destruction if not cleared.
 
@@ -110,56 +107,6 @@
             self.loop.call_exception_handler({'message': msg})
 
 
-def isfuture(obj):
-    """Check for a Future.
-
-    This returns True when obj is a Future instance or is advertising
-    itself as duck-type compatible by setting _asyncio_future_blocking.
-    See comment in Future for more details.
-    """
-    return getattr(obj, '_asyncio_future_blocking', None) is not None
-
-
-def _format_callbacks(cb):
-    """helper function for Future.__repr__"""
-    size = len(cb)
-    if not size:
-        cb = ''
-
-    def format_cb(callback):
-        return events._format_callback_source(callback, ())
-
-    if size == 1:
-        cb = format_cb(cb[0])
-    elif size == 2:
-        cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
-    elif size > 2:
-        cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
-                                        size-2,
-                                        format_cb(cb[-1]))
-    return 'cb=[%s]' % cb
-
-
-def _future_repr_info(future):
-    # (Future) -> str
-    """helper function for Future.__repr__"""
-    info = [future._state.lower()]
-    if future._state == _FINISHED:
-        if future._exception is not None:
-            info.append('exception={!r}'.format(future._exception))
-        else:
-            # use reprlib to limit the length of the output, especially
-            # for very long strings
-            result = reprlib.repr(future._result)
-            info.append('result={}'.format(result))
-    if future._callbacks:
-        info.append(_format_callbacks(future._callbacks))
-    if future._source_traceback:
-        frame = future._source_traceback[-1]
-        info.append('created at %s:%s' % (frame[0], frame[1]))
-    return info
-
-
 class Future:
     """This class is *almost* compatible with concurrent.futures.Future.
 
@@ -212,7 +159,7 @@
         if self._loop.get_debug():
             self._source_traceback = traceback.extract_stack(sys._getframe(1))
 
-    _repr_info = _future_repr_info
+    _repr_info = base_futures._future_repr_info
 
     def __repr__(self):
         return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info()))
@@ -247,10 +194,10 @@
         if self._state != _PENDING:
             return False
         self._state = _CANCELLED
-        self.__schedule_callbacks()
+        self._schedule_callbacks()
         return True
 
-    def __schedule_callbacks(self):
+    def _schedule_callbacks(self):
         """Internal: Ask the event loop to call all callbacks.
 
         The callbacks are scheduled to be called as soon as possible. Also
@@ -352,7 +299,7 @@
             raise InvalidStateError('{}: {!r}'.format(self._state, self))
         self._result = result
         self._state = _FINISHED
-        self.__schedule_callbacks()
+        self._schedule_callbacks()
 
     def set_exception(self, exception):
         """Mark the future done and set an exception.
@@ -369,7 +316,7 @@
                             "and cannot be raised into a Future")
         self._exception = exception
         self._state = _FINISHED
-        self.__schedule_callbacks()
+        self._schedule_callbacks()
         if compat.PY34:
             self._log_traceback = True
         else:
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 8852aa5..5a43ef2 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -9,11 +9,10 @@
 import concurrent.futures
 import functools
 import inspect
-import linecache
-import traceback
 import warnings
 import weakref
 
+from . import base_tasks
 from . import compat
 from . import coroutines
 from . import events
@@ -93,18 +92,7 @@
             futures.Future.__del__(self)
 
     def _repr_info(self):
-        info = super()._repr_info()
-
-        if self._must_cancel:
-            # replace status
-            info[0] = 'cancelling'
-
-        coro = coroutines._format_coroutine(self._coro)
-        info.insert(1, 'coro=<%s>' % coro)
-
-        if self._fut_waiter is not None:
-            info.insert(2, 'wait_for=%r' % self._fut_waiter)
-        return info
+        return base_tasks._task_repr_info(self)
 
     def get_stack(self, *, limit=None):
         """Return the list of stack frames for this task's coroutine.
@@ -127,31 +115,7 @@
         For reasons beyond our control, only one stack frame is
         returned for a suspended coroutine.
         """
-        frames = []
-        try:
-            # 'async def' coroutines
-            f = self._coro.cr_frame
-        except AttributeError:
-            f = self._coro.gi_frame
-        if f is not None:
-            while f is not None:
-                if limit is not None:
-                    if limit <= 0:
-                        break
-                    limit -= 1
-                frames.append(f)
-                f = f.f_back
-            frames.reverse()
-        elif self._exception is not None:
-            tb = self._exception.__traceback__
-            while tb is not None:
-                if limit is not None:
-                    if limit <= 0:
-                        break
-                    limit -= 1
-                frames.append(tb.tb_frame)
-                tb = tb.tb_next
-        return frames
+        return base_tasks._task_get_stack(self, limit)
 
     def print_stack(self, *, limit=None, file=None):
         """Print the stack or traceback for this task's coroutine.
@@ -162,31 +126,7 @@
         to which the output is written; by default output is written
         to sys.stderr.
         """
-        extracted_list = []
-        checked = set()
-        for f in self.get_stack(limit=limit):
-            lineno = f.f_lineno
-            co = f.f_code
-            filename = co.co_filename
-            name = co.co_name
-            if filename not in checked:
-                checked.add(filename)
-                linecache.checkcache(filename)
-            line = linecache.getline(filename, lineno, f.f_globals)
-            extracted_list.append((filename, lineno, name, line))
-        exc = self._exception
-        if not extracted_list:
-            print('No stack for %r' % self, file=file)
-        elif exc is not None:
-            print('Traceback for %r (most recent call last):' % self,
-                  file=file)
-        else:
-            print('Stack for %r (most recent call last):' % self,
-                  file=file)
-        traceback.print_list(extracted_list, file=file)
-        if exc is not None:
-            for line in traceback.format_exception_only(exc.__class__, exc):
-                print(line, file=file, end='')
+        return base_tasks._task_print_stack(self, limit, file)
 
     def cancel(self):
         """Request that this task cancel itself.
@@ -316,6 +256,18 @@
         self = None  # Needed to break cycles when an exception occurs.
 
 
+_PyTask = Task
+
+
+try:
+    import _asyncio
+except ImportError:
+    pass
+else:
+    # _CTask is needed for tests.
+    Task = _CTask = _asyncio.Task
+
+
 # wait() and as_completed() similar to those in PEP 3148.
 
 FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED