Use a class to wrap grpc streaming errors instead of monkey-patching (#4995)

diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py
index 7d81c75..329971e 100644
--- a/google/api_core/grpc_helpers.py
+++ b/google/api_core/grpc_helpers.py
@@ -58,6 +58,55 @@
     return error_remapped_callable
 
 
+class _StreamingResponseIterator(grpc.Call):
+    def __init__(self, wrapped):
+        self._wrapped = wrapped
+
+    def __iter__(self):
+        """This iterator is also an iterable that returns itself."""
+        return self
+
+    def next(self):
+        """Get the next response from the stream.
+
+        Returns:
+            protobuf.Message: A single response from the stream.
+        """
+        try:
+            return six.next(self._wrapped)
+        except grpc.RpcError as exc:
+            six.raise_from(exceptions.from_grpc_error(exc), exc)
+
+    # Alias needed for Python 2/3 support.
+    __next__ = next
+
+    # grpc.Call & grpc.RpcContext interface
+
+    def add_callback(self, callback):
+        return self._wrapped.add_callback(callback)
+
+    def cancel(self):
+        return self._wrapped.cancel()
+
+    def code(self):
+        return self._wrapped.code()
+
+    def details(self):
+        return self._wrapped.details()
+
+    def initial_metadata(self):
+        return self._wrapped.initial_metadata()
+
+    def is_active(self):
+        return self._wrapped.is_active()
+
+    def time_remaining(self):
+        return self._wrapped.time_remaining()
+
+    def trailing_metadata(self):
+        return self._wrapped.trailing_metadata()
+
+
 def _wrap_stream_errors(callable_):
     """Wrap errors for Unary-Stream and Stream-Stream gRPC callables.
 
@@ -71,18 +120,7 @@
     def error_remapped_callable(*args, **kwargs):
         try:
             result = callable_(*args, **kwargs)
-            # Note: we are patching the private grpc._channel._Rendezvous._next
-            # method as magic methods (__next__ in this case) can not be
-            # patched on a per-instance basis (see
-            # https://docs.python.org/3/reference/datamodel.html
-            # #special-lookup).
-            # In an ideal world, gRPC would return a *specific* interface
-            # from *StreamMultiCallables, but they return a God class that's
-            # a combination of basically every interface in gRPC making it
-            # untenable for us to implement a wrapper object using the same
-            # interface.
-            result._next = _wrap_unary_errors(result._next)
-            return result
+            return _StreamingResponseIterator(result)
         except grpc.RpcError as exc:
             six.raise_from(exceptions.from_grpc_error(exc), exc)
 
diff --git a/tests/unit/test_grpc_helpers.py b/tests/unit/test_grpc_helpers.py
index de093e5..a5bcced 100644
--- a/tests/unit/test_grpc_helpers.py
+++ b/tests/unit/test_grpc_helpers.py
@@ -66,6 +66,57 @@
     assert exc_info.value.response == grpc_error
 
 
+def test_wrap_stream_okay():
+    expected_responses = [1, 2, 3]
+    callable_ = mock.Mock(spec=[
+        '__call__'], return_value=iter(expected_responses))
+
+    wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+    got_iterator = wrapped_callable(1, 2, three='four')
+
+    responses = list(got_iterator)
+
+    callable_.assert_called_once_with(1, 2, three='four')
+    assert responses == expected_responses
+
+
+def test_wrap_stream_iterable_iterface():
+    response_iter = mock.create_autospec(grpc.Call, instance=True)
+    callable_ = mock.Mock(spec=['__call__'], return_value=response_iter)
+
+    wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
+
+    got_iterator = wrapped_callable()
+
+    callable_.assert_called_once_with()
+
+    # Check each aliased method in the grpc.Call interface
+    got_iterator.add_callback(mock.sentinel.callback)
+    response_iter.add_callback.assert_called_once_with(mock.sentinel.callback)
+
+    got_iterator.cancel()
+    response_iter.cancel.assert_called_once_with()
+
+    got_iterator.code()
+    response_iter.code.assert_called_once_with()
+
+    got_iterator.details()
+    response_iter.details.assert_called_once_with()
+
+    got_iterator.initial_metadata()
+    response_iter.initial_metadata.assert_called_once_with()
+
+    got_iterator.is_active()
+    response_iter.is_active.assert_called_once_with()
+
+    got_iterator.time_remaining()
+    response_iter.time_remaining.assert_called_once_with()
+
+    got_iterator.trailing_metadata()
+    response_iter.trailing_metadata.assert_called_once_with()
+
+
 def test_wrap_stream_errors_invocation():
     grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
     callable_ = mock.Mock(spec=['__call__'], side_effect=grpc_error)
@@ -83,16 +134,10 @@
     def __init__(self, exception):
         self._exception = exception
 
-    # Note: This matches grpc._channel._Rendezvous._next which is what is
-    # patched by _wrap_stream_errors.
-    def _next(self):
+    def next(self):
         raise self._exception
 
-    def __next__(self):  # pragma: NO COVER
-        return self._next()
-
-    def next(self):  # pragma: NO COVER
-        return self._next()
+    __next__ = next
 
 
 def test_wrap_stream_errors_iterator():
@@ -107,7 +152,6 @@
     with pytest.raises(exceptions.ServiceUnavailable) as exc_info:
         next(got_iterator)
 
-    assert got_iterator == response_iter
     callable_.assert_called_once_with(1, 2, three='four')
     assert exc_info.value.response == grpc_error