Merge pull request #13891 from nathanielmanistaatgoogle/12531

Reform cygrpc.OperationTag and cygrpc.Event.
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 6361669..0892215 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -26,16 +26,13 @@
   def _start_batch(self, operations, tag, retain_self):
     if not self.is_valid:
       raise ValueError("invalid call object cannot be used from Python")
-    cdef OperationTag operation_tag = OperationTag(tag, operations)
-    if retain_self:
-      operation_tag.operation_call = self
-    else:
-      operation_tag.operation_call = None
-    operation_tag.store_ops()
-    cpython.Py_INCREF(operation_tag)
+    cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(
+        tag, operations, self if retain_self else None)
+    batch_operation_tag.prepare()
+    cpython.Py_INCREF(batch_operation_tag)
     return grpc_call_start_batch(
-          self.c_call, operation_tag.c_ops, operation_tag.c_nops,
-          <cpython.PyObject *>operation_tag, NULL)
+          self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
+          <cpython.PyObject *>batch_operation_tag, NULL)
 
   def start_client_batch(self, operations, tag):
     # We don't reference this call in the operations tag because
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 644df67..443d534 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -76,12 +76,12 @@
   def watch_connectivity_state(
       self, grpc_connectivity_state last_observed_state,
       Timespec deadline not None, CompletionQueue queue not None, tag):
-    cdef OperationTag operation_tag = OperationTag(tag, None)
-    cpython.Py_INCREF(operation_tag)
+    cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
+    cpython.Py_INCREF(connectivity_tag)
     with nogil:
       grpc_channel_watch_connectivity_state(
           self.c_channel, last_observed_state, deadline.c_time,
-          queue.c_completion_queue, <cpython.PyObject *>operation_tag)
+          queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
 
   def target(self):
     cdef char *target = NULL
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 140fc35..e259789 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -37,42 +37,20 @@
     self.is_shutdown = False
 
   cdef _interpret_event(self, grpc_event event):
-    cdef OperationTag tag = None
-    cdef object user_tag = None
-    cdef Call operation_call = None
-    cdef CallDetails request_call_details = None
-    cdef object request_metadata = None
-    cdef object batch_operations = None
+    cdef _Tag tag = None
     if event.type == GRPC_QUEUE_TIMEOUT:
-      return Event(
-          event.type, False, None, None, None, None, False, None)
+      # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+      return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
     elif event.type == GRPC_QUEUE_SHUTDOWN:
       self.is_shutdown = True
-      return Event(
-          event.type, True, None, None, None, None, False, None)
+      # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+      return ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, True, None)
     else:
