feat: allow gRPC metadata to be passed to operations client (#127)

diff --git a/google/api_core/operation.py b/google/api_core/operation.py
index 9af9c4e..a806523 100644
--- a/google/api_core/operation.py
+++ b/google/api_core/operation.py
@@ -287,7 +287,7 @@
     operations_stub.CancelOperation(request_pb)
 
 
-def from_grpc(operation, operations_stub, result_type, **kwargs):
+def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwargs):
     """Create an operation future using a gRPC client.
 
     This interacts with the long-running operations `service`_ (specific
@@ -302,18 +302,20 @@
         operations_stub (google.longrunning.operations_pb2.OperationsStub):
             The operations stub.
         result_type (:func:`type`): The protobuf result type.
+        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
+            to the rpc.
         kwargs: Keyword args passed into the :class:`Operation` constructor.
 
     Returns:
         ~.api_core.operation.Operation: The operation future to track the given
             operation.
     """
-    refresh = functools.partial(_refresh_grpc, operations_stub, operation.name)
-    cancel = functools.partial(_cancel_grpc, operations_stub, operation.name)
+    refresh = functools.partial(_refresh_grpc, operations_stub, operation.name, metadata=grpc_metadata)
+    cancel = functools.partial(_cancel_grpc, operations_stub, operation.name, metadata=grpc_metadata)
     return Operation(operation, refresh, cancel, result_type, **kwargs)
 
 
-def from_gapic(operation, operations_client, result_type, **kwargs):
+def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs):
     """Create an operation future from a gapic client.
 
     This interacts with the long-running operations `service`_ (specific
@@ -328,12 +330,14 @@
         operations_client (google.api_core.operations_v1.OperationsClient):
             The operations client.
         result_type (:func:`type`): The protobuf result type.
+        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
+            to the rpc.
         kwargs: Keyword args passed into the :class:`Operation` constructor.
 
     Returns:
         ~.api_core.operation.Operation: The operation future to track the given
             operation.
     """
-    refresh = functools.partial(operations_client.get_operation, operation.name)
-    cancel = functools.partial(operations_client.cancel_operation, operation.name)
+    refresh = functools.partial(operations_client.get_operation, operation.name, metadata=grpc_metadata)
+    cancel = functools.partial(operations_client.cancel_operation, operation.name, metadata=grpc_metadata)
     return Operation(operation, refresh, cancel, result_type, **kwargs)
diff --git a/google/api_core/operation_async.py b/google/api_core/operation_async.py
index 89500af..b137235 100644
--- a/google/api_core/operation_async.py
+++ b/google/api_core/operation_async.py
@@ -189,7 +189,7 @@
         )
 
 
-def from_gapic(operation, operations_client, result_type, **kwargs):
+def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs):
     """Create an operation future from a gapic client.
 
     This interacts with the long-running operations `service`_ (specific
@@ -204,12 +204,14 @@
         operations_client (google.api_core.operations_v1.OperationsClient):
             The operations client.
         result_type (:func:`type`): The protobuf result type.
+        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
+            to the rpc.
         kwargs: Keyword args passed into the :class:`Operation` constructor.
 
     Returns:
         ~.api_core.operation.Operation: The operation future to track the given
             operation.
     """
-    refresh = functools.partial(operations_client.get_operation, operation.name)
-    cancel = functools.partial(operations_client.cancel_operation, operation.name)
+    refresh = functools.partial(operations_client.get_operation, operation.name, metadata=grpc_metadata)
+    cancel = functools.partial(operations_client.cancel_operation, operation.name, metadata=grpc_metadata)
     return AsyncOperation(operation, refresh, cancel, result_type, **kwargs)
diff --git a/google/api_core/operations_v1/operations_async_client.py b/google/api_core/operations_v1/operations_async_client.py
index 039bec1..5d7b26c 100644
--- a/google/api_core/operations_v1/operations_async_client.py
+++ b/google/api_core/operations_v1/operations_async_client.py
@@ -77,7 +77,11 @@
         )
 
     async def get_operation(
-        self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT
+        self,
+        name,
+        retry=gapic_v1.method_async.DEFAULT,
+        timeout=gapic_v1.method_async.DEFAULT,
+        metadata=None,
     ):
         """Gets the latest state of a long-running operation.
 
@@ -103,6 +107,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]):
+                Additional gRPC metadata.
 
         Returns:
             google.longrunning.operations_pb2.Operation: The state of the
@@ -114,7 +120,7 @@
                 subclass will be raised.
         """
         request = operations_pb2.GetOperationRequest(name=name)
