feat: third batch of AsyncIO integration (#29)

* LRO client
* gRPC wrappers & helpers
* With unit tests & docs
diff --git a/google/api_core/gapic_v1/__init__.py b/google/api_core/gapic_v1/__init__.py
index e47d2cb..ed95da1 100644
--- a/google/api_core/gapic_v1/__init__.py
+++ b/google/api_core/gapic_v1/__init__.py
@@ -23,4 +23,6 @@
 
 if sys.version_info >= (3, 6):
     from google.api_core.gapic_v1 import config_async  # noqa: F401
+    from google.api_core.gapic_v1 import method_async  # noqa: F401
     __all__.append("config_async")
+    __all__.append("method_async")
diff --git a/google/api_core/gapic_v1/method_async.py b/google/api_core/gapic_v1/method_async.py
new file mode 100644
index 0000000..5210b2b
--- /dev/null
+++ b/google/api_core/gapic_v1/method_async.py
@@ -0,0 +1,45 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""AsyncIO helpers for wrapping gRPC methods with common functionality.
+
+This is used by gapic clients to provide common error mapping, retry, timeout,
+pagination, and long-running operations to gRPC methods.
+"""
+
+from google.api_core import general_helpers, grpc_helpers_async
+from google.api_core.gapic_v1 import client_info
+from google.api_core.gapic_v1.method import (_GapicCallable,  # noqa: F401
+                                             DEFAULT,
+                                             USE_DEFAULT_METADATA)
+
+
+def wrap_method(
+        func,
+        default_retry=None,
+        default_timeout=None,
+        client_info=client_info.DEFAULT_CLIENT_INFO,
+):
+    """Wrap an async RPC method with common behavior.
+
+    Returns:
+        Callable: A new callable that takes optional ``retry`` and ``timeout``
+            arguments and applies the common error mapping, retry, timeout,
+            and metadata behavior to the low-level RPC method.
+    """
+    func = grpc_helpers_async.wrap_errors(func)
+
+    metadata = [client_info.to_grpc_metadata()] if client_info is not None else None
+
+    return general_helpers.wraps(func)(_GapicCallable(
+        func, default_retry, default_timeout, metadata=metadata))
diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py
index c47b09f..fde6c33 100644
--- a/google/api_core/grpc_helpers.py
+++ b/google/api_core/grpc_helpers.py
@@ -170,13 +170,10 @@
         return _wrap_unary_errors(callable_)
 
 
-def create_channel(
-    target, credentials=None, scopes=None, ssl_credentials=None, **kwargs
-):
-    """Create a secure channel with credentials.
+def _create_composite_credentials(credentials=None, scopes=None, ssl_credentials=None):
+    """Create the composite credentials for secure channels.
 
     Args:
-        target (str): The target service address in the format 'hostname:port'.
         credentials (google.auth.credentials.Credentials): The credentials. If
             not specified, then this function will attempt to ascertain the
             credentials from the environment using :func:`google.auth.default`.
@@ -185,11 +182,9 @@
             are passed to :func:`google.auth.default`.
         ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
             credentials. This can be used to specify different certificates.
-        kwargs: Additional key-word args passed to
-            :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
 
     Returns:
-        grpc.Channel: The created channel.
+        grpc.ChannelCredentials: The composed channel credentials object.
     """
     if credentials is None:
         credentials, _ = google.auth.default(scopes=scopes)
@@ -212,10 +207,34 @@
         ssl_credentials = grpc.ssl_channel_credentials()
 
     # Combine the ssl credentials and the authorization credentials.
-    composite_credentials = grpc.composite_channel_credentials(
+    return grpc.composite_channel_credentials(
         ssl_credentials, google_auth_credentials
     )
 
+
+def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, **kwargs):
+    """Create a secure channel with credentials.
+
+    Args:
+        target (str): The target service address in the format 'hostname:port'.
+        credentials (google.auth.credentials.Credentials): The credentials. If
+            not specified, then this function will attempt to ascertain the
+            credentials from the environment using :func:`google.auth.default`.
+        scopes (Sequence[str]): A optional list of scopes needed for this
+            service. These are only used when credentials are not specified and
+            are passed to :func:`google.auth.default`.
+        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
+            credentials. This can be used to specify different certificates.
+        kwargs: Additional key-word args passed to
+            :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
+
+    Returns:
+        grpc.Channel: The created channel.
+    """
+    composite_credentials = _create_composite_credentials(
+        credentials, scopes, ssl_credentials
+    )
+
     if HAS_GRPC_GCP:
         # If grpc_gcp module is available use grpc_gcp.secure_channel,
         # otherwise, use grpc.secure_channel to create grpc channel.
diff --git a/google/api_core/grpc_helpers_async.py b/google/api_core/grpc_helpers_async.py
new file mode 100644
index 0000000..9ded803
--- /dev/null
+++ b/google/api_core/grpc_helpers_async.py
@@ -0,0 +1,270 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""AsyncIO helpers for :mod:`grpc` supporting 3.6+.
+
+Please combine more detailed docstring in grpc_helpers.py to use following
+functions. This module is implementing the same surface with AsyncIO semantics.
+"""
+
+import asyncio
+import functools
+
+import grpc
+from grpc.experimental import aio
+
+from google.api_core import exceptions, grpc_helpers
+
+
+# TODO(lidiz) Support gRPC GCP wrapper
+HAS_GRPC_GCP = False
+
+# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform
+# automatic patching for us. But that means the overhead of creating an
+# extra Python function spreads to every single send and receive.
+
+
+class _WrappedCall(aio.Call):
+
+    def __init__(self):
+        self._call = None
+
+    def with_call(self, call):
+        """Supplies the call object separately to keep __init__ clean."""
+        self._call = call
+        return self
+
+    async def initial_metadata(self):
+        return await self._call.initial_metadata()
+
+    async def trailing_metadata(self):
+        return await self._call.trailing_metadata()
+
+    async def code(self):
+        return await self._call.code()
+
+    async def details(self):
+        return await self._call.details()
+
+    def cancelled(self):
+        return self._call.cancelled()
+
+    def done(self):
+        return self._call.done()
+
+    def time_remaining(self):
+        return self._call.time_remaining()
+
+    def cancel(self):
+        return self._call.cancel()
+
+    def add_done_callback(self, callback):
+        self._call.add_done_callback(callback)
+
+    async def wait_for_connection(self):
+        try:
+            await self._call.wait_for_connection()
+        except grpc.RpcError as rpc_error:
+            raise exceptions.from_grpc_error(rpc_error) from rpc_error
+
+
+class _WrappedUnaryResponseMixin(_WrappedCall):
+
+    def __await__(self):
+        try:
+            response = yield from self._call.__await__()
+            return response
+        except grpc.RpcError as rpc_error:
+            raise exceptions.from_grpc_error(rpc_error) from rpc_error
+
+
+class _WrappedStreamResponseMixin(_WrappedCall):
+
+    def __init__(self):
+        self._wrapped_async_generator = None
+
+    async def read(self):
+        try:
+            return await self._call.read()
+        except grpc.RpcError as rpc_error:
+            raise exceptions.from_grpc_error(rpc_error) from rpc_error
+
+    async def _wrapped_aiter(self):
+        try:
+            # NOTE(lidiz) coverage doesn't understand the exception raised from
+            # __anext__ method. It is covered by test case:
+            #     test_wrap_stream_errors_aiter_non_rpc_error
+            async for response in self._call:  # pragma: no branch
+                yield response
+        except grpc.RpcError as rpc_error:
+            raise exceptions.from_grpc_error(rpc_error) from rpc_error
+
+    def __aiter__(self):
+        if not self._wrapped_async_generator:
+            self._wrapped_async_generator = self._wrapped_aiter()
+        return self._wrapped_async_generator
+
+
+class _WrappedStreamRequestMixin(_WrappedCall):
+
+    async def write(self, request):
+        try:
+            await self._call.write(request)
+        except grpc.RpcError as rpc_error:
+            raise exceptions.from_grpc_error(rpc_error) from rpc_error
+
+    async def done_writing(self):
+        try:
+            await self._call.done_writing()
+        except grpc.RpcError as rpc_error:
+            raise exceptions.from_grpc_error(rpc_error) from rpc_error
+
+
+# NOTE(lidiz) Implementing each individual class separately, so we don't
+# expose any API that should not be seen. E.g., __aiter__ in unary-unary
+# RPC, or __await__ in stream-stream RPC.
+class _WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin, aio.UnaryUnaryCall):
+    """Wrapped UnaryUnaryCall to map exceptions."""
+
+
+class _WrappedUnaryStreamCall(_WrappedStreamResponseMixin, aio.UnaryStreamCall):
+    """Wrapped UnaryStreamCall to map exceptions."""
+
+
+class _WrappedStreamUnaryCall(_WrappedUnaryResponseMixin, _WrappedStreamRequestMixin, aio.StreamUnaryCall):
+    """Wrapped StreamUnaryCall to map exceptions."""
+
+
+class _WrappedStreamStreamCall(_WrappedStreamRequestMixin, _WrappedStreamResponseMixin, aio.StreamStreamCall):
+    """Wrapped StreamStreamCall to map exceptions."""
+
+
+def _wrap_unary_errors(callable_):
+    """Map errors for Unary-Unary async callables."""
+    grpc_helpers._patch_callable_name(callable_)
+
+    @functools.wraps(callable_)
+    def error_remapped_callable(*args, **kwargs):
+        call = callable_(*args, **kwargs)
+        return _WrappedUnaryUnaryCall().with_call(call)
+
+    return error_remapped_callable
+
+
+def _wrap_stream_errors(callable_):
+    """Map errors for streaming RPC async callables."""
+    grpc_helpers._patch_callable_name(callable_)
+
+    @functools.wraps(callable_)
+    async def error_remapped_callable(*args, **kwargs):
+        call = callable_(*args, **kwargs)
+
+        if isinstance(call, aio.UnaryStreamCall):
+            call = _WrappedUnaryStreamCall().with_call(call)
+        elif isinstance(call, aio.StreamUnaryCall):
+            call = _WrappedStreamUnaryCall().with_call(call)
+        elif isinstance(call, aio.StreamStreamCall):
+            call = _WrappedStreamStreamCall().with_call(call)
+        else:
+            raise TypeError('Unexpected type of call %s' % type(call))
+
+        await call.wait_for_connection()
+        return call
+
+    return error_remapped_callable
+
+
+def wrap_errors(callable_):
+    """Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to
+    friendly error classes.
+
+    Errors raised by the gRPC callable are mapped to the appropriate
+    :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The
+    original `grpc.RpcError` (which is usually also a `grpc.Call`) is
+    available from the ``response`` property on the mapped exception. This
+    is useful for extracting metadata from the original error.
+
+    Args:
+        callable_ (Callable): A gRPC callable.
+
+    Returns: Callable: The wrapped gRPC callable.
+    """
+    if isinstance(callable_, aio.UnaryUnaryMultiCallable):
+        return _wrap_unary_errors(callable_)
+    else:
+        return _wrap_stream_errors(callable_)
+
+
+def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, **kwargs):
+    """Create an AsyncIO secure channel with credentials.
+
+    Args:
+        target (str): The target service address in the format 'hostname:port'.
+        credentials (google.auth.credentials.Credentials): The credentials. If
+            not specified, then this function will attempt to ascertain the
+            credentials from the environment using :func:`google.auth.default`.
+        scopes (Sequence[str]): A optional list of scopes needed for this
+            service. These are only used when credentials are not specified and
+            are passed to :func:`google.auth.default`.
+        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
+            credentials. This can be used to specify different certificates.
+        kwargs: Additional key-word args passed to :func:`aio.secure_channel`.
+
+    Returns:
+        aio.Channel: The created channel.
+    """
+    composite_credentials = grpc_helpers._create_composite_credentials(
+        credentials, scopes, ssl_credentials
+    )
+
+    return aio.secure_channel(target, composite_credentials, **kwargs)
+
+
+class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall):
+    """Fake implementation for unary-unary RPCs.
+
+    It is a dummy object for response message. Supply the intended response
+    upon the initialization, and the coroutine will return the exact response
+    message.
+    """
+
+    def __init__(self, response=object()):
+        self.response = response
+        self._future = asyncio.get_event_loop().create_future()
+        self._future.set_result(self.response)
+
+    def __await__(self):
+        response = yield from self._future.__await__()
+        return response
+
+
+class FakeStreamUnaryCall(_WrappedStreamUnaryCall):
+    """Fake implementation for stream-unary RPCs.
+
+    It is a dummy object for response message. Supply the intended response
+    upon the initialization, and the coroutine will return the exact response
+    message.
+    """
+
+    def __init__(self, response=object()):
+        self.response = response
+        self._future = asyncio.get_event_loop().create_future()
+        self._future.set_result(self.response)
+
+    def __await__(self):
+        response = yield from self._future.__await__()
+        return response
+
+    async def wait_for_connection(self):
+        pass
diff --git a/google/api_core/operations_v1/__init__.py b/google/api_core/operations_v1/__init__.py
index f054956..bc9befc 100644
--- a/google/api_core/operations_v1/__init__.py
+++ b/google/api_core/operations_v1/__init__.py
@@ -14,6 +14,11 @@
 
 """Package for interacting with the google.longrunning.operations meta-API."""
 
+import sys
+
 from google.api_core.operations_v1.operations_client import OperationsClient
 
 __all__ = ["OperationsClient"]
+if sys.version_info >= (3, 6, 0):
+    from google.api_core.operations_v1.operations_async_client import OperationsAsyncClient  # noqa: F401
+    __all__.append("OperationsAsyncClient")
diff --git a/google/api_core/operations_v1/operations_async_client.py b/google/api_core/operations_v1/operations_async_client.py
new file mode 100644
index 0000000..039bec1
--- /dev/null
+++ b/google/api_core/operations_v1/operations_async_client.py
@@ -0,0 +1,274 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An async client for the google.longrunning.operations meta-API.
+
+.. _Google API Style Guide:
+    https://cloud.google.com/apis/design/design_pattern
+    s#long_running_operations
+.. _google/longrunning/operations.proto:
+    https://github.com/googleapis/googleapis/blob/master/google/longrunning
+    /operations.proto
+"""
+
+import functools
+
+from google.api_core import gapic_v1, page_iterator_async
+from google.api_core.operations_v1 import operations_client_config
+from google.longrunning import operations_pb2
+
+
+class OperationsAsyncClient:
+    """Async client for interacting with long-running operations.
+
+    Args:
+        channel (aio.Channel): The gRPC AsyncIO channel associated with the
+            service that implements the ``google.longrunning.operations``
+            interface.
+        client_config (dict):
+            A dictionary of call options for each method. If not specified
+            the default configuration is used.
+    """
+
+    def __init__(self, channel, client_config=operations_client_config.config):
+        # Create the gRPC client stub with gRPC AsyncIO channel.
+        self.operations_stub = operations_pb2.OperationsStub(channel)
+
+        # Create all wrapped methods using the interface configuration.
+        # The interface config contains all of the default settings for retry
+        # and timeout for each RPC method.
+        interfaces = client_config["interfaces"]
+        interface_config = interfaces["google.longrunning.Operations"]
+        method_configs = gapic_v1.config_async.parse_method_configs(interface_config)
+
+        self._get_operation = gapic_v1.method_async.wrap_method(
+            self.operations_stub.GetOperation,
+            default_retry=method_configs["GetOperation"].retry,
+            default_timeout=method_configs["GetOperation"].timeout,
+        )
+
+        self._list_operations = gapic_v1.method_async.wrap_method(
+            self.operations_stub.ListOperations,
+            default_retry=method_configs["ListOperations"].retry,
+            default_timeout=method_configs["ListOperations"].timeout,
+        )
+
+        self._cancel_operation = gapic_v1.method_async.wrap_method(
+            self.operations_stub.CancelOperation,
+            default_retry=method_configs["CancelOperation"].retry,
+            default_timeout=method_configs["CancelOperation"].timeout,
+        )
+
+        self._delete_operation = gapic_v1.method_async.wrap_method(
+            self.operations_stub.DeleteOperation,
+            default_retry=method_configs["DeleteOperation"].retry,
+            default_timeout=method_configs["DeleteOperation"].timeout,
+        )
+
+    async def get_operation(
+        self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT
+    ):
+        """Gets the latest state of a long-running operation.
+
+        Clients can use this method to poll the operation result at intervals
+        as recommended by the API service.
+
+        Example:
+            >>> from google.api_core import operations_v1
+            >>> api = operations_v1.OperationsClient()
+            >>> name = ''
+            >>> response = await api.get_operation(name)
+
+        Args:
+            name (str): The name of the operation resource.
+            retry (google.api_core.retry.Retry): The retry strategy to use
+                when invoking the RPC. If unspecified, the default retry from
+                the client configuration will be used. If ``None``, then this
+                method will not retry the RPC at all.
+            timeout (float): The amount of time in seconds to wait for the RPC
+                to complete. Note that if ``retry`` is used, this timeout
+                applies to each individual attempt and the overall time it
+                takes for this method to complete may be longer. If
+                unspecified, the the default timeout in the client
+                configuration is used. If ``None``, then the RPC method will
+                not time out.
+
+        Returns:
+            google.longrunning.operations_pb2.Operation: The state of the
+                operation.
+
+        Raises:
+            google.api_core.exceptions.GoogleAPICallError: If an error occurred
+                while invoking the RPC, the appropriate ``GoogleAPICallError``
+                subclass will be raised.
+        """
+        request = operations_pb2.GetOperationRequest(name=name)
+        return await self._get_operation(request, retry=retry, timeout=timeout)
+
+    async def list_operations(
+        self,
+        name,
+        filter_,
+        retry=gapic_v1.method_async.DEFAULT,
+        timeout=gapic_v1.method_async.DEFAULT,
+    ):
+        """
+        Lists operations that match the specified filter in the request.
+
+        Example:
+            >>> from google.api_core import operations_v1
+            >>> api = operations_v1.OperationsClient()
+            >>> name = ''
+            >>>
+            >>> # Iterate over all results
+            >>> for operation in await api.list_operations(name):
+            >>>   # process operation
+            >>>   pass
+            >>>
+            >>> # Or iterate over results one page at a time
+            >>> iter = await api.list_operations(name)
+            >>> for page in iter.pages:
+            >>>   for operation in page:
+            >>>     # process operation
+            >>>     pass
+
+        Args:
+            name (str): The name of the operation collection.
+            filter_ (str): The standard list filter.
+            retry (google.api_core.retry.Retry): The retry strategy to use
+                when invoking the RPC. If unspecified, the default retry from
+                the client configuration will be used. If ``None``, then this
+                method will not retry the RPC at all.
+            timeout (float): The amount of time in seconds to wait for the RPC
+                to complete. Note that if ``retry`` is used, this timeout
+                applies to each individual attempt and the overall time it
+                takes for this method to complete may be longer. If
+                unspecified, the the default timeout in the client
+                configuration is used. If ``None``, then the RPC method will
+                not time out.
+
+        Returns:
+            google.api_core.page_iterator.Iterator: An iterator that yields
+                :class:`google.longrunning.operations_pb2.Operation` instances.
+
+        Raises:
+            google.api_core.exceptions.MethodNotImplemented: If the server
+                does not support this method. Services are not required to
+                implement this method.
+            google.api_core.exceptions.GoogleAPICallError: If an error occurred
+                while invoking the RPC, the appropriate ``GoogleAPICallError``
+                subclass will be raised.
+        """
+        # Create the request object.
+        request = operations_pb2.ListOperationsRequest(name=name, filter=filter_)
+
+        # Create the method used to fetch pages
+        method = functools.partial(self._list_operations, retry=retry, timeout=timeout)
+
+        iterator = page_iterator_async.AsyncGRPCIterator(
+            client=None,
+            method=method,
+            request=request,
+            items_field="operations",
+            request_token_field="page_token",
+            response_token_field="next_page_token",
+        )
+
+        return iterator
+
+    async def cancel_operation(
+        self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT
+    ):
+        """Starts asynchronous cancellation on a long-running operation.
+
+        The server makes a best effort to cancel the operation, but success is
+        not guaranteed. Clients can use :meth:`get_operation` or service-
+        specific methods to check whether the cancellation succeeded or whether
+        the operation completed despite cancellation. On successful
+        cancellation, the operation is not deleted; instead, it becomes an
+        operation with an ``Operation.error`` value with a
+        ``google.rpc.Status.code`` of ``1``, corresponding to
+        ``Code.CANCELLED``.
+
+        Example:
+            >>> from google.api_core import operations_v1
+            >>> api = operations_v1.OperationsClient()
+            >>> name = ''
+            >>> api.cancel_operation(name)
+
+        Args:
+            name (str): The name of the operation resource to be cancelled.
+            retry (google.api_core.retry.Retry): The retry strategy to use
+                when invoking the RPC. If unspecified, the default retry from
+                the client configuration will be used. If ``None``, then this
+                method will not retry the RPC at all.
+            timeout (float): The amount of time in seconds to wait for the RPC
+                to complete. Note that if ``retry`` is used, this timeout
+                applies to each individual attempt and the overall time it
+                takes for this method to complete may be longer. If
+                unspecified, the the default timeout in the client
+                configuration is used. If ``None``, then the RPC method will
+                not time out.
+
+        Raises:
+            google.api_core.exceptions.MethodNotImplemented: If the server
+                does not support this method. Services are not required to
+                implement this method.
+            google.api_core.exceptions.GoogleAPICallError: If an error occurred
+                while invoking the RPC, the appropriate ``GoogleAPICallError``
+                subclass will be raised.
+        """
+        # Create the request object.
+        request = operations_pb2.CancelOperationRequest(name=name)
+        await self._cancel_operation(request, retry=retry, timeout=timeout)
+
+    async def delete_operation(
+        self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT
+    ):
+        """Deletes a long-running operation.
+
+        This method indicates that the client is no longer interested in the
+        operation result. It does not cancel the operation.
+
+        Example:
+            >>> from google.api_core import operations_v1
+            >>> api = operations_v1.OperationsClient()
+            >>> name = ''
+            >>> api.delete_operation(name)
+
+        Args:
+            name (str): The name of the operation resource to be deleted.
+            retry (google.api_core.retry.Retry): The retry strategy to use
+                when invoking the RPC. If unspecified, the default retry from
+                the client configuration will be used. If ``None``, then this
+                method will not retry the RPC at all.
+            timeout (float): The amount of time in seconds to wait for the RPC
+                to complete. Note that if ``retry`` is used, this timeout
+                applies to each individual attempt and the overall time it
+                takes for this method to complete may be longer. If
+                unspecified, the the default timeout in the client
+                configuration is used. If ``None``, then the RPC method will
+                not time out.
+
+        Raises:
+            google.api_core.exceptions.MethodNotImplemented: If the server
+                does not support this method. Services are not required to
+                implement this method.
+            google.api_core.exceptions.GoogleAPICallError: If an error occurred
+                while invoking the RPC, the appropriate ``GoogleAPICallError``
+                subclass will be raised.
+        """
+        # Create the request object.
+        request = operations_pb2.DeleteOperationRequest(name=name)
+        await self._delete_operation(request, retry=retry, timeout=timeout)
diff --git a/tests/asyncio/gapic/test_method_async.py b/tests/asyncio/gapic/test_method_async.py
new file mode 100644
index 0000000..7318362
--- /dev/null
+++ b/tests/asyncio/gapic/test_method_async.py
@@ -0,0 +1,243 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+
+from grpc.experimental import aio
+import mock
+import pytest
+
+from google.api_core import (exceptions, gapic_v1, grpc_helpers_async,
+                             retry_async, timeout)
+
+
+def _utcnow_monotonic():
+    current_time = datetime.datetime.min
+    delta = datetime.timedelta(seconds=0.5)
+    while True:
+        yield current_time
+        current_time += delta
+
+
+@pytest.mark.asyncio
+async def test_wrap_method_basic():
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42)
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+
+    wrapped_method = gapic_v1.method_async.wrap_method(method)
+
+    result = await wrapped_method(1, 2, meep="moop")
+
+    assert result == 42
+    method.assert_called_once_with(1, 2, meep="moop", metadata=mock.ANY)
+
+    # Check that the default client info was specified in the metadata.
+    metadata = method.call_args[1]["metadata"]
+    assert len(metadata) == 1
+    client_info = gapic_v1.client_info.DEFAULT_CLIENT_INFO
+    user_agent_metadata = client_info.to_grpc_metadata()
+    assert user_agent_metadata in metadata
+
+
+@pytest.mark.asyncio
+async def test_wrap_method_with_no_client_info():
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall()
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, client_info=None
+    )
+
+    await wrapped_method(1, 2, meep="moop")
+
+    method.assert_called_once_with(1, 2, meep="moop")
+
+
+@pytest.mark.asyncio
+async def test_wrap_method_with_custom_client_info():
+    client_info = gapic_v1.client_info.ClientInfo(
+        python_version=1,
+        grpc_version=2,
+        api_core_version=3,
+        gapic_version=4,
+        client_library_version=5,
+    )
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall()
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, client_info=client_info
+    )
+
+    await wrapped_method(1, 2, meep="moop")
+
+    method.assert_called_once_with(1, 2, meep="moop", metadata=mock.ANY)
+
+    # Check that the custom client info was specified in the metadata.
+    metadata = method.call_args[1]["metadata"]
+    assert client_info.to_grpc_metadata() in metadata
+
+
+@pytest.mark.asyncio
+async def test_invoke_wrapped_method_with_metadata():
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall()
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+
+    wrapped_method = gapic_v1.method_async.wrap_method(method)
+
+    await wrapped_method(mock.sentinel.request, metadata=[("a", "b")])
+
+    method.assert_called_once_with(mock.sentinel.request, metadata=mock.ANY)
+    metadata = method.call_args[1]["metadata"]
+    # Metadata should have two items: the client info metadata and our custom
+    # metadata.
+    assert len(metadata) == 2
+    assert ("a", "b") in metadata
+
+
+@pytest.mark.asyncio
+async def test_invoke_wrapped_method_with_metadata_as_none():
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall()
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+
+    wrapped_method = gapic_v1.method_async.wrap_method(method)
+
+    await wrapped_method(mock.sentinel.request, metadata=None)
+
+    method.assert_called_once_with(mock.sentinel.request, metadata=mock.ANY)
+    metadata = method.call_args[1]["metadata"]
+    # Metadata should have just one items: the client info metadata.
+    assert len(metadata) == 1
+
+
+@mock.patch("asyncio.sleep")
+@pytest.mark.asyncio
+async def test_wrap_method_with_default_retry_and_timeout(unused_sleep):
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42)
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, side_effect=[
+        exceptions.InternalServerError(None),
+        fake_call,
+    ])
+
+    default_retry = retry_async.AsyncRetry()
+    default_timeout = timeout.ConstantTimeout(60)
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, default_retry, default_timeout
+    )
+
+    result = await wrapped_method()
+
+    assert result == 42
+    assert method.call_count == 2
+    method.assert_called_with(timeout=60, metadata=mock.ANY)
+
+
+@mock.patch("asyncio.sleep")
+@pytest.mark.asyncio
+async def test_wrap_method_with_default_retry_and_timeout_using_sentinel(unused_sleep):
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42)
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, side_effect=[
+        exceptions.InternalServerError(None),
+        fake_call,
+    ])
+
+    default_retry = retry_async.AsyncRetry()
+    default_timeout = timeout.ConstantTimeout(60)
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, default_retry, default_timeout
+    )
+
+    result = await wrapped_method(
+        retry=gapic_v1.method_async.DEFAULT,
+        timeout=gapic_v1.method_async.DEFAULT,
+    )
+
+    assert result == 42
+    assert method.call_count == 2
+    method.assert_called_with(timeout=60, metadata=mock.ANY)
+
+
+@mock.patch("asyncio.sleep")
+@pytest.mark.asyncio
+async def test_wrap_method_with_overriding_retry_and_timeout(unused_sleep):
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42)
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, side_effect=[
+        exceptions.NotFound(None),
+        fake_call,
+    ])
+
+    default_retry = retry_async.AsyncRetry()
+    default_timeout = timeout.ConstantTimeout(60)
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, default_retry, default_timeout
+    )
+
+    result = await wrapped_method(
+        retry=retry_async.AsyncRetry(retry_async.if_exception_type(exceptions.NotFound)),
+        timeout=timeout.ConstantTimeout(22),
+    )
+
+    assert result == 42
+    assert method.call_count == 2
+    method.assert_called_with(timeout=22, metadata=mock.ANY)
+
+
+@mock.patch("asyncio.sleep")
+@mock.patch(
+    "google.api_core.datetime_helpers.utcnow",
+    side_effect=_utcnow_monotonic(),
+    autospec=True,
+)
+@pytest.mark.asyncio
+async def test_wrap_method_with_overriding_retry_deadline(utcnow, unused_sleep):
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42)
+    method = mock.Mock(
+        spec=aio.UnaryUnaryMultiCallable,
+        side_effect=([exceptions.InternalServerError(None)] * 4) + [fake_call])
+
+    default_retry = retry_async.AsyncRetry()
+    default_timeout = timeout.ExponentialTimeout(deadline=60)
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, default_retry, default_timeout
+    )
+
+    # Overriding only the retry's deadline should also override the timeout's
+    # deadline.
+    result = await wrapped_method(retry=default_retry.with_deadline(30))
+
+    assert result == 42
+    timeout_args = [call[1]["timeout"] for call in method.call_args_list]
+    assert timeout_args == [5.0, 10.0, 20.0, 26.0, 25.0]
+    assert utcnow.call_count == (
+        1
+        + 1  # Compute wait_for timeout in retry_async
+        + 5  # First to set the deadline.
+        + 5  # One for each min(timeout, maximum, (DEADLINE - NOW).seconds)
+    )
+
+
+@pytest.mark.asyncio
+async def test_wrap_method_with_overriding_timeout_as_a_number():
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42)
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+    default_retry = retry_async.AsyncRetry()
+    default_timeout = timeout.ConstantTimeout(60)
+    wrapped_method = gapic_v1.method_async.wrap_method(
+        method, default_retry, default_timeout
+    )
+
+    result = await wrapped_method(timeout=22)
+
+    assert result == 42
+    method.assert_called_once_with(timeout=22, metadata=mock.ANY)
diff --git a/tests/asyncio/operations_v1/__init__.py b/tests/asyncio/operations_v1/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/asyncio/operations_v1/__init__.py
diff --git a/tests/asyncio/operations_v1/test_operations_async_client.py b/tests/asyncio/operations_v1/test_operations_async_client.py
new file mode 100644
index 0000000..0f9363f
--- /dev/null
+++ b/tests/asyncio/operations_v1/test_operations_async_client.py
@@ -0,0 +1,93 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from grpc.experimental import aio
+import mock
+import pytest
+
+from google.api_core import (grpc_helpers_async, operations_v1,
+                             page_iterator_async)
+from google.longrunning import operations_pb2
+from google.protobuf import empty_pb2
+
+
+def _mock_grpc_objects(response):
+    fake_call = grpc_helpers_async.FakeUnaryUnaryCall(response)
+    method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call)
+    mocked_channel = mock.Mock()
+    mocked_channel.unary_unary = mock.Mock(return_value=method)
+    return mocked_channel, method, fake_call
+
+
+@pytest.mark.asyncio
+async def test_get_operation():
+    mocked_channel, method, fake_call = _mock_grpc_objects(
+        operations_pb2.Operation(name="meep"))
+    client = operations_v1.OperationsAsyncClient(mocked_channel)
+
+    response = await client.get_operation("name")
+    assert method.call_count == 1
+    assert tuple(method.call_args_list[0])[0][0].name == "name"
+    assert response == fake_call.response
+
+
+@pytest.mark.asyncio
+async def test_list_operations():
+    operations = [
+        operations_pb2.Operation(name="1"),
+        operations_pb2.Operation(name="2"),
+    ]
+    list_response = operations_pb2.ListOperationsResponse(operations=operations)
+
+    mocked_channel, method, fake_call = _mock_grpc_objects(list_response)
+    client = operations_v1.OperationsAsyncClient(mocked_channel)
+
+    pager = await client.list_operations("name", "filter")
+
+    assert isinstance(pager, page_iterator_async.AsyncIterator)
+    responses = []
+    async for response in pager:
+        responses.append(response)
+
+    assert responses == operations
+
+    assert method.call_count == 1
+    request = tuple(method.call_args_list[0])[0][0]
+    assert isinstance(request, operations_pb2.ListOperationsRequest)
+    assert request.name == "name"
+    assert request.filter == "filter"
+
+
+@pytest.mark.asyncio
+async def test_delete_operation():
+    mocked_channel, method, fake_call = _mock_grpc_objects(
+        empty_pb2.Empty())
+    client = operations_v1.OperationsAsyncClient(mocked_channel)
+
+    await client.delete_operation("name")
+
+    assert method.call_count == 1
+    assert tuple(method.call_args_list[0])[0][0].name == "name"
+
+
+@pytest.mark.asyncio
+async def test_cancel_operation():
+    mocked_channel, method, fake_call = _mock_grpc_objects(
+        empty_pb2.Empty())
+    client = operations_v1.OperationsAsyncClient(mocked_channel)
+
+    await client.cancel_operation("name")
+
+    assert method.call_count == 1
+    assert tuple(method.call_args_list[0])[0][0].name == "name"
diff --git a/tests/asyncio/test_grpc_helpers_async.py b/tests/asyncio/test_grpc_helpers_async.py
new file mode 100644
index 0000000..0053952
--- /dev/null
+++ b/tests/asyncio/test_grpc_helpers_async.py
@@ -0,0 +1,372 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+from grpc.experimental import aio
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core import grpc_helpers_async
+import google.auth.credentials
+
+
+class RpcErrorImpl(grpc.RpcError, grpc.Call):
+    def __init__(self, code):
+        super(RpcErrorImpl, self).__init__()
+        self._code = code
+
+    def code(self):
+        return self._code
+
+    def details(self):
+        return None
+
+
+@pytest.mark.asyncio
+async def test_wrap_unary_errors():
+    grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+    callable_ = mock.AsyncMock(spec=["__call__"], side_effect=grpc_error)
+
+    wrapped_callable = grpc_helpers_async._wrap_unary_errors(callable_)
+
+    with pytest.raises(exceptions.InvalidArgument) as exc_info:
+        await wrapped_callable(1, 2, three="four")
+
+    callable_.assert_called_once_with(1, 2, three="four")
+    assert exc_info.value.response == grpc_error
+
+
+@pytest.mark.asyncio
+async def test_common_methods_in_wrapped_call():
+    mock_call = mock.Mock(aio.UnaryUnaryCall, autospec=True)
+    wrapped_call = grpc_helpers_async._WrappedUnaryUnaryCall().with_call(mock_call)
+
+    await wrapped_call.initial_metadata()
+    assert mock_call.initial_metadata.call_count == 1
+
+    await wrapped_call.trailing_metadata()
+    assert mock_call.trailing_metadata.call_count == 1
+
+    await wrapped_call.code()
+    assert mock_call.code.call_count == 1
+
+    await wrapped_call.details()
+    assert mock_call.details.call_count == 1
+
+    wrapped_call.cancelled()
+    assert mock_call.cancelled.call_count == 1
+
+    wrapped_call.done()
+    assert mock_call.done.call_count == 1
+
+    wrapped_call.time_remaining()
+    assert mock_call.time_remaining.call_count == 1
+
+    wrapped_call.cancel()
+    assert mock_call.cancel.call_count == 1
+
+    callback = mock.sentinel.callback
+    wrapped_call.add_done_callback(callback)
+    mock_call.add_done_callback.assert_called_once_with(callback)
+
+    await wrapped_call.wait_for_connection()
+    assert mock_call.wait_for_connection.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_unary_stream():
+    mock_call = mock.Mock(aio.UnaryStreamCall, autospec=True)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    await wrapped_callable(1, 2, three="four")
+    multicallable.assert_called_once_with(1, 2, three="four")
+    assert mock_call.wait_for_connection.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_stream_unary():
+    mock_call = mock.Mock(aio.StreamUnaryCall, autospec=True)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    await wrapped_callable(1, 2, three="four")
+    multicallable.assert_called_once_with(1, 2, three="four")
+    assert mock_call.wait_for_connection.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_stream_stream():
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    await wrapped_callable(1, 2, three="four")
+    multicallable.assert_called_once_with(1, 2, three="four")
+    assert mock_call.wait_for_connection.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_type_error():
+    mock_call = mock.Mock()
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    with pytest.raises(TypeError):
+        await wrapped_callable()
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_raised():
+    grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    mock_call.wait_for_connection = mock.AsyncMock(side_effect=[grpc_error])
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    with pytest.raises(exceptions.InvalidArgument):
+        await wrapped_callable()
+    assert mock_call.wait_for_connection.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_read():
+    grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    mock_call.read = mock.AsyncMock(side_effect=grpc_error)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    wrapped_call = await wrapped_callable(1, 2, three="four")
+    multicallable.assert_called_once_with(1, 2, three="four")
+    assert mock_call.wait_for_connection.call_count == 1
+
+    with pytest.raises(exceptions.InvalidArgument) as exc_info:
+        await wrapped_call.read()
+    assert exc_info.value.response == grpc_error
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_aiter():
+    grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    mocked_aiter = mock.Mock(spec=['__anext__'])
+    mocked_aiter.__anext__ = mock.AsyncMock(side_effect=[mock.sentinel.response, grpc_error])
+    mock_call.__aiter__ = mock.Mock(return_value=mocked_aiter)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+    wrapped_call = await wrapped_callable()
+
+    with pytest.raises(exceptions.InvalidArgument) as exc_info:
+        async for response in wrapped_call:
+            assert response == mock.sentinel.response
+    assert exc_info.value.response == grpc_error
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_aiter_non_rpc_error():
+    non_grpc_error = TypeError('Not a gRPC error')
+
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    mocked_aiter = mock.Mock(spec=['__anext__'])
+    mocked_aiter.__anext__ = mock.AsyncMock(side_effect=[mock.sentinel.response, non_grpc_error])
+    mock_call.__aiter__ = mock.Mock(return_value=mocked_aiter)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+    wrapped_call = await wrapped_callable()
+
+    with pytest.raises(TypeError) as exc_info:
+        async for response in wrapped_call:
+            assert response == mock.sentinel.response
+    assert exc_info.value == non_grpc_error
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_aiter_called_multiple_times():
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+    wrapped_call = await wrapped_callable()
+
+    assert wrapped_call.__aiter__() == wrapped_call.__aiter__()
+
+
+@pytest.mark.asyncio
+async def test_wrap_stream_errors_write():
+    grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
+
+    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
+    mock_call.write = mock.AsyncMock(side_effect=[None, grpc_error])
+    mock_call.done_writing = mock.AsyncMock(side_effect=[None, grpc_error])
+    multicallable = mock.Mock(return_value=mock_call)
+
+    wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable)
+
+    wrapped_call = await wrapped_callable()
+
+    await wrapped_call.write(mock.sentinel.request)
+    with pytest.raises(exceptions.InvalidArgument) as exc_info:
+        await wrapped_call.write(mock.sentinel.request)
+    assert mock_call.write.call_count == 2
+    assert exc_info.value.response == grpc_error
+
+    await wrapped_call.done_writing()
+    with pytest.raises(exceptions.InvalidArgument) as exc_info:
+        await wrapped_call.done_writing()
+    assert mock_call.done_writing.call_count == 2
+    assert exc_info.value.response == grpc_error
+
+
+@mock.patch("google.api_core.grpc_helpers_async._wrap_unary_errors")
+def test_wrap_errors_non_streaming(wrap_unary_errors):
+    callable_ = mock.create_autospec(aio.UnaryUnaryMultiCallable)
+
+    result = grpc_helpers_async.wrap_errors(callable_)
+
+    assert result == wrap_unary_errors.return_value
+    wrap_unary_errors.assert_called_once_with(callable_)
+
+
+@mock.patch("google.api_core.grpc_helpers_async._wrap_stream_errors")
+def test_wrap_errors_streaming(wrap_stream_errors):
+    callable_ = mock.create_autospec(aio.UnaryStreamMultiCallable)
+
+    result = grpc_helpers_async.wrap_errors(callable_)
+
+    assert result == wrap_stream_errors.return_value
+    wrap_stream_errors.assert_called_once_with(callable_)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+    "google.auth.default",
+    return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.experimental.aio.secure_channel")
+def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_call):
+    target = "example.com:443"
+    composite_creds = composite_creds_call.return_value
+
+    channel = grpc_helpers_async.create_channel(target)
+
+    assert channel is grpc_secure_channel.return_value
+    default.assert_called_once_with(scopes=None)
+    grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+    "google.auth.default",
+    return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.experimental.aio.secure_channel")
+def test_create_channel_implicit_with_ssl_creds(
+    grpc_secure_channel, default, composite_creds_call
+):
+    target = "example.com:443"
+
+    ssl_creds = grpc.ssl_channel_credentials()
+
+    grpc_helpers_async.create_channel(target, ssl_credentials=ssl_creds)
+
+    default.assert_called_once_with(scopes=None)
+    composite_creds_call.assert_called_once_with(ssl_creds, mock.ANY)
+    composite_creds = composite_creds_call.return_value
+    grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch(
+    "google.auth.default",
+    return_value=(mock.sentinel.credentials, mock.sentinel.projet),
+)
+@mock.patch("grpc.experimental.aio.secure_channel")
+def test_create_channel_implicit_with_scopes(
+    grpc_secure_channel, default, composite_creds_call
+):
+    target = "example.com:443"
+    composite_creds = composite_creds_call.return_value
+
+    channel = grpc_helpers_async.create_channel(target, scopes=["one", "two"])
+
+    assert channel is grpc_secure_channel.return_value
+    default.assert_called_once_with(scopes=["one", "two"])
+    grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("google.auth.credentials.with_scopes_if_required")
+@mock.patch("grpc.experimental.aio.secure_channel")
+def test_create_channel_explicit(grpc_secure_channel, auth_creds, composite_creds_call):
+    target = "example.com:443"
+    composite_creds = composite_creds_call.return_value
+
+    channel = grpc_helpers_async.create_channel(target, credentials=mock.sentinel.credentials)
+
+    auth_creds.assert_called_once_with(mock.sentinel.credentials, None)
+    assert channel is grpc_secure_channel.return_value
+    grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@mock.patch("grpc.composite_channel_credentials")
+@mock.patch("grpc.experimental.aio.secure_channel")
+def test_create_channel_explicit_scoped(grpc_secure_channel, composite_creds_call):
+    target = "example.com:443"
+    scopes = ["1", "2"]
+    composite_creds = composite_creds_call.return_value
+
+    credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True)
+    credentials.requires_scopes = True
+
+    channel = grpc_helpers_async.create_channel(
+        target, credentials=credentials, scopes=scopes
+    )
+
+    credentials.with_scopes.assert_called_once_with(scopes)
+    assert channel is grpc_secure_channel.return_value
+    grpc_secure_channel.assert_called_once_with(target, composite_creds)
+
+
+@pytest.mark.skipif(grpc_helpers_async.HAS_GRPC_GCP, reason="grpc_gcp module not available")
+@mock.patch("grpc.experimental.aio.secure_channel")
+def test_create_channel_without_grpc_gcp(grpc_secure_channel):
+    target = "example.com:443"
+    scopes = ["test_scope"]
+
+    credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True)
+    credentials.requires_scopes = True
+
+    grpc_helpers_async.create_channel(target, credentials=credentials, scopes=scopes)
+    grpc_secure_channel.assert_called()
+    credentials.with_scopes.assert_called_once_with(scopes)
+
+
+@pytest.mark.asyncio
+async def test_fake_stream_unary_call():
+    fake_call = grpc_helpers_async.FakeStreamUnaryCall()
+    await fake_call.wait_for_connection()
+    response = await fake_call
+    assert fake_call.response == response