-      if event.tag != NULL:
-        tag = <OperationTag>event.tag
-        # We receive event tags only after they've been inc-ref'd elsewhere in
-        # the code.
-        cpython.Py_DECREF(tag)
-        if tag.shutting_down_server is not None:
-          tag.shutting_down_server.notify_shutdown_complete()
-        user_tag = tag.user_tag
-        operation_call = tag.operation_call
-        request_call_details = tag.request_call_details
-        if tag.is_new_request:
-          request_metadata = _metadata(&tag._c_request_metadata)
-          grpc_metadata_array_destroy(&tag._c_request_metadata)
-        batch_operations = tag.release_ops()
-        if tag.is_new_request:
-          # Stuff in the tag not explicitly handled by us needs to live through
-          # the life of the call
-          operation_call.references.extend(tag.references)
-      return Event(
-          event.type, event.success, user_tag, operation_call,
-          request_call_details, request_metadata, tag.is_new_request,
-          batch_operations)
+      tag = <_Tag>event.tag
+      # We receive event tags only after they've been inc-ref'd elsewhere in
+      # the code.
+      cpython.Py_DECREF(tag)
+      return tag.event(event)
 
   def poll(self, Timespec deadline=None):
     # We name this 'poll' to avoid problems with CPython's expectations for
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
new file mode 100644
index 0000000..686199e
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
@@ -0,0 +1,45 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+  cdef readonly grpc_completion_type completion_type
+  cdef readonly bint success
+  cdef readonly object tag
+
+
+cdef class RequestCallEvent:
+
+  cdef readonly grpc_completion_type completion_type
+  cdef readonly bint success
+  cdef readonly object tag
+  cdef readonly Call call
+  cdef readonly CallDetails call_details
+  cdef readonly tuple invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+  cdef readonly grpc_completion_type completion_type
+  cdef readonly bint success
+  cdef readonly object tag
+  cdef readonly object batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+  cdef readonly grpc_completion_type completion_type
+  cdef readonly bint success
+  cdef readonly object tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
new file mode 100644
index 0000000..af26d27
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
@@ -0,0 +1,55 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+  def __cinit__(
+      self, grpc_completion_type completion_type, bint success, object tag):
+    self.completion_type = completion_type
+    self.success = success
+    self.tag = tag
+
+
+cdef class RequestCallEvent:
+
+  def __cinit__(
+      self, grpc_completion_type completion_type, bint success, object tag,
+      Call call, CallDetails call_details, tuple invocation_metadata):
+    self.completion_type = completion_type
+    self.success = success
+    self.tag = tag
+    self.call = call
+    self.call_details = call_details
+    self.invocation_metadata = invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+  def __cinit__(
+      self, grpc_completion_type completion_type, bint success, object tag,
+      object batch_operations):
+    self.completion_type = completion_type
+    self.success = success
+    self.tag = tag
+    self.batch_operations = batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+  def __cinit__(
+      self, grpc_completion_type completion_type, bint success, object tag):
+    self.completion_type = completion_type
+    self.success = success
+    self.tag = tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 537cf2b..7b2482d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,43 +28,6 @@
   cdef grpc_call_details c_details
 
 
-cdef class OperationTag:
-
-  cdef object user_tag
-  cdef list references
-  # This allows CompletionQueue to notify the Python Server object that the
-  # underlying GRPC core server has shutdown
-  cdef Server shutting_down_server
-  cdef Call operation_call
-  cdef CallDetails request_call_details
-  cdef grpc_metadata_array _c_request_metadata
-  cdef grpc_op *c_ops
-  cdef size_t c_nops
-  cdef readonly object _operations
-  cdef bint is_new_request
-
-  cdef void store_ops(self)
-  cdef object release_ops(self)
-
-
-cdef class Event:
-
-  cdef readonly grpc_completion_type type
-  cdef readonly bint success
-  cdef readonly object tag
-
-  # For Server.request_call
-  cdef readonly bint is_new_request
-  cdef readonly CallDetails request_call_details
-  cdef readonly object request_metadata
-
-  # For server calls
-  cdef readonly Call operation_call
-
-  # For Call.start_batch
-  cdef readonly object batch_operations
-
-
 cdef class SslPemKeyCertPair:
 
   cdef grpc_ssl_pem_key_cert_pair c_pair
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 7e6272f..bc2cd03 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -218,50 +218,6 @@
     return timespec
 
 
-cdef class OperationTag:
-
-  def __cinit__(self, user_tag, operations):
-    self.user_tag = user_tag
-    self.references = []
-    self._operations = operations
-
-  cdef void store_ops(self):
-    self.c_nops = 0 if self._operations is None else len(self._operations)
-    if 0 < self.c_nops:
-      self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
-      for index, operation in enumerate(self._operations):
-        (<Operation>operation).c()
-        self.c_ops[index] = (<Operation>operation).c_op
-
-  cdef object release_ops(self):
-    if 0 < self.c_nops:
-      for index, operation in enumerate(self._operations):
-        (<Operation>operation).c_op = self.c_ops[index]
-        (<Operation>operation).un_c()
-      gpr_free(self.c_ops)
-      return self._operations
-    else:
-      return ()
-
-
-cdef class Event:
-
-  def __cinit__(self, grpc_completion_type type, bint success,
-                object tag, Call operation_call,
-                CallDetails request_call_details,
-                object request_metadata,
-                bint is_new_request,
-                object batch_operations):
-    self.type = type
-    self.success = success
-    self.tag = tag
-    self.operation_call = operation_call
-    self.request_call_details = request_call_details
-    self.request_metadata = request_metadata
-    self.batch_operations = batch_operations
-    self.is_new_request = is_new_request
-
-
 cdef class SslPemKeyCertPair:
 
   def __cinit__(self, bytes private_key, bytes certificate_chain):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index f8d7892..c19becc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -78,19 +78,15 @@
       raise ValueError("server must be started and not shutting down")
     if server_queue not in self.registered_completion_queues:
       raise ValueError("server_queue must be a registered completion queue")