-        return await self._get_operation(request, retry=retry, timeout=timeout)
+        return await self._get_operation(request, retry=retry, timeout=timeout, metadata=metadata)
 
     async def list_operations(
         self,
@@ -122,6 +128,7 @@
         filter_,
         retry=gapic_v1.method_async.DEFAULT,
         timeout=gapic_v1.method_async.DEFAULT,
+        metadata=None,
     ):
         """
         Lists operations that match the specified filter in the request.
@@ -157,6 +164,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]): Additional gRPC
+                metadata.
 
         Returns:
             google.api_core.page_iterator.Iterator: An iterator that yields
@@ -174,7 +183,7 @@
         request = operations_pb2.ListOperationsRequest(name=name, filter=filter_)
 
         # Create the method used to fetch pages
-        method = functools.partial(self._list_operations, retry=retry, timeout=timeout)
+        method = functools.partial(self._list_operations, retry=retry, timeout=timeout, metadata=metadata)
 
         iterator = page_iterator_async.AsyncGRPCIterator(
             client=None,
@@ -188,7 +197,11 @@
         return iterator
 
     async def cancel_operation(
-        self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT
+        self,
+        name,
+        retry=gapic_v1.method_async.DEFAULT,
+        timeout=gapic_v1.method_async.DEFAULT,
+        metadata=None,
     ):
         """Starts asynchronous cancellation on a long-running operation.
 
@@ -228,13 +241,19 @@
             google.api_core.exceptions.GoogleAPICallError: If an error occurred
                 while invoking the RPC, the appropriate ``GoogleAPICallError``
                 subclass will be raised.
+            metadata (Optional[List[Tuple[str, str]]]): Additional gRPC
+                metadata.
         """
         # Create the request object.
         request = operations_pb2.CancelOperationRequest(name=name)
-        await self._cancel_operation(request, retry=retry, timeout=timeout)
+        await self._cancel_operation(request, retry=retry, timeout=timeout, metadata=metadata)
 
     async def delete_operation(
-        self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT
+        self,
+        name,
+        retry=gapic_v1.method_async.DEFAULT,
+        timeout=gapic_v1.method_async.DEFAULT,
+        metadata=None,
     ):
         """Deletes a long-running operation.
 
@@ -260,6 +279,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]): Additional gRPC
+                metadata.
 
         Raises:
             google.api_core.exceptions.MethodNotImplemented: If the server
@@ -271,4 +292,4 @@
         """
         # Create the request object.
         request = operations_pb2.DeleteOperationRequest(name=name)
-        await self._delete_operation(request, retry=retry, timeout=timeout)
+        await self._delete_operation(request, retry=retry, timeout=timeout, metadata=metadata)
diff --git a/google/api_core/operations_v1/operations_client.py b/google/api_core/operations_v1/operations_client.py
index cd2923b..b850796 100644
--- a/google/api_core/operations_v1/operations_client.py
+++ b/google/api_core/operations_v1/operations_client.py
@@ -91,7 +91,11 @@
 
     # Service calls
     def get_operation(
-        self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
+        self,
+        name,
+        retry=gapic_v1.method.DEFAULT,
+        timeout=gapic_v1.method.DEFAULT,
+        metadata=None,
     ):
         """Gets the latest state of a long-running operation.
 
@@ -117,6 +121,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]):
+                Additional gRPC metadata.
 
         Returns:
             google.longrunning.operations_pb2.Operation: The state of the
@@ -128,7 +134,7 @@
                 subclass will be raised.
         """
         request = operations_pb2.GetOperationRequest(name=name)
-        return self._get_operation(request, retry=retry, timeout=timeout)
+        return self._get_operation(request, retry=retry, timeout=timeout, metadata=metadata)
 
     def list_operations(
         self,
@@ -136,6 +142,7 @@
         filter_,
         retry=gapic_v1.method.DEFAULT,
         timeout=gapic_v1.method.DEFAULT,
+        metadata=None,
     ):
         """
         Lists operations that match the specified filter in the request.
@@ -171,6 +178,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]): Additional gRPC
+                metadata.
 
         Returns:
             google.api_core.page_iterator.Iterator: An iterator that yields
@@ -188,7 +197,7 @@
         request = operations_pb2.ListOperationsRequest(name=name, filter=filter_)
 
         # Create the method used to fetch pages
