bpo-32193: Convert asyncio to async/await usage (#4753)

* Convert asyncio/tasks.py to async/await

* Convert asyncio/queues.py to async/await

* Convert asyncio/test_utils.py to async/await

* Convert asyncio/base_subprocess.py to async/await

* Convert asyncio/subprocess.py to async/await

* Convert asyncio/streams.py to async/await

* Fix comments

* Convert asyncio/locks.py to async/await

* Convert asyncio.sleep to async def

* Add a comment

* Add missing news

* Convert stubs from AbstrctEventLoop to async functions

* Convert subprocess_shell/subprocess_exec

* Convert connect_read_pipe/connect_write_pip to async/await syntax

* Convert create_datagram_endpoint

* Convert create_unix_server/create_unix_connection

* Get rid of old style coroutines in unix_events.py

* Convert selector_events.py to async/await

* Convert wait_closed and create_connection

* Drop redundant line

* Convert base_events.py

* Code cleanup

* Drop redundant comments

* Fix indentation

* Add explicit tests for compatibility between old and new coroutines

* Convert windows event loop to use async/await

* Fix double awaiting of async function

* Convert asyncio/locks.py

* Improve docstring

* Convert tests to async/await

* Convert more tests

* Convert more tests

* Convert more tests

* Convert tests

* Improve test
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index ffdb50f..ab92a0b 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -33,7 +33,6 @@
 from . import events
 from . import futures
 from . import tasks
-from .coroutines import coroutine
 from .log import logger
 
 
@@ -220,13 +219,12 @@
             if not waiter.done():
                 waiter.set_result(waiter)
 
-    @coroutine
-    def wait_closed(self):
+    async def wait_closed(self):
         if self.sockets is None or self._waiters is None:
             return
         waiter = self._loop.create_future()
         self._waiters.append(waiter)
-        yield from waiter
+        await waiter
 
 
 class BaseEventLoop(events.AbstractEventLoop):
@@ -330,10 +328,9 @@
         """Create write pipe transport."""
         raise NotImplementedError
 
-    @coroutine
-    def _make_subprocess_transport(self, protocol, args, shell,
-                                   stdin, stdout, stderr, bufsize,
-                                   extra=None, **kwargs):
+    async def _make_subprocess_transport(self, protocol, args, shell,
+                                         stdin, stdout, stderr, bufsize,
+                                         extra=None, **kwargs):
         """Create subprocess transport."""
         raise NotImplementedError
 
@@ -371,8 +368,7 @@
 
         self._asyncgens.add(agen)
 
-    @coroutine
-    def shutdown_asyncgens(self):
+    async def shutdown_asyncgens(self):
         """Shutdown all active asynchronous generators."""
         self._asyncgens_shutdown_called = True
 
@@ -384,12 +380,11 @@
         closing_agens = list(self._asyncgens)
         self._asyncgens.clear()
 
-        shutdown_coro = tasks.gather(
+        results = await tasks.gather(
             *[ag.aclose() for ag in closing_agens],
             return_exceptions=True,
             loop=self)
 
-        results = yield from shutdown_coro
         for result, agen in zip(results, closing_agens):
             if isinstance(result, Exception):
                 self.call_exception_handler({
@@ -671,10 +666,10 @@
     def getnameinfo(self, sockaddr, flags=0):
         return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
 
-    @coroutine
-    def create_connection(self, protocol_factory, host=None, port=None, *,
-                          ssl=None, family=0, proto=0, flags=0, sock=None,
-                          local_addr=None, server_hostname=None):
+    async def create_connection(self, protocol_factory, host=None, port=None,
+                                *, ssl=None, family=0,
+                                proto=0, flags=0, sock=None,
+                                local_addr=None, server_hostname=None):
         """Connect to a TCP server.
 
         Create a streaming transport connection to a given Internet host and
@@ -722,7 +717,7 @@
             else:
                 f2 = None
 
-            yield from tasks.wait(fs, loop=self)
+            await tasks.wait(fs, loop=self)
 
             infos = f1.result()
             if not infos:
@@ -755,7 +750,7 @@
                             continue
                     if self._debug:
                         logger.debug("connect %r to %r", sock, address)
-                    yield from self.sock_connect(sock, address)
+                    await self.sock_connect(sock, address)
                 except OSError as exc:
                     if sock is not None:
                         sock.close()
@@ -793,7 +788,7 @@
                 raise ValueError(
                     'A Stream Socket was expected, got {!r}'.format(sock))
 
-        transport, protocol = yield from self._create_connection_transport(
+        transport, protocol = await self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname)
         if self._debug:
             # Get the socket from the transport because SSL transport closes
@@ -803,9 +798,8 @@
                          sock, host, port, transport, protocol)
         return transport, protocol
 
-    @coroutine
-    def _create_connection_transport(self, sock, protocol_factory, ssl,
-                                     server_hostname, server_side=False):
+    async def _create_connection_transport(self, sock, protocol_factory, ssl,
+                                           server_hostname, server_side=False):
 
         sock.setblocking(False)
 
@@ -820,19 +814,18 @@
             transport = self._make_socket_transport(sock, protocol, waiter)
 
         try:
-            yield from waiter
+            await waiter
         except:
             transport.close()
             raise
 
         return transport, protocol
 
-    @coroutine
-    def create_datagram_endpoint(self, protocol_factory,
-                                 local_addr=None, remote_addr=None, *,
-                                 family=0, proto=0, flags=0,
-                                 reuse_address=None, reuse_port=None,
-                                 allow_broadcast=None, sock=None):
+    async def create_datagram_endpoint(self, protocol_factory,
+                                       local_addr=None, remote_addr=None, *,
+                                       family=0, proto=0, flags=0,
+                                       reuse_address=None, reuse_port=None,
+                                       allow_broadcast=None, sock=None):
         """Create datagram connection."""
         if sock is not None:
             if not _is_dgram_socket(sock):
@@ -872,7 +865,7 @@
                         assert isinstance(addr, tuple) and len(addr) == 2, (
                             '2-tuple is expected')
 
-                        infos = yield from _ensure_resolved(
+                        infos = await _ensure_resolved(
                             addr, family=family, type=socket.SOCK_DGRAM,
                             proto=proto, flags=flags, loop=self)
                         if not infos:
@@ -918,7 +911,7 @@
                     if local_addr:
                         sock.bind(local_address)
                     if remote_addr:
-                        yield from self.sock_connect(sock, remote_address)
+                        await self.sock_connect(sock, remote_address)
                         r_addr = remote_address
                 except OSError as exc:
                     if sock is not None:
@@ -948,32 +941,30 @@
                              remote_addr, transport, protocol)
 
         try:
-            yield from waiter
+            await waiter
         except:
             transport.close()
             raise
 
         return transport, protocol
 
-    @coroutine
-    def _create_server_getaddrinfo(self, host, port, family, flags):
-        infos = yield from _ensure_resolved((host, port), family=family,
-                                            type=socket.SOCK_STREAM,
-                                            flags=flags, loop=self)
+    async def _create_server_getaddrinfo(self, host, port, family, flags):
+        infos = await _ensure_resolved((host, port), family=family,
+                                       type=socket.SOCK_STREAM,
+                                       flags=flags, loop=self)
         if not infos:
             raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
         return infos
 
-    @coroutine
-    def create_server(self, protocol_factory, host=None, port=None,
-                      *,
-                      family=socket.AF_UNSPEC,
-                      flags=socket.AI_PASSIVE,
-                      sock=None,
-                      backlog=100,
-                      ssl=None,
-                      reuse_address=None,
-                      reuse_port=None):
+    async def create_server(self, protocol_factory, host=None, port=None,
+                            *,
+                            family=socket.AF_UNSPEC,
+                            flags=socket.AI_PASSIVE,
+                            sock=None,
+                            backlog=100,
+                            ssl=None,
+                            reuse_address=None,
+                            reuse_port=None):
         """Create a TCP server.
 
         The host parameter can be a string, in that case the TCP server is bound
@@ -1011,7 +1002,7 @@
             fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                   flags=flags)
                   for host in hosts]
-            infos = yield from tasks.gather(*fs, loop=self)
+            infos = await tasks.gather(*fs, loop=self)
             infos = set(itertools.chain.from_iterable(infos))
 
             completed = False
@@ -1068,8 +1059,8 @@
             logger.info("%r is serving", server)
         return server
 
-    @coroutine
-    def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
+    async def connect_accepted_socket(self, protocol_factory, sock,
+                                      *, ssl=None):
         """Handle an accepted connection.
 
         This is used by servers that accept connections outside of
@@ -1082,7 +1073,7 @@
             raise ValueError(
                 'A Stream Socket was expected, got {!r}'.format(sock))
 
-        transport, protocol = yield from self._create_connection_transport(
+        transport, protocol = await self._create_connection_transport(
             sock, protocol_factory, ssl, '', server_side=True)
         if self._debug:
             # Get the socket from the transport because SSL transport closes
@@ -1091,14 +1082,13 @@
             logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
         return transport, protocol
 
-    @coroutine
-    def connect_read_pipe(self, protocol_factory, pipe):
+    async def connect_read_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = self.create_future()
         transport = self._make_read_pipe_transport(pipe, protocol, waiter)
 
         try:
-            yield from waiter
+            await waiter
         except:
             transport.close()
             raise
@@ -1108,14 +1098,13 @@
                          pipe.fileno(), transport, protocol)
         return transport, protocol
 
-    @coroutine
-    def connect_write_pipe(self, protocol_factory, pipe):
+    async def connect_write_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = self.create_future()
         transport = self._make_write_pipe_transport(pipe, protocol, waiter)
 
         try:
-            yield from waiter
+            await waiter
         except:
             transport.close()
             raise
@@ -1138,11 +1127,13 @@
                 info.append('stderr=%s' % _format_pipe(stderr))
         logger.debug(' '.join(info))
 
-    @coroutine
-    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                         universal_newlines=False, shell=True, bufsize=0,
-                         **kwargs):
+    async def subprocess_shell(self, protocol_factory, cmd, *,
+                               stdin=subprocess.PIPE,
+                               stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE,
+                               universal_newlines=False,
+                               shell=True, bufsize=0,
+                               **kwargs):
         if not isinstance(cmd, (bytes, str)):
             raise ValueError("cmd must be a string")
         if universal_newlines:
@@ -1157,17 +1148,16 @@
             # (password) and may be too long
             debug_log = 'run shell command %r' % cmd
             self._log_subprocess(debug_log, stdin, stdout, stderr)
-        transport = yield from self._make_subprocess_transport(
+        transport = await self._make_subprocess_transport(
             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
         if self._debug:
             logger.info('%s: %r', debug_log, transport)
         return transport, protocol
 
-    @coroutine
-    def subprocess_exec(self, protocol_factory, program, *args,
-                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
-                        stderr=subprocess.PIPE, universal_newlines=False,
-                        shell=False, bufsize=0, **kwargs):
+    async def subprocess_exec(self, protocol_factory, program, *args,
+                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                              stderr=subprocess.PIPE, universal_newlines=False,
+                              shell=False, bufsize=0, **kwargs):
         if universal_newlines:
             raise ValueError("universal_newlines must be False")
         if shell:
@@ -1186,7 +1176,7 @@
             # (password) and may be too long
             debug_log = 'execute program %r' % program
             self._log_subprocess(debug_log, stdin, stdout, stderr)
-        transport = yield from self._make_subprocess_transport(
+        transport = await self._make_subprocess_transport(
             protocol, popen_args, False, stdin, stdout, stderr,
             bufsize, **kwargs)
         if self._debug:
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index cac8d96..7e5a901 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -4,7 +4,6 @@
 
 from . import protocols
 from . import transports
-from .coroutines import coroutine
 from .log import logger
 
 
@@ -154,26 +153,25 @@
         self._check_proc()
         self._proc.kill()
 
-    @coroutine
-    def _connect_pipes(self, waiter):
+    async def _connect_pipes(self, waiter):
         try:
             proc = self._proc
             loop = self._loop
 
             if proc.stdin is not None:
-                _, pipe = yield from loop.connect_write_pipe(
+                _, pipe = await loop.connect_write_pipe(
                     lambda: WriteSubprocessPipeProto(self, 0),
                     proc.stdin)
                 self._pipes[0] = pipe
 
             if proc.stdout is not None:
-                _, pipe = yield from loop.connect_read_pipe(
+                _, pipe = await loop.connect_read_pipe(
                     lambda: ReadSubprocessPipeProto(self, 1),
                     proc.stdout)
                 self._pipes[1] = pipe
 
             if proc.stderr is not None:
-                _, pipe = yield from loop.connect_read_pipe(
+                _, pipe = await loop.connect_read_pipe(
                     lambda: ReadSubprocessPipeProto(self, 2),
                     proc.stderr)
                 self._pipes[2] = pipe
@@ -224,8 +222,7 @@
                 waiter.set_result(returncode)
         self._exit_waiters = None
 
-    @coroutine
-    def _wait(self):
+    async def _wait(self):
         """Wait until the process exit and return the process return code.
 
         This method is a coroutine."""
@@ -234,7 +231,7 @@
 
         waiter = self._loop.create_future()
         self._exit_waiters.append(waiter)
-        return (yield from waiter)
+        return await waiter
 
     def _try_finish(self):
         assert not self._finished
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index e59d3d2..2cd6035 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -219,7 +219,7 @@
         """Stop serving.  This leaves existing connections open."""
         return NotImplemented
 
-    def wait_closed(self):
+    async def wait_closed(self):
         """Coroutine to wait until service is closed."""
         return NotImplemented
 
@@ -267,7 +267,7 @@
         """
         raise NotImplementedError
 
-    def shutdown_asyncgens(self):
+    async def shutdown_asyncgens(self):
         """Shutdown all active asynchronous generators."""
         raise NotImplementedError
 
@@ -302,7 +302,7 @@
     def call_soon_threadsafe(self, callback, *args):
         raise NotImplementedError
 
-    def run_in_executor(self, executor, func, *args):
+    async def run_in_executor(self, executor, func, *args):
         raise NotImplementedError
 
     def set_default_executor(self, executor):
@@ -310,21 +310,23 @@
 
     # Network I/O methods returning Futures.
 
-    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
+    async def getaddrinfo(self, host, port, *,
+                          family=0, type=0, proto=0, flags=0):
         raise NotImplementedError
 
-    def getnameinfo(self, sockaddr, flags=0):
+    async def getnameinfo(self, sockaddr, flags=0):
         raise NotImplementedError
 
-    def create_connection(self, protocol_factory, host=None, port=None, *,
-                          ssl=None, family=0, proto=0, flags=0, sock=None,
-                          local_addr=None, server_hostname=None):
+    async def create_connection(self, protocol_factory, host=None, port=None,
+                                *, ssl=None, family=0, proto=0,
+                                flags=0, sock=None, local_addr=None,
+                                server_hostname=None):
         raise NotImplementedError
 
-    def create_server(self, protocol_factory, host=None, port=None, *,
-                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
-                      sock=None, backlog=100, ssl=None, reuse_address=None,
-                      reuse_port=None):
+    async def create_server(self, protocol_factory, host=None, port=None,
+                            *, family=socket.AF_UNSPEC,
+                            flags=socket.AI_PASSIVE, sock=None, backlog=100,
+                            ssl=None, reuse_address=None, reuse_port=None):
         """A coroutine which creates a TCP server bound to host and port.
 
         The return value is a Server object which can be used to stop
@@ -362,13 +364,13 @@
         """
         raise NotImplementedError
 
-    def create_unix_connection(self, protocol_factory, path=None, *,
-                               ssl=None, sock=None,
-                               server_hostname=None):
+    async def create_unix_connection(self, protocol_factory, path=None, *,
+                                     ssl=None, sock=None,
+                                     server_hostname=None):
         raise NotImplementedError
 
-    def create_unix_server(self, protocol_factory, path=None, *,
-                           sock=None, backlog=100, ssl=None):
+    async def create_unix_server(self, protocol_factory, path=None, *,
+                                 sock=None, backlog=100, ssl=None):
         """A coroutine which creates a UNIX Domain Socket server.
 
         The return value is a Server object, which can be used to stop
@@ -388,11 +390,11 @@
         """
         raise NotImplementedError
 
-    def create_datagram_endpoint(self, protocol_factory,
-                                 local_addr=None, remote_addr=None, *,
-                                 family=0, proto=0, flags=0,
-                                 reuse_address=None, reuse_port=None,
-                                 allow_broadcast=None, sock=None):
+    async def create_datagram_endpoint(self, protocol_factory,
+                                       local_addr=None, remote_addr=None, *,
+                                       family=0, proto=0, flags=0,
+                                       reuse_address=None, reuse_port=None,
+                                       allow_broadcast=None, sock=None):
         """A coroutine which creates a datagram endpoint.
 
         This method will try to establish the endpoint in the background.
@@ -425,7 +427,7 @@
 
     # Pipes and subprocesses.
 
-    def connect_read_pipe(self, protocol_factory, pipe):
+    async def connect_read_pipe(self, protocol_factory, pipe):
         """Register read pipe in event loop. Set the pipe to non-blocking mode.
 
         protocol_factory should instantiate object with Protocol interface.
@@ -438,7 +440,7 @@
         # close fd in pipe transport then close f and vise versa.
         raise NotImplementedError
 
-    def connect_write_pipe(self, protocol_factory, pipe):
+    async def connect_write_pipe(self, protocol_factory, pipe):
         """Register write pipe in event loop.
 
         protocol_factory should instantiate object with BaseProtocol interface.
@@ -451,14 +453,18 @@
         # close fd in pipe transport then close f and vise versa.
         raise NotImplementedError
 
-    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                         **kwargs):
+    async def subprocess_shell(self, protocol_factory, cmd, *,
+                               stdin=subprocess.PIPE,
+                               stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE,
+                               **kwargs):
         raise NotImplementedError
 
-    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
-                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                        **kwargs):
+    async def subprocess_exec(self, protocol_factory, *args,
+                              stdin=subprocess.PIPE,
+                              stdout=subprocess.PIPE,
+                              stderr=subprocess.PIPE,
+                              **kwargs):
         raise NotImplementedError
 
     # Ready-based callback registration methods.
@@ -480,19 +486,19 @@
 
     # Completion based I/O methods returning Futures.
 
-    def sock_recv(self, sock, nbytes):
+    async def sock_recv(self, sock, nbytes):
         raise NotImplementedError
 
-    def sock_recv_into(self, sock, buf):
+    async def sock_recv_into(self, sock, buf):
         raise NotImplementedError
 
-    def sock_sendall(self, sock, data):
+    async def sock_sendall(self, sock, data):
         raise NotImplementedError
 
-    def sock_connect(self, sock, address):
+    async def sock_connect(self, sock, address):
         raise NotImplementedError
 
-    def sock_accept(self, sock):
+    async def sock_accept(self, sock):
         raise NotImplementedError
 
     # Signal handling.
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index 750c435..aa6ed3e 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -66,20 +66,21 @@
         yield from self.acquire()
         return _ContextManager(self)
 
-    def __await__(self):
-        # To make "with await lock" work.
-        yield from self.acquire()
+    async def __acquire_ctx(self):
+        await self.acquire()
         return _ContextManager(self)
 
-    @coroutine
-    def __aenter__(self):
-        yield from self.acquire()
+    def __await__(self):
+        # To make "with await lock" work.
+        return self.__acquire_ctx().__await__()
+
+    async def __aenter__(self):
+        await self.acquire()
         # We have no use for the "as ..."  clause in the with
         # statement for locks.
         return None
 
-    @coroutine
-    def __aexit__(self, exc_type, exc, tb):
+    async def __aexit__(self, exc_type, exc, tb):
         self.release()
 
 
@@ -156,8 +157,7 @@
         """Return True if lock is acquired."""
         return self._locked
 
-    @coroutine
-    def acquire(self):
+    async def acquire(self):
         """Acquire a lock.
 
         This method blocks until the lock is unlocked, then sets it to
@@ -170,7 +170,7 @@
         fut = self._loop.create_future()
         self._waiters.append(fut)
         try:
-            yield from fut
+            await fut
             self._locked = True
             return True
         except futures.CancelledError:
@@ -251,8 +251,7 @@
         to true again."""
         self._value = False
 
-    @coroutine
-    def wait(self):
+    async def wait(self):
         """Block until the internal flag is true.
 
         If the internal flag is true on entry, return True
@@ -265,7 +264,7 @@
         fut = self._loop.create_future()
         self._waiters.append(fut)
         try:
-            yield from fut
+            await fut
             return True
         finally:
             self._waiters.remove(fut)
@@ -307,8 +306,7 @@
             extra = '{},waiters:{}'.format(extra, len(self._waiters))
         return '<{} [{}]>'.format(res[1:-1], extra)
 
-    @coroutine
-    def wait(self):
+    async def wait(self):
         """Wait until notified.
 
         If the calling coroutine has not acquired the lock when this
@@ -327,7 +325,7 @@
             fut = self._loop.create_future()
             self._waiters.append(fut)
             try:
-                yield from fut
+                await fut
                 return True
             finally:
                 self._waiters.remove(fut)
@@ -336,13 +334,12 @@
             # Must reacquire lock even if wait is cancelled
             while True:
                 try:
-                    yield from self.acquire()
+                    await self.acquire()
                     break
                 except futures.CancelledError:
                     pass
 
-    @coroutine
-    def wait_for(self, predicate):
+    async def wait_for(self, predicate):
         """Wait until a predicate becomes true.
 
         The predicate should be a callable which result will be
@@ -351,7 +348,7 @@
         """
         result = predicate()
         while not result:
-            yield from self.wait()
+            await self.wait()
             result = predicate()
         return result
 
@@ -432,8 +429,7 @@
         """Returns True if semaphore can not be acquired immediately."""
         return self._value == 0
 
-    @coroutine
-    def acquire(self):
+    async def acquire(self):
         """Acquire a semaphore.
 
         If the internal counter is larger than zero on entry,
@@ -446,7 +442,7 @@
             fut = self._loop.create_future()
             self._waiters.append(fut)
             try:
-                yield from fut
+                await fut
             except:
                 # See the similar code in Queue.get.
                 fut.cancel()
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index 4fc681d..10e694f 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -7,7 +7,6 @@
 
 from . import events
 from . import locks
-from .coroutines import coroutine
 
 
 class QueueEmpty(Exception):
@@ -28,7 +27,7 @@
     """A queue, useful for coordinating producer and consumer coroutines.
 
     If maxsize is less than or equal to zero, the queue size is infinite. If it
-    is an integer greater than 0, then "yield from put()" will block when the
+    is an integer greater than 0, then "await put()" will block when the
     queue reaches maxsize, until an item is removed by get().
 
     Unlike the standard library Queue, you can reliably know this Queue's size
@@ -116,20 +115,17 @@
         else:
             return self.qsize() >= self._maxsize
 
-    @coroutine
-    def put(self, item):
+    async def put(self, item):
         """Put an item into the queue.
 
         Put an item into the queue. If the queue is full, wait until a free
         slot is available before adding item.
-
-        This method is a coroutine.
         """
         while self.full():
             putter = self._loop.create_future()
             self._putters.append(putter)
             try:
-                yield from putter
+                await putter
             except:
                 putter.cancel()  # Just in case putter is not done yet.
                 if not self.full() and not putter.cancelled():
@@ -151,19 +147,16 @@
         self._finished.clear()
         self._wakeup_next(self._getters)
 
-    @coroutine
-    def get(self):
+    async def get(self):
         """Remove and return an item from the queue.
 
         If queue is empty, wait until an item is available.
-
-        This method is a coroutine.
         """
         while self.empty():
             getter = self._loop.create_future()
             self._getters.append(getter)
             try:
-                yield from getter
+                await getter
             except:
                 getter.cancel()  # Just in case getter is not done yet.
 
@@ -210,8 +203,7 @@
         if self._unfinished_tasks == 0:
             self._finished.set()
 
-    @coroutine
-    def join(self):
+    async def join(self):
         """Block until all items in the queue have been gotten and processed.
 
         The count of unfinished tasks goes up whenever an item is added to the
@@ -220,7 +212,7 @@
         When the count of unfinished tasks drops to zero, join() unblocks.
         """
         if self._unfinished_tasks > 0:
-            yield from self._finished.wait()
+            await self._finished.wait()
 
 
 class PriorityQueue(Queue):
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 3639466..c30fde7 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -24,7 +24,6 @@
 from . import futures
 from . import transports
 from . import sslproto
-from .coroutines import coroutine
 from .log import logger
 
 
@@ -189,9 +188,8 @@
                                                   sslcontext, server)
                 self.create_task(accept)
 
-    @coroutine
-    def _accept_connection2(self, protocol_factory, conn, extra,
-                            sslcontext=None, server=None):
+    async def _accept_connection2(self, protocol_factory, conn, extra,
+                                  sslcontext=None, server=None):
         protocol = None
         transport = None
         try:
@@ -207,7 +205,7 @@
                     server=server)
 
             try:
-                yield from waiter
+                await waiter
             except:
                 transport.close()
                 raise
@@ -452,8 +450,7 @@
             fd = sock.fileno()
             self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
 
-    @coroutine
-    def sock_connect(self, sock, address):
+    async def sock_connect(self, sock, address):
         """Connect to a remote socket at address.
 
         This method is a coroutine.
@@ -465,12 +462,12 @@
             resolved = base_events._ensure_resolved(
                 address, family=sock.family, proto=sock.proto, loop=self)
             if not resolved.done():
-                yield from resolved
+                await resolved
             _, _, _, _, address = resolved.result()[0]
 
         fut = self.create_future()
         self._sock_connect(fut, sock, address)
-        return (yield from fut)
+        return await fut
 
     def _sock_connect(self, fut, sock, address):
         fd = sock.fileno()
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 15c9513..baa9ec9 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -14,8 +14,8 @@
 from . import coroutines
 from . import events
 from . import protocols
-from .coroutines import coroutine
 from .log import logger
+from .tasks import sleep
 
 
 _DEFAULT_LIMIT = 2 ** 16
@@ -52,9 +52,8 @@
         return type(self), (self.args[0], self.consumed)
 
 
-@coroutine
-def open_connection(host=None, port=None, *,
-                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
+async def open_connection(host=None, port=None, *,
+                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """A wrapper for create_connection() returning a (reader, writer) pair.
 
     The reader returned is a StreamReader instance; the writer is a
@@ -76,15 +75,14 @@
         loop = events.get_event_loop()
     reader = StreamReader(limit=limit, loop=loop)
     protocol = StreamReaderProtocol(reader, loop=loop)
-    transport, _ = yield from loop.create_connection(
+    transport, _ = await loop.create_connection(
         lambda: protocol, host, port, **kwds)
     writer = StreamWriter(transport, protocol, reader, loop)
     return reader, writer
 
 
-@coroutine
-def start_server(client_connected_cb, host=None, port=None, *,
-                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
+async def start_server(client_connected_cb, host=None, port=None, *,
+                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """Start a socket server, call back for each client connected.
 
     The first parameter, `client_connected_cb`, takes two parameters:
@@ -115,28 +113,26 @@
                                         loop=loop)
         return protocol
 
-    return (yield from loop.create_server(factory, host, port, **kwds))
+    return await loop.create_server(factory, host, port, **kwds)
 
 
 if hasattr(socket, 'AF_UNIX'):
     # UNIX Domain Sockets are supported on this platform
 
-    @coroutine
-    def open_unix_connection(path=None, *,
-                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
+    async def open_unix_connection(path=None, *,
+                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `open_connection` but works with UNIX Domain Sockets."""
         if loop is None:
             loop = events.get_event_loop()
         reader = StreamReader(limit=limit, loop=loop)
         protocol = StreamReaderProtocol(reader, loop=loop)
-        transport, _ = yield from loop.create_unix_connection(
+        transport, _ = await loop.create_unix_connection(
             lambda: protocol, path, **kwds)
         writer = StreamWriter(transport, protocol, reader, loop)
         return reader, writer
 
-    @coroutine
-    def start_unix_server(client_connected_cb, path=None, *,
-                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
+    async def start_unix_server(client_connected_cb, path=None, *,
+                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `start_server` but works with UNIX Domain Sockets."""
         if loop is None:
             loop = events.get_event_loop()
@@ -147,7 +143,7 @@
                                             loop=loop)
             return protocol
 
-        return (yield from loop.create_unix_server(factory, path, **kwds))
+        return await loop.create_unix_server(factory, path, **kwds)
 
 
 class FlowControlMixin(protocols.Protocol):
@@ -203,8 +199,7 @@
         else:
             waiter.set_exception(exc)
 
-    @coroutine
-    def _drain_helper(self):
+    async def _drain_helper(self):
         if self._connection_lost:
             raise ConnectionResetError('Connection lost')
         if not self._paused:
@@ -213,7 +208,7 @@
         assert waiter is None or waiter.cancelled()
         waiter = self._loop.create_future()
         self._drain_waiter = waiter
-        yield from waiter
+        await waiter
 
 
 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
@@ -313,14 +308,13 @@
     def get_extra_info(self, name, default=None):
         return self._transport.get_extra_info(name, default)
 
-    @coroutine
-    def drain(self):
+    async def drain(self):
         """Flush the write buffer.
 
         The intended use is to write
 
           w.write(data)
-          yield from w.drain()
+          await w.drain()
         """
         if self._reader is not None:
             exc = self._reader.exception()
@@ -331,11 +325,11 @@
                 # Yield to the event loop so connection_lost() may be
                 # called.  Without this, _drain_helper() would return
                 # immediately, and code that calls
-                #     write(...); yield from drain()
+                #     write(...); await drain()
                 # in a loop would never call connection_lost(), so it
                 # would not see an error when the socket is closed.
-                yield
-        yield from self._protocol._drain_helper()
+                await sleep(0, loop=self._loop)
+        await self._protocol._drain_helper()
 
 
 class StreamReader:
@@ -436,8 +430,7 @@
             else:
                 self._paused = True
 
-    @coroutine
-    def _wait_for_data(self, func_name):
+    async def _wait_for_data(self, func_name):
         """Wait until feed_data() or feed_eof() is called.
 
         If stream was paused, automatically resume it.
@@ -460,12 +453,11 @@
 
         self._waiter = self._loop.create_future()
         try:
-            yield from self._waiter
+            await self._waiter
         finally:
             self._waiter = None
 
-    @coroutine
-    def readline(self):
+    async def readline(self):
         """Read chunk of data from the stream until newline (b'\n') is found.
 
         On success, return chunk that ends with newline. If only partial
@@ -484,7 +476,7 @@
         sep = b'\n'
         seplen = len(sep)
         try:
-            line = yield from self.readuntil(sep)
+            line = await self.readuntil(sep)
         except IncompleteReadError as e:
             return e.partial
         except LimitOverrunError as e:
@@ -496,8 +488,7 @@
             raise ValueError(e.args[0])
         return line
 
-    @coroutine
-    def readuntil(self, separator=b'\n'):
+    async def readuntil(self, separator=b'\n'):
         """Read data from the stream until ``separator`` is found.
 
         On success, the data and separator will be removed from the
@@ -577,7 +568,7 @@
                 raise IncompleteReadError(chunk, None)
 
             # _wait_for_data() will resume reading if stream was paused.
-            yield from self._wait_for_data('readuntil')
+            await self._wait_for_data('readuntil')
 
         if isep > self._limit:
             raise LimitOverrunError(
@@ -588,8 +579,7 @@
         self._maybe_resume_transport()
         return bytes(chunk)
 
-    @coroutine
-    def read(self, n=-1):
+    async def read(self, n=-1):
         """Read up to `n` bytes from the stream.
 
         If n is not provided, or set to -1, read until EOF and return all read
@@ -623,14 +613,14 @@
             # bytes.  So just call self.read(self._limit) until EOF.
             blocks = []
             while True:
-                block = yield from self.read(self._limit)
+                block = await self.read(self._limit)
                 if not block:
                     break
                 blocks.append(block)
             return b''.join(blocks)
 
         if not self._buffer and not self._eof:
-            yield from self._wait_for_data('read')
+            await self._wait_for_data('read')
 
         # This will work right even if buffer is less than n bytes
         data = bytes(self._buffer[:n])
@@ -639,8 +629,7 @@
         self._maybe_resume_transport()
         return data
 
-    @coroutine
-    def readexactly(self, n):
+    async def readexactly(self, n):
         """Read exactly `n` bytes.
 
         Raise an IncompleteReadError if EOF is reached before `n` bytes can be
@@ -670,7 +659,7 @@
                 self._buffer.clear()
                 raise IncompleteReadError(incomplete, n)
 
-            yield from self._wait_for_data('readexactly')
+            await self._wait_for_data('readexactly')
 
         if len(self._buffer) == n:
             data = bytes(self._buffer)
@@ -684,9 +673,8 @@
     def __aiter__(self):
         return self
 
-    @coroutine
-    def __anext__(self):
-        val = yield from self.readline()
+    async def __anext__(self):
+        val = await self.readline()
         if val == b'':
             raise StopAsyncIteration
         return val
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index 4c85466..dd3d10c 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -6,7 +6,6 @@
 from . import protocols
 from . import streams
 from . import tasks
-from .coroutines import coroutine
 from .log import logger
 
 
@@ -121,12 +120,9 @@
     def returncode(self):
         return self._transport.get_returncode()
 
-    @coroutine
-    def wait(self):
-        """Wait until the process exit and return the process return code.
-
-        This method is a coroutine."""
-        return (yield from self._transport._wait())
+    async def wait(self):
+        """Wait until the process exit and return the process return code."""
+        return await self._transport._wait()
 
     def send_signal(self, signal):
         self._transport.send_signal(signal)
@@ -137,15 +133,14 @@
     def kill(self):
         self._transport.kill()
 
-    @coroutine
-    def _feed_stdin(self, input):
+    async def _feed_stdin(self, input):
         debug = self._loop.get_debug()
         self.stdin.write(input)
         if debug:
             logger.debug('%r communicate: feed stdin (%s bytes)',
                         self, len(input))
         try:
-            yield from self.stdin.drain()
+            await self.stdin.drain()
         except (BrokenPipeError, ConnectionResetError) as exc:
             # communicate() ignores BrokenPipeError and ConnectionResetError
             if debug:
@@ -155,12 +150,10 @@
             logger.debug('%r communicate: close stdin', self)
         self.stdin.close()
 
-    @coroutine
-    def _noop(self):
+    async def _noop(self):
         return None
 
-    @coroutine
-    def _read_stream(self, fd):
+    async def _read_stream(self, fd):
         transport = self._transport.get_pipe_transport(fd)
         if fd == 2:
             stream = self.stderr
@@ -170,15 +163,14 @@
         if self._loop.get_debug():
             name = 'stdout' if fd == 1 else 'stderr'
             logger.debug('%r communicate: read %s', self, name)
-        output = yield from stream.read()
+        output = await stream.read()
         if self._loop.get_debug():
             name = 'stdout' if fd == 1 else 'stderr'
             logger.debug('%r communicate: close %s', self, name)
         transport.close()
         return output
 
-    @coroutine
-    def communicate(self, input=None):
+    async def communicate(self, input=None):
         if input is not None:
             stdin = self._feed_stdin(input)
         else:
@@ -191,36 +183,36 @@
             stderr = self._read_stream(2)
         else:
             stderr = self._noop()
-        stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
-                                                        loop=self._loop)
-        yield from self.wait()
+        stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
+                                                   loop=self._loop)
+        await self.wait()
         return (stdout, stderr)
 
 
-@coroutine
-def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
-                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
+                                  loop=None, limit=streams._DEFAULT_LIMIT,
+                                  **kwds):
     if loop is None:
         loop = events.get_event_loop()
     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
                                                         loop=loop)
-    transport, protocol = yield from loop.subprocess_shell(
-                                            protocol_factory,
-                                            cmd, stdin=stdin, stdout=stdout,
-                                            stderr=stderr, **kwds)
+    transport, protocol = await loop.subprocess_shell(
+        protocol_factory,
+        cmd, stdin=stdin, stdout=stdout,
+        stderr=stderr, **kwds)
     return Process(transport, protocol, loop)
 
-@coroutine
-def create_subprocess_exec(program, *args, stdin=None, stdout=None,
-                           stderr=None, loop=None,
-                           limit=streams._DEFAULT_LIMIT, **kwds):
+
+async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
+                                 stderr=None, loop=None,
+                                 limit=streams._DEFAULT_LIMIT, **kwds):
     if loop is None:
         loop = events.get_event_loop()
     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
                                                         loop=loop)
-    transport, protocol = yield from loop.subprocess_exec(
-                                            protocol_factory,
-                                            program, *args,
-                                            stdin=stdin, stdout=stdout,
-                                            stderr=stderr, **kwds)
+    transport, protocol = await loop.subprocess_exec(
+        protocol_factory,
+        program, *args,
+        stdin=stdin, stdout=stdout,
+        stderr=stderr, **kwds)
     return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 5d744c3..c23d06a 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -9,6 +9,7 @@
 import concurrent.futures
 import functools
 import inspect
+import types
 import warnings
 import weakref
 
@@ -276,8 +277,7 @@
 ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
 
 
-@coroutine
-def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
+async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
     """Wait for the Futures and coroutines given by fs to complete.
 
     The sequence futures must not be empty.
@@ -288,7 +288,7 @@
 
     Usage:
 
-        done, pending = yield from asyncio.wait(fs)
+        done, pending = await asyncio.wait(fs)
 
     Note: This does not raise TimeoutError! Futures that aren't done
     when the timeout occurs are returned in the second set.
@@ -305,7 +305,7 @@
 
     fs = {ensure_future(f, loop=loop) for f in set(fs)}
 
-    return (yield from _wait(fs, timeout, return_when, loop))
+    return await _wait(fs, timeout, return_when, loop)
 
 
 def _release_waiter(waiter, *args):
@@ -313,8 +313,7 @@
         waiter.set_result(None)
 
 
-@coroutine
-def wait_for(fut, timeout, *, loop=None):
+async def wait_for(fut, timeout, *, loop=None):
     """Wait for the single Future or coroutine to complete, with timeout.
 
     Coroutine will be wrapped in Task.
@@ -331,7 +330,7 @@
         loop = events.get_event_loop()
 
     if timeout is None:
-        return (yield from fut)
+        return await fut
 
     if timeout <= 0:
         fut = ensure_future(fut, loop=loop)
@@ -352,7 +351,7 @@
     try:
         # wait until the future completes or the timeout
         try:
-            yield from waiter
+            await waiter
         except futures.CancelledError:
             fut.remove_done_callback(cb)
             fut.cancel()
@@ -368,8 +367,7 @@
         timeout_handle.cancel()
 
 
-@coroutine
-def _wait(fs, timeout, return_when, loop):
+async def _wait(fs, timeout, return_when, loop):
     """Internal helper for wait() and wait_for().
 
     The fs argument must be a collection of Futures.
@@ -397,7 +395,7 @@
         f.add_done_callback(_on_completion)
 
     try:
-        yield from waiter
+        await waiter
     finally:
         if timeout_handle is not None:
             timeout_handle.cancel()
@@ -423,10 +421,10 @@
     This differs from PEP 3148; the proper way to use this is:
 
         for f in as_completed(fs):
-            result = yield from f  # The 'yield from' may raise.
+            result = await f  # The 'await' may raise.
             # Use result.
 
-    If a timeout is specified, the 'yield from' will raise
+    If a timeout is specified, the 'await' will raise
     TimeoutError when the timeout occurs before all Futures are done.
 
     Note: The futures 'f' are not necessarily members of fs.
@@ -453,9 +451,8 @@
         if not todo and timeout_handle is not None:
             timeout_handle.cancel()
 
-    @coroutine
-    def _wait_for_one():
-        f = yield from done.get()
+    async def _wait_for_one():
+        f = await done.get()
         if f is None:
             # Dummy value from _on_timeout().
             raise futures.TimeoutError
@@ -469,11 +466,22 @@
         yield _wait_for_one()
 
 
-@coroutine
-def sleep(delay, result=None, *, loop=None):
+@types.coroutine
+def __sleep0():
+    """Skip one event loop run cycle.
+
+    This is a private helper for 'asyncio.sleep()', used
+    when the 'delay' is set to 0.  It uses a bare 'yield'
+    expression (which Task._step knows how to handle)
+    instead of creating a Future object.
+    """
+    yield
+
+
+async def sleep(delay, result=None, *, loop=None):
     """Coroutine that completes after a given time (in seconds)."""
     if delay == 0:
-        yield
+        await __sleep0()
         return result
 
     if loop is None:
@@ -483,7 +491,7 @@
                                 futures._set_result_unless_cancelled,
                                 future, result)
     try:
-        return (yield from future)
+        return await future
     finally:
         h.cancel()
 
@@ -652,11 +660,11 @@
 
     The statement
 
-        res = yield from shield(something())
+        res = await shield(something())
 
     is exactly equivalent to the statement
 
-        res = yield from something()
+        res = await something()
 
     *except* that if the coroutine containing it is cancelled, the
     task running in something() is not cancelled.  From the POV of
@@ -669,7 +677,7 @@
     you can combine shield() with a try/except clause, as follows:
 
         try:
-            res = yield from shield(something())
+            res = await shield(something())
         except CancelledError:
             res = None
     """
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
index 32d3b0b..2319169 100644
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -30,7 +30,6 @@
 from . import events
 from . import futures
 from . import tasks
-from .coroutines import coroutine
 from .log import logger
 from test import support
 
@@ -43,8 +42,7 @@
 
 
 def run_briefly(loop):
-    @coroutine
-    def once():
+    async def once():
         pass
     gen = once()
     t = loop.create_task(gen)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index ab818da..0308b02 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -20,7 +20,6 @@
 from . import futures
 from . import selector_events
 from . import transports
-from .coroutines import coroutine
 from .log import logger
 
 
@@ -168,10 +167,9 @@
                                    extra=None):
         return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
 
-    @coroutine
-    def _make_subprocess_transport(self, protocol, args, shell,
-                                   stdin, stdout, stderr, bufsize,
-                                   extra=None, **kwargs):
+    async def _make_subprocess_transport(self, protocol, args, shell,
+                                         stdin, stdout, stderr, bufsize,
+                                         extra=None, **kwargs):
         with events.get_child_watcher() as watcher:
             waiter = self.create_future()
             transp = _UnixSubprocessTransport(self, protocol, args, shell,
@@ -182,29 +180,20 @@
             watcher.add_child_handler(transp.get_pid(),
                                       self._child_watcher_callback, transp)
             try:
-                yield from waiter
-            except Exception as exc:
-                # Workaround CPython bug #23353: using yield/yield-from in an
-                # except block of a generator doesn't clear properly
-                # sys.exc_info()
-                err = exc
-            else:
-                err = None
-
-            if err is not None:
+                await waiter
+            except Exception:
                 transp.close()
-                yield from transp._wait()
-                raise err
+                await transp._wait()
+                raise
 
         return transp
 
     def _child_watcher_callback(self, pid, returncode, transp):
         self.call_soon_threadsafe(transp._process_exited, returncode)
 
-    @coroutine
-    def create_unix_connection(self, protocol_factory, path=None, *,
-                               ssl=None, sock=None,
-                               server_hostname=None):
+    async def create_unix_connection(self, protocol_factory, path=None, *,
+                                     ssl=None, sock=None,
+                                     server_hostname=None):
         assert server_hostname is None or isinstance(server_hostname, str)
         if ssl:
             if server_hostname is None:
@@ -223,7 +212,7 @@
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
             try:
                 sock.setblocking(False)
-                yield from self.sock_connect(sock, path)
+                await self.sock_connect(sock, path)
             except:
                 sock.close()
                 raise
@@ -238,13 +227,12 @@
                     .format(sock))
             sock.setblocking(False)
 
-        transport, protocol = yield from self._create_connection_transport(
+        transport, protocol = await self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname)
         return transport, protocol
 
-    @coroutine
-    def create_unix_server(self, protocol_factory, path=None, *,
-                           sock=None, backlog=100, ssl=None):
+    async def create_unix_server(self, protocol_factory, path=None, *,
+                                 sock=None, backlog=100, ssl=None):
         if isinstance(ssl, bool):
             raise TypeError('ssl argument must be an SSLContext or None')
 
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index de41e64..95b12a1 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -15,7 +15,6 @@
 from . import selector_events
 from . import tasks
 from . import windows_utils
-from .coroutines import coroutine
 from .log import logger
 
 
@@ -305,17 +304,15 @@
             proactor = IocpProactor()
         super().__init__(proactor)
 
-    @coroutine
-    def create_pipe_connection(self, protocol_factory, address):
+    async def create_pipe_connection(self, protocol_factory, address):
         f = self._proactor.connect_pipe(address)
-        pipe = yield from f
+        pipe = await f
         protocol = protocol_factory()
         trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                  extra={'addr': address})
         return trans, protocol
 
-    @coroutine
-    def start_serving_pipe(self, protocol_factory, address):
+    async def start_serving_pipe(self, protocol_factory, address):
         server = PipeServer(address)
 
         def loop_accept_pipe(f=None):
@@ -361,28 +358,20 @@
         self.call_soon(loop_accept_pipe)
         return [server]
 
-    @coroutine
-    def _make_subprocess_transport(self, protocol, args, shell,
-                                   stdin, stdout, stderr, bufsize,
-                                   extra=None, **kwargs):
+    async def _make_subprocess_transport(self, protocol, args, shell,
+                                         stdin, stdout, stderr, bufsize,
+                                         extra=None, **kwargs):
         waiter = self.create_future()
         transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)
         try:
-            yield from waiter
-        except Exception as exc:
-            # Workaround CPython bug #23353: using yield/yield-from in an
-            # except block of a generator doesn't clear properly sys.exc_info()
-            err = exc
-        else:
-            err = None
-
-        if err is not None:
+            await waiter
+        except Exception:
             transp.close()
-            yield from transp._wait()
-            raise err
+            await transp._wait()
+            raise
 
         return transp
 
@@ -498,11 +487,10 @@
             conn.settimeout(listener.gettimeout())
             return conn, conn.getpeername()
 
-        @coroutine
-        def accept_coro(future, conn):
+        async def accept_coro(future, conn):
             # Coroutine closing the accept socket if the future is cancelled
             try:
-                yield from future
+                await future
             except futures.CancelledError:
                 conn.close()
                 raise
@@ -552,8 +540,7 @@
 
         return self._register(ov, pipe, finish_accept_pipe)
 
-    @coroutine
-    def connect_pipe(self, address):
+    async def connect_pipe(self, address):
         delay = CONNECT_PIPE_INIT_DELAY
         while True:
             # Unfortunately there is no way to do an overlapped connect to a pipe.
@@ -568,7 +555,7 @@
 
             # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
             delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
-            yield from tasks.sleep(delay, loop=self._loop)
+            await tasks.sleep(delay, loop=self._loop)
 
         return windows_utils.PipeHandle(handle)