-    cdef OperationTag operation_tag = OperationTag(tag, None)
-    operation_tag.operation_call = Call()
-    operation_tag.request_call_details = CallDetails()
-    grpc_metadata_array_init(&operation_tag._c_request_metadata)
-    operation_tag.references.extend([self, call_queue, server_queue])
-    operation_tag.is_new_request = True
-    cpython.Py_INCREF(operation_tag)
+    cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
+    request_call_tag.prepare()
+    cpython.Py_INCREF(request_call_tag)
     return grpc_server_request_call(
-        self.c_server, &operation_tag.operation_call.c_call,
-        &operation_tag.request_call_details.c_details,
-        &operation_tag._c_request_metadata,
+        self.c_server, &request_call_tag.call.c_call,
+        &request_call_tag.call_details.c_details,
+        &request_call_tag.c_invocation_metadata,
         call_queue.c_completion_queue, server_queue.c_completion_queue,
-        <cpython.PyObject *>operation_tag)
+        <cpython.PyObject *>request_call_tag)
 
   def register_completion_queue(
       self, CompletionQueue queue not None):
@@ -131,16 +127,14 @@
 
   cdef _c_shutdown(self, CompletionQueue queue, tag):
     self.is_shutting_down = True
-    operation_tag = OperationTag(tag, None)
-    operation_tag.shutting_down_server = self
-    cpython.Py_INCREF(operation_tag)
+    cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
+    cpython.Py_INCREF(server_shutdown_tag)
     with nogil:
       grpc_server_shutdown_and_notify(
           self.c_server, queue.c_completion_queue,
-          <cpython.PyObject *>operation_tag)
+          <cpython.PyObject *>server_shutdown_tag)
 
   def shutdown(self, CompletionQueue queue not None, tag):
-    cdef OperationTag operation_tag
     if queue.is_shutting_down:
       raise ValueError("queue must be live")
     elif not self.is_started:
@@ -153,7 +147,8 @@
       self._c_shutdown(queue, tag)
 
   cdef notify_shutdown_complete(self):