-        method = functools.partial(self._list_operations, retry=retry, timeout=timeout)
+        method = functools.partial(self._list_operations, retry=retry, timeout=timeout, metadata=metadata)
 
         iterator = page_iterator.GRPCIterator(
             client=None,
@@ -202,7 +211,11 @@
         return iterator
 
     def cancel_operation(
-        self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
+        self,
+        name,
+        retry=gapic_v1.method.DEFAULT,
+        timeout=gapic_v1.method.DEFAULT,
+        metadata=None,
     ):
         """Starts asynchronous cancellation on a long-running operation.
 
@@ -234,6 +247,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]): Additional gRPC
+                metadata.
 
         Raises:
             google.api_core.exceptions.MethodNotImplemented: If the server
@@ -245,10 +260,14 @@
         """
         # Create the request object.
         request = operations_pb2.CancelOperationRequest(name=name)
-        self._cancel_operation(request, retry=retry, timeout=timeout)
+        self._cancel_operation(request, retry=retry, timeout=timeout, metadata=metadata)
 
     def delete_operation(
-        self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
+        self,
+        name,
+        retry=gapic_v1.method.DEFAULT,
+        timeout=gapic_v1.method.DEFAULT,
+        metadata=None,
     ):
         """Deletes a long-running operation.
 
@@ -274,6 +293,8 @@
                 unspecified, the the default timeout in the client
                 configuration is used. If ``None``, then the RPC method will
                 not time out.
+            metadata (Optional[List[Tuple[str, str]]]): Additional gRPC
+                metadata.
 
         Raises:
             google.api_core.exceptions.MethodNotImplemented: If the server
@@ -285,4 +306,4 @@
         """
         # Create the request object.
         request = operations_pb2.DeleteOperationRequest(name=name)
-        self._delete_operation(request, retry=retry, timeout=timeout)
+        self._delete_operation(request, retry=retry, timeout=timeout, metadata=metadata)
diff --git a/tests/asyncio/operations_v1/test_operations_async_client.py b/tests/asyncio/operations_v1/test_operations_async_client.py
index 0f9363f..830cd46 100644
--- a/tests/asyncio/operations_v1/test_operations_async_client.py
+++ b/tests/asyncio/operations_v1/test_operations_async_client.py
@@ -36,9 +36,10 @@
         operations_pb2.Operation(name="meep"))
     client = operations_v1.OperationsAsyncClient(mocked_channel)
 
-    response = await client.get_operation("name")
+    response = await client.get_operation("name", metadata=[("x-goog-request-params", "foo")])
     assert method.call_count == 1
     assert tuple(method.call_args_list[0])[0][0].name == "name"
+    assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"]
     assert response == fake_call.response
 
 
@@ -53,7 +54,7 @@
     mocked_channel, method, fake_call = _mock_grpc_objects(list_response)
     client = operations_v1.OperationsAsyncClient(mocked_channel)
 
-    pager = await client.list_operations("name", "filter")
+    pager = await client.list_operations("name", "filter", metadata=[("x-goog-request-params", "foo")])
 
     assert isinstance(pager, page_iterator_async.AsyncIterator)
     responses = []
@@ -63,6 +64,7 @@
     assert responses == operations
 
     assert method.call_count == 1
+    assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"]
     request = tuple(method.call_args_list[0])[0][0]
     assert isinstance(request, operations_pb2.ListOperationsRequest)
     assert request.name == "name"
@@ -75,10 +77,11 @@
         empty_pb2.Empty())
     client = operations_v1.OperationsAsyncClient(mocked_channel)
 
-    await client.delete_operation("name")
+    await client.delete_operation("name", metadata=[("x-goog-request-params", "foo")])
 
     assert method.call_count == 1
     assert tuple(method.call_args_list[0])[0][0].name == "name"
+    assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"]
 
 
 @pytest.mark.asyncio
@@ -87,7 +90,8 @@
         empty_pb2.Empty())
     client = operations_v1.OperationsAsyncClient(mocked_channel)
 
-    await client.cancel_operation("name")
+    await client.cancel_operation("name", metadata=[("x-goog-request-params", "foo")])
 
     assert method.call_count == 1
     assert tuple(method.call_args_list[0])[0][0].name == "name"
+    assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"]
diff --git a/tests/asyncio/test_operation_async.py b/tests/asyncio/test_operation_async.py
index 419749f..e35d139 100644
--- a/tests/asyncio/test_operation_async.py
+++ b/tests/asyncio/test_operation_async.py
@@ -177,12 +177,15 @@
         operations_client,
         struct_pb2.Struct,
         metadata_type=struct_pb2.Struct,