-    # called only by a completion queue on receiving our shutdown operation tag
+    # called only after our server shutdown tag has emerged from a completion
+    # queue.
     self.is_shutdown = True
 
   def cancel_all_calls(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
new file mode 100644
index 0000000..f9a3b5e
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
@@ -0,0 +1,58 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+  cdef object event(self, grpc_event c_event)
+
+
+cdef class _ConnectivityTag(_Tag):
+
+  cdef readonly object _user_tag
+
+  cdef ConnectivityEvent event(self, grpc_event c_event)
+
+
+cdef class _RequestCallTag(_Tag):
+
+  cdef readonly object _user_tag
+  cdef Call call
+  cdef CallDetails call_details
+  cdef grpc_metadata_array c_invocation_metadata
+
+  cdef void prepare(self)
+  cdef RequestCallEvent event(self, grpc_event c_event)
+
+
+cdef class _BatchOperationTag(_Tag):
+
+  cdef object _user_tag
+  cdef readonly object _operations
+  cdef readonly object _retained_call
+  cdef grpc_op *c_ops
+  cdef size_t c_nops
+
+  cdef void prepare(self)
+  cdef BatchOperationEvent event(self, grpc_event c_event)
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+  cdef readonly object _user_tag
+  # This allows CompletionQueue to notify the Python Server object that the
+  # underlying GRPC core server has shutdown
+  cdef readonly Server _shutting_down_server
+
+  cdef ServerShutdownEvent event(self, grpc_event c_event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
new file mode 100644
index 0000000..aaca458
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
@@ -0,0 +1,87 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+  cdef object event(self, grpc_event c_event):
+    raise NotImplementedError()
+
+
+cdef class _ConnectivityTag(_Tag):
+
+  def __cinit__(self, user_tag):
+    self._user_tag = user_tag
+
+  cdef ConnectivityEvent event(self, grpc_event c_event):
+    return ConnectivityEvent(c_event.type, c_event.success, self._user_tag)
+
+
+cdef class _RequestCallTag(_Tag):
+
+  def __cinit__(self, user_tag):
+    self._user_tag = user_tag
+    self.call = None
+    self.call_details = None
+
+  cdef void prepare(self):
+    self.call = Call()
+    self.call_details = CallDetails()
+    grpc_metadata_array_init(&self.c_invocation_metadata)
+
+  cdef RequestCallEvent event(self, grpc_event c_event):
+    cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata)
+    grpc_metadata_array_destroy(&self.c_invocation_metadata)
+    return RequestCallEvent(
+        c_event.type, c_event.success, self._user_tag, self.call,
+        self.call_details, invocation_metadata)
+
+
+cdef class _BatchOperationTag(_Tag):
+
+  def __cinit__(self, user_tag, operations, call):
+    self._user_tag = user_tag
+    self._operations = operations
+    self._retained_call = call
+
+  cdef void prepare(self):
+    self.c_nops = 0 if self._operations is None else len(self._operations)
+    if 0 < self.c_nops:
+      self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+      for index, operation in enumerate(self._operations):
+        (<Operation>operation).c()
+        self.c_ops[index] = (<Operation>operation).c_op
+
+  cdef BatchOperationEvent event(self, grpc_event c_event):
+    if 0 < self.c_nops:
+      for index, operation in enumerate(self._operations):
+        (<Operation>operation).c_op = self.c_ops[index]
+        (<Operation>operation).un_c()
+      gpr_free(self.c_ops)
+      return BatchOperationEvent(
+          c_event.type, c_event.success, self._user_tag, self._operations)
+    else:
+      return BatchOperationEvent(
+          c_event.type, c_event.success, self._user_tag, ())
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+  def __cinit__(self, user_tag, shutting_down_server):
+    self._user_tag = user_tag
+    self._shutting_down_server = shutting_down_server
+
+  cdef ServerShutdownEvent event(self, grpc_event c_event):
+    self._shutting_down_server.notify_shutdown_complete()
+    return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag)
\ No newline at end of file
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index ad229de..b32fa51 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -18,8 +18,10 @@
 include "_cygrpc/channel.pxd.pxi"
 include "_cygrpc/credentials.pxd.pxi"
 include "_cygrpc/completion_queue.pxd.pxi"
+include "_cygrpc/event.pxd.pxi"
 include "_cygrpc/metadata.pxd.pxi"
 include "_cygrpc/operation.pxd.pxi"
 include "_cygrpc/records.pxd.pxi"
 include "_cygrpc/security.pxd.pxi"
 include "_cygrpc/server.pxd.pxi"
+include "_cygrpc/tag.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 0964fb6..5106394 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -25,11 +25,13 @@
 include "_cygrpc/channel.pyx.pxi"
 include "_cygrpc/credentials.pyx.pxi"
 include "_cygrpc/completion_queue.pyx.pxi"
+include "_cygrpc/event.pyx.pxi"
 include "_cygrpc/metadata.pyx.pxi"
 include "_cygrpc/operation.pyx.pxi"
 include "_cygrpc/records.pyx.pxi"
 include "_cygrpc/security.pyx.pxi"
 include "_cygrpc/server.pyx.pxi"
+include "_cygrpc/tag.pyx.pxi"
 
 #
 # initialize gRPC
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index eec31bd..22244b9 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -217,11 +217,10 @@
 
     def time_remaining(self):
         return max(
-            float(self._rpc_event.request_call_details.deadline) - time.time(),
-            0)
+            float(self._rpc_event.call_details.deadline) - time.time(), 0)
 
     def cancel(self):