+        grpc_metadata=[('x-goog-request-params', 'foo')]
     )
 
     assert future._result_type == struct_pb2.Struct
     assert future._metadata_type == struct_pb2.Struct
     assert future.operation.name == TEST_OPERATION_NAME
     assert future.done
+    assert future._refresh.keywords["metadata"] == [('x-goog-request-params', 'foo')]
+    assert future._cancel.keywords["metadata"] == [('x-goog-request-params', 'foo')]
 
 
 def test_deserialize():
diff --git a/tests/unit/operations_v1/test_operations_client.py b/tests/unit/operations_v1/test_operations_client.py
index cc57461..bd7f373 100644
--- a/tests/unit/operations_v1/test_operations_client.py
+++ b/tests/unit/operations_v1/test_operations_client.py
@@ -24,8 +24,9 @@
     client = operations_v1.OperationsClient(channel)
     channel.GetOperation.response = operations_pb2.Operation(name="meep")
 
-    response = client.get_operation("name")
+    response = client.get_operation("name", metadata=[("x-goog-request-params", "foo")])
 
+    assert ("x-goog-request-params", "foo") in channel.GetOperation.calls[0].metadata
     assert len(channel.GetOperation.requests) == 1
     assert channel.GetOperation.requests[0].name == "name"
     assert response == channel.GetOperation.response
@@ -41,11 +42,12 @@
     list_response = operations_pb2.ListOperationsResponse(operations=operations)
     channel.ListOperations.response = list_response
 
-    response = client.list_operations("name", "filter")
+    response = client.list_operations("name", "filter", metadata=[("x-goog-request-params", "foo")])
 
     assert isinstance(response, page_iterator.Iterator)
     assert list(response) == operations
 
+    assert ("x-goog-request-params", "foo") in channel.ListOperations.calls[0].metadata
     assert len(channel.ListOperations.requests) == 1
     request = channel.ListOperations.requests[0]
     assert isinstance(request, operations_pb2.ListOperationsRequest)
@@ -58,8 +60,9 @@
     client = operations_v1.OperationsClient(channel)
     channel.DeleteOperation.response = empty_pb2.Empty()
 
-    client.delete_operation("name")
+    client.delete_operation("name", metadata=[("x-goog-request-params", "foo")])
 
+    assert ("x-goog-request-params", "foo") in channel.DeleteOperation.calls[0].metadata
     assert len(channel.DeleteOperation.requests) == 1
     assert channel.DeleteOperation.requests[0].name == "name"
 
@@ -69,7 +72,8 @@
     client = operations_v1.OperationsClient(channel)
     channel.CancelOperation.response = empty_pb2.Empty()
 
-    client.cancel_operation("name")
+    client.cancel_operation("name", metadata=[("x-goog-request-params", "foo")])
 
+    assert ("x-goog-request-params", "foo") in channel.CancelOperation.calls[0].metadata
     assert len(channel.CancelOperation.requests) == 1
     assert channel.CancelOperation.requests[0].name == "name"
diff --git a/tests/unit/test_operation.py b/tests/unit/test_operation.py
index 2229c2d..ae9bafe 100644
--- a/tests/unit/test_operation.py
+++ b/tests/unit/test_operation.py
@@ -279,12 +279,15 @@
         operations_stub,
         struct_pb2.Struct,
         metadata_type=struct_pb2.Struct,
+        grpc_metadata=[('x-goog-request-params', 'foo')]
     )
 
     assert future._result_type == struct_pb2.Struct
     assert future._metadata_type == struct_pb2.Struct
     assert future.operation.name == TEST_OPERATION_NAME
     assert future.done
+    assert future._refresh.keywords["metadata"] == [('x-goog-request-params', 'foo')]
+    assert future._cancel.keywords["metadata"] == [('x-goog-request-params', 'foo')]
 
 
 def test_from_gapic():
@@ -298,12 +301,15 @@
         operations_client,
         struct_pb2.Struct,
         metadata_type=struct_pb2.Struct,
+        grpc_metadata=[('x-goog-request-params', 'foo')]
     )
 
     assert future._result_type == struct_pb2.Struct
     assert future._metadata_type == struct_pb2.Struct
     assert future.operation.name == TEST_OPERATION_NAME
     assert future.done
+    assert future._refresh.keywords["metadata"] == [('x-goog-request-params', 'foo')]
+    assert future._cancel.keywords["metadata"] == [('x-goog-request-params', 'foo')]
 
 
 def test_deserialize():