-        self._rpc_event.operation_call.cancel()
+        self._rpc_event.call.cancel()
 
     def add_callback(self, callback):
         with self._state.condition:
@@ -236,23 +235,23 @@
             self._state.disable_next_compression = True
 
     def invocation_metadata(self):
-        return self._rpc_event.request_metadata
+        return self._rpc_event.invocation_metadata
 
     def peer(self):
-        return _common.decode(self._rpc_event.operation_call.peer())
+        return _common.decode(self._rpc_event.call.peer())
 
     def peer_identities(self):
-        return cygrpc.peer_identities(self._rpc_event.operation_call)
+        return cygrpc.peer_identities(self._rpc_event.call)
 
     def peer_identity_key(self):
-        id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call)
+        id_key = cygrpc.peer_identity_key(self._rpc_event.call)
         return id_key if id_key is None else _common.decode(id_key)
 
     def auth_context(self):
         return {
             _common.decode(key): value
             for key, value in six.iteritems(
-                cygrpc.auth_context(self._rpc_event.operation_call))
+                cygrpc.auth_context(self._rpc_event.call))
         }
 
     def send_initial_metadata(self, initial_metadata):
@@ -263,7 +262,7 @@
                 if self._state.initial_metadata_allowed:
                     operation = cygrpc.SendInitialMetadataOperation(
                         initial_metadata, _EMPTY_FLAGS)
-                    self._rpc_event.operation_call.start_server_batch(
+                    self._rpc_event.call.start_server_batch(
                         (operation,), _send_initial_metadata(self._state))
                     self._state.initial_metadata_allowed = False
                     self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
@@ -346,9 +345,9 @@
             if state.client is _CANCELLED or state.statused:
                 return None
             else:
-                rpc_event.operation_call.start_server_batch(
+                rpc_event.call.start_server_batch(
                     (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
-                    _receive_message(state, rpc_event.operation_call,
+                    _receive_message(state, rpc_event.call,
                                      request_deserializer))
                 state.due.add(_RECEIVE_MESSAGE_TOKEN)
                 while True:
@@ -356,8 +355,8 @@
                     if state.request is None:
                         if state.client is _CLOSED:
                             details = '"{}" requires exactly one request message.'.format(
-                                rpc_event.request_call_details.method)
-                            _abort(state, rpc_event.operation_call,
+                                rpc_event.call_details.method)
+                            _abort(state, rpc_event.call,
                                    cygrpc.StatusCode.unimplemented,
                                    _common.encode(details))
                             return None
@@ -378,13 +377,13 @@
     except Exception as exception:  # pylint: disable=broad-except
         with state.condition:
             if exception is state.abortion:
-                _abort(state, rpc_event.operation_call,
-                       cygrpc.StatusCode.unknown, b'RPC Aborted')
+                _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+                       b'RPC Aborted')
             elif exception not in state.rpc_errors:
                 details = 'Exception calling application: {}'.format(exception)
                 logging.exception(details)
-                _abort(state, rpc_event.operation_call,
-                       cygrpc.StatusCode.unknown, _common.encode(details))
+                _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+                       _common.encode(details))
         return None, False
 
 
@@ -396,13 +395,13 @@
     except Exception as exception:  # pylint: disable=broad-except
         with state.condition:
             if exception is state.abortion:
-                _abort(state, rpc_event.operation_call,
-                       cygrpc.StatusCode.unknown, b'RPC Aborted')
+                _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+                       b'RPC Aborted')
             elif exception not in state.rpc_errors:
                 details = 'Exception iterating responses: {}'.format(exception)
                 logging.exception(details)
-                _abort(state, rpc_event.operation_call,
-                       cygrpc.StatusCode.unknown, _common.encode(details))
+                _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+                       _common.encode(details))
         return None, False
 
 
@@ -410,7 +409,7 @@
     serialized_response = _common.serialize(response, response_serializer)
     if serialized_response is None:
         with state.condition:
-            _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal,
+            _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
                    b'Failed to serialize response!')
         return None
     else:
@@ -433,8 +432,8 @@
                 operations = (cygrpc.SendMessageOperation(serialized_response,
                                                           _EMPTY_FLAGS),)
                 token = _SEND_MESSAGE_TOKEN
-            rpc_event.operation_call.start_server_batch(
-                operations, _send_message(state, token))
+            rpc_event.call.start_server_batch(operations,
+                                              _send_message(state, token))
             state.due.add(token)
             while True:
                 state.condition.wait()
@@ -458,7 +457,7 @@
                 operations.append(
                     cygrpc.SendMessageOperation(serialized_response,
                                                 _EMPTY_FLAGS))
-            rpc_event.operation_call.start_server_batch(
+            rpc_event.call.start_server_batch(
                 operations,
                 _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
             state.statused = True
@@ -525,7 +524,7 @@
 
 
 def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
-    request_iterator = _RequestIterator(state, rpc_event.operation_call,
+    request_iterator = _RequestIterator(state, rpc_event.call,
                                         method_handler.request_deserializer)
     return thread_pool.submit(
         _unary_response_in_pool, rpc_event, state, method_handler.stream_unary,
@@ -534,7 +533,7 @@
 
 
 def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
-    request_iterator = _RequestIterator(state, rpc_event.operation_call,
+    request_iterator = _RequestIterator(state, rpc_event.call,
                                         method_handler.request_deserializer)
     return thread_pool.submit(
         _stream_response_in_pool, rpc_event, state,
@@ -552,8 +551,8 @@
         return None
 
     handler_call_details = _HandlerCallDetails(
-        _common.decode(rpc_event.request_call_details.method),
-        rpc_event.request_metadata)
+        _common.decode(rpc_event.call_details.method),
+        rpc_event.invocation_metadata)
 
     if interceptor_pipeline is not None:
         return interceptor_pipeline.execute(query_handlers,
@@ -568,15 +567,15 @@
                   cygrpc.SendStatusFromServerOperation(None, status, details,
                                                        _EMPTY_FLAGS),)
     rpc_state = _RPCState()
-    rpc_event.operation_call.start_server_batch(
-        operations, lambda ignored_event: (rpc_state, (),))
+    rpc_event.call.start_server_batch(operations,
+                                      lambda ignored_event: (rpc_state, (),))
     return rpc_state
 
 
 def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
     state = _RPCState()
     with state.condition:
-        rpc_event.operation_call.start_server_batch(
+        rpc_event.call.start_server_batch(
             (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
             _receive_close_on_server(state))
         state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
@@ -600,7 +599,7 @@
                  concurrency_exceeded):
     if not rpc_event.success:
         return None, None
-    if rpc_event.request_call_details.method is not None:
+    if rpc_event.call_details.method is not None:
         try:
             method_handler = _find_method_handler(rpc_event, generic_handlers,
                                                   interceptor_pipeline)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index 75b6b9e..cdb3572 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -53,7 +53,7 @@
         self._state = state
         self._lock = threading.Lock()
         self._completion_queue = completion_queue
-        self._call = rpc_event.operation_call
+        self._call = rpc_event.call
 
     def __call__(self):
         with self._state.condition:
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index 41291cc..583136c 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -72,7 +72,7 @@
 
         with server_call_condition:
             server_send_initial_metadata_start_batch_result = (
-                server_request_call_event.operation_call.start_server_batch([
+                server_request_call_event.call.start_server_batch([
                     cygrpc.SendInitialMetadataOperation(
                         _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
                 ], server_send_initial_metadata_tag))
@@ -84,7 +84,7 @@
 
         with server_call_condition:
             server_complete_rpc_start_batch_result = (
-                server_request_call_event.operation_call.start_server_batch([
+                server_request_call_event.call.start_server_batch([
                     cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
                     cygrpc.SendStatusFromServerOperation(
                         _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@@ -101,23 +101,24 @@
         client_complete_rpc_event = self.client_driver.event_with_tag(
             client_complete_rpc_tag)
 
-        return (_common.OperationResult(server_request_call_start_batch_result,
-                                        server_request_call_event.type,
-                                        server_request_call_event.success),
+        return (_common.OperationResult(
+            server_request_call_start_batch_result,
+            server_request_call_event.completion_type,
+            server_request_call_event.success), _common.OperationResult(
+                client_receive_initial_metadata_start_batch_result,
+                client_receive_initial_metadata_event.completion_type,
+                client_receive_initial_metadata_event.success),
                 _common.OperationResult(
-                    client_receive_initial_metadata_start_batch_result,
-                    client_receive_initial_metadata_event.type,
-                    client_receive_initial_metadata_event.success),
-                _common.OperationResult(client_complete_rpc_start_batch_result,
-                                        client_complete_rpc_event.type,
-                                        client_complete_rpc_event.success),
+                    client_complete_rpc_start_batch_result,
+                    client_complete_rpc_event.completion_type,
+                    client_complete_rpc_event.success), _common.OperationResult(
+                        server_send_initial_metadata_start_batch_result,
+                        server_send_initial_metadata_event.completion_type,
+                        server_send_initial_metadata_event.success),
                 _common.OperationResult(
-                    server_send_initial_metadata_start_batch_result,
-                    server_send_initial_metadata_event.type,
-                    server_send_initial_metadata_event.success),
-                _common.OperationResult(server_complete_rpc_start_batch_result,
-                                        server_complete_rpc_event.type,
-                                        server_complete_rpc_event.success),)
+                    server_complete_rpc_start_batch_result,
+                    server_complete_rpc_event.completion_type,
+                    server_complete_rpc_event.success),)
 
     def test_rpcs(self):
         expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index b429a20..c5cf606 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -63,7 +63,7 @@
 
         with self.server_condition:
             server_send_initial_metadata_start_batch_result = (
-                server_request_call_event.operation_call.start_server_batch([
+                server_request_call_event.call.start_server_batch([
                     cygrpc.SendInitialMetadataOperation(
                         _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
                 ], server_send_initial_metadata_tag))
@@ -75,7 +75,7 @@
 
         with self.server_condition:
             server_complete_rpc_start_batch_result = (
-                server_request_call_event.operation_call.start_server_batch([
+                server_request_call_event.call.start_server_batch([
                     cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
                     cygrpc.SendStatusFromServerOperation(
                         _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@@ -92,23 +92,24 @@
         client_complete_rpc_event = self.client_driver.event_with_tag(
             client_complete_rpc_tag)
 
-        return (_common.OperationResult(server_request_call_start_batch_result,
-                                        server_request_call_event.type,
-                                        server_request_call_event.success),
+        return (_common.OperationResult(
+            server_request_call_start_batch_result,
+            server_request_call_event.completion_type,
+            server_request_call_event.success), _common.OperationResult(
+                client_receive_initial_metadata_start_batch_result,
+                client_receive_initial_metadata_event.completion_type,
+                client_receive_initial_metadata_event.success),
                 _common.OperationResult(
-                    client_receive_initial_metadata_start_batch_result,
-                    client_receive_initial_metadata_event.type,
-                    client_receive_initial_metadata_event.success),
-                _common.OperationResult(client_complete_rpc_start_batch_result,
-                                        client_complete_rpc_event.type,
-                                        client_complete_rpc_event.success),
+                    client_complete_rpc_start_batch_result,
+                    client_complete_rpc_event.completion_type,
+                    client_complete_rpc_event.success), _common.OperationResult(
+                        server_send_initial_metadata_start_batch_result,
+                        server_send_initial_metadata_event.completion_type,
+                        server_send_initial_metadata_event.success),
                 _common.OperationResult(
-                    server_send_initial_metadata_start_batch_result,
-                    server_send_initial_metadata_event.type,
-                    server_send_initial_metadata_event.success),
-                _common.OperationResult(server_complete_rpc_start_batch_result,
-                                        server_complete_rpc_event.type,
-                                        server_complete_rpc_event.success),)
+                    server_complete_rpc_start_batch_result,
+                    server_complete_rpc_event.completion_type,
+                    server_complete_rpc_event.success),)
 
     def test_rpcs(self):
         expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index 87d0dd7..a5ec54e 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -175,12 +175,12 @@
 
         with server_call_condition:
             server_send_initial_metadata_start_batch_result = (
-                server_rpc_event.operation_call.start_server_batch([
+                server_rpc_event.call.start_server_batch([
                     cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
                                                         _EMPTY_FLAGS),
                 ], server_send_initial_metadata_tag))
             server_send_first_message_start_batch_result = (
-                server_rpc_event.operation_call.start_server_batch([
+                server_rpc_event.call.start_server_batch([
                     cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
                 ], server_send_first_message_tag))
         server_send_initial_metadata_event = server_call_driver.event_with_tag(
@@ -189,11 +189,11 @@
             server_send_first_message_tag)
         with server_call_condition:
             server_send_second_message_start_batch_result = (
-                server_rpc_event.operation_call.start_server_batch([
+                server_rpc_event.call.start_server_batch([
                     cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
                 ], server_send_second_message_tag))
             server_complete_rpc_start_batch_result = (
-                server_rpc_event.operation_call.start_server_batch([
+                server_rpc_event.call.start_server_batch([
                     cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
                     cygrpc.SendStatusFromServerOperation(
                         (), cygrpc.StatusCode.ok, b'test details',
@@ -232,9 +232,8 @@
         self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
         self.assertIs(server_rpc_tag, server_rpc_event.tag)
         self.assertEqual(cygrpc.CompletionType.operation_complete,
-                         server_rpc_event.type)
-        self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
-        self.assertEqual(0, len(server_rpc_event.batch_operations))
+                         server_rpc_event.completion_type)
+        self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
 
 
 if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index e34892c..5453735 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -87,7 +87,8 @@
         shutdown_tag = object()
         server.shutdown(completion_queue, shutdown_tag)
         event = completion_queue.poll()
-        self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+        self.assertEqual(cygrpc.CompletionType.operation_complete,
+                         event.completion_type)
         self.assertIs(shutdown_tag, event.tag)
         del server
         del completion_queue
@@ -147,7 +148,7 @@
                 self.assertEqual(cygrpc.CallError.ok, call_result)
                 event = queue.poll(deadline)
                 self.assertEqual(cygrpc.CompletionType.operation_complete,
-                                 event.type)
+                                 event.completion_type)
                 self.assertTrue(event.success)
                 self.assertIs(tag, event.tag)
             except Exception as error:
@@ -205,22 +206,20 @@
 
         request_event = self.server_completion_queue.poll(cygrpc_deadline)
         self.assertEqual(cygrpc.CompletionType.operation_complete,
-                         request_event.type)
-        self.assertIsInstance(request_event.operation_call, cygrpc.Call)
+                         request_event.completion_type)
+        self.assertIsInstance(request_event.call, cygrpc.Call)
         self.assertIs(server_request_tag, request_event.tag)
-        self.assertEqual(0, len(request_event.batch_operations))
         self.assertTrue(
             test_common.metadata_transmitted(client_initial_metadata,
-                                             request_event.request_metadata))
-        self.assertEqual(METHOD, request_event.request_call_details.method)
-        self.assertEqual(self.expected_host,
-                         request_event.request_call_details.host)
+                                             request_event.invocation_metadata))
+        self.assertEqual(METHOD, request_event.call_details.method)
+        self.assertEqual(self.expected_host, request_event.call_details.host)
         self.assertLess(
-            abs(DEADLINE - float(request_event.request_call_details.deadline)),
+            abs(DEADLINE - float(request_event.call_details.deadline)),
             DEADLINE_TOLERANCE)
 
         server_call_tag = object()
-        server_call = request_event.operation_call
+        server_call = request_event.call
         server_initial_metadata = (
             (SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),)
         server_trailing_metadata = (
@@ -322,7 +321,7 @@
         ], "Client prologue")
 
         request_event = self.server_completion_queue.poll(cygrpc_deadline)
-        server_call = request_event.operation_call
+        server_call = request_event.call
 
         def perform_server_operations(operations, description):
             return self._perform_operations(operations, server_call,