Merge branch 'metadata_filter' of github.com:ctiller/grpc into metadata_filter
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index e4c24a8..246e839 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -60,24 +60,25 @@
                   method, host, Timespec deadline not None):
     if queue.is_shutting_down:
       raise ValueError("queue must not be shutting down or shutdown")
-    cdef Slice method_slice = Slice.from_bytes(method)
-    cdef Slice host_slice
-    cdef grpc_slice *host_c_slice = NULL
+    cdef grpc_slice method_slice = _slice_from_bytes(method)
+    cdef grpc_slice host_slice
+    cdef grpc_slice *host_slice_ptr = NULL
     if host is not None:
-      host_slice = Slice.from_bytes(host)
-      host_c_slice = &host_slice.c_slice
-    else:
-      host_slice = Slice()
+      host_slice = _slice_from_bytes(host)
+      host_slice_ptr = &host_slice
     cdef Call operation_call = Call()
-    operation_call.references = [self, method_slice, host_slice, queue]
+    operation_call.references = [self, queue]
     cdef grpc_call *parent_call = NULL
     if parent is not None:
       parent_call = parent.c_call
     with nogil:
       operation_call.c_call = grpc_channel_create_call(
           self.c_channel, parent_call, flags,
-          queue.c_completion_queue, method_slice.c_slice, host_c_slice,
+          queue.c_completion_queue, method_slice, host_slice_ptr,
           deadline.c_time, NULL)
+      grpc_slice_unref(method_slice)
+      if host_slice_ptr:
+        grpc_slice_unref(host_slice)
     return operation_call
 
   def check_connectivity_state(self, bint try_to_connect):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 2b5cce8..d8df6c2 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -51,6 +51,7 @@
     cdef CallDetails request_call_details = None
     cdef Metadata request_metadata = None
     cdef Operations batch_operations = None
+    cdef Operation batch_operation = None
     if event.type == GRPC_QUEUE_TIMEOUT:
       return Event(
           event.type, False, None, None, None, None, False, None)
@@ -69,10 +70,15 @@
         user_tag = tag.user_tag
         operation_call = tag.operation_call
         request_call_details = tag.request_call_details
-        request_metadata = tag.request_metadata
-        if request_metadata is not None:
+        if tag.request_metadata is not None:
+          request_metadata = tag.request_metadata
           request_metadata._claim_slice_ownership()
         batch_operations = tag.batch_operations
+        if tag.batch_operations is not None:
+          for op in batch_operations.operations:
+            batch_operation = <Operation>op
+            if batch_operation._received_metadata is not None:
+              batch_operation._received_metadata._claim_slice_ownership()
         if tag.is_new_request:
           # Stuff in the tag not explicitly handled by us needs to live through
           # the life of the call
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index e927605..c4a1711 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,6 +28,11 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 
+cdef bytes _slice_bytes(grpc_slice slice)  # contents of a slice as Python bytes
+cdef grpc_slice _copy_slice(grpc_slice slice) nogil  # new slice built from a copy of slice's bytes
+cdef grpc_slice _slice_from_bytes(bytes value) nogil  # new slice built from a copy of value's bytes
+
+
 cdef class Timespec:
 
   cdef gpr_timespec c_time
@@ -70,17 +75,6 @@
   cdef readonly Operations batch_operations
 
 
-cdef class Slice:
-
-  cdef grpc_slice c_slice
-
-  cdef void _assign_slice(self, grpc_slice new_slice) nogil
-  @staticmethod
-  cdef Slice from_slice(grpc_slice slice)
-  @staticmethod
-  cdef bytes bytes_from_slice(grpc_slice slice)
-
-
 cdef class ByteBuffer:
 
   cdef grpc_byte_buffer *c_byte_buffer
@@ -108,17 +102,13 @@
 cdef class Metadatum:
 
   cdef grpc_metadata c_metadata
-  cdef Slice _key,
-  cdef Slice _value
+  cdef void _copy_metadatum(self, grpc_metadata *destination) nogil
 
 
 cdef class Metadata:
 
   cdef grpc_metadata_array c_metadata_array
-  cdef bint owns_metadata_slices
-  cdef object metadata
   cdef void _claim_slice_ownership(self)
-  cdef void _drop_slice_ownership(self)
 
 
 cdef class Operation:
@@ -127,7 +117,7 @@
   cdef ByteBuffer _received_message
   cdef Metadata _received_metadata
   cdef grpc_status_code _received_status_code
-  cdef Slice _received_status_details
+  cdef grpc_slice _status_details
   cdef int _received_cancelled
   cdef readonly bint is_valid
   cdef object references
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index d7a7713..d052b3f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -29,6 +29,26 @@
 
 from libc.stdint cimport intptr_t
 
+
+cdef bytes _slice_bytes(grpc_slice slice):  # Return the slice's contents as a Python bytes object.
+  cdef void *start = grpc_slice_start_ptr(slice)
+  cdef size_t length = grpc_slice_length(slice)
+  return (<const char *>start)[:length]  # explicit length, so the data is not treated as NUL-terminated
+
+cdef grpc_slice _copy_slice(grpc_slice slice) nogil:  # Return a new slice built from a copy of slice's bytes.
+  cdef void *start = grpc_slice_start_ptr(slice)
+  cdef size_t length = grpc_slice_length(slice)
+  return grpc_slice_from_copied_buffer(<const char *>start, length)  # the input slice is not unref'd here
+
+cdef grpc_slice _slice_from_bytes(bytes value) nogil:  # Return a new slice built from a copy of value's bytes.
+  cdef const char *value_ptr
+  cdef size_t length
+  with gil:  # the function is declared nogil, so take the GIL before touching the Python object
+    value_ptr = <const char *>value  # borrowed pointer into value's buffer, not a copy
+    length = len(value)
+  return grpc_slice_from_copied_buffer(value_ptr, length)  # NOTE(review): value_ptr is used after the gil block; presumably the caller keeps value alive - confirm
+
+
 class ConnectivityState:
   idle = GRPC_CHANNEL_IDLE
   connecting = GRPC_CHANNEL_CONNECTING
@@ -189,11 +209,11 @@
 
   @property
   def method(self):
-    return Slice.bytes_from_slice(self.c_details.method)
+    return _slice_bytes(self.c_details.method)  # method slice of c_details as Python bytes
 
   @property
   def host(self):
-    return Slice.bytes_from_slice(self.c_details.host)
+    return _slice_bytes(self.c_details.host)  # host slice of c_details as Python bytes
 
   @property
   def deadline(self):
@@ -227,46 +247,6 @@
     self.is_new_request = is_new_request
 
 
-cdef class Slice:
-
-  def __cinit__(self):
-    with nogil:
-      grpc_init()
-      self.c_slice = grpc_empty_slice()
-
-  cdef void _assign_slice(self, grpc_slice new_slice) nogil:
-    grpc_slice_unref(self.c_slice)
-    self.c_slice = new_slice
-
-  @staticmethod
-  def from_bytes(bytes data):
-    cdef Slice self = Slice()
-    self._assign_slice(grpc_slice_from_copied_buffer(data, len(data)))
-    return self
-
-  @staticmethod
-  cdef Slice from_slice(grpc_slice slice):
-    cdef Slice self = Slice()
-    grpc_slice_ref(slice)
-    self._assign_slice(slice)
-    return self
-
-  @staticmethod
-  cdef bytes bytes_from_slice(grpc_slice slice):
-    with nogil:
-      pointer = grpc_slice_start_ptr(slice)
-      length = grpc_slice_length(slice)
-    return (<char *>pointer)[:length]
-
-  def bytes(self):
-    return Slice.bytes_from_slice(self.c_slice)
-
-  def __dealloc__(self):
-    with nogil:
-      grpc_slice_unref(self.c_slice)
-      grpc_shutdown()
-
-
 cdef class ByteBuffer:
 
   def __cinit__(self, bytes data):
@@ -416,20 +396,21 @@
 
 cdef class Metadatum:
 
-  # TODO(atash) this should just accept Slice objects.
   def __cinit__(self, bytes key, bytes value):
-    self._key = Slice.from_bytes(key)
-    self._value = Slice.from_bytes(value)
-    self.c_metadata.key = self._key.c_slice
-    self.c_metadata.value = self._value.c_slice
+    self.c_metadata.key = _slice_from_bytes(key)
+    self.c_metadata.value = _slice_from_bytes(value)
+
+  cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:  # Fill *destination with copies of this key and value.
+    destination[0].key = _copy_slice(self.c_metadata.key)  # fresh copy; self.c_metadata keeps its own slice
+    destination[0].value = _copy_slice(self.c_metadata.value)
 
   @property
   def key(self):
-    return self._key.bytes()
+    return _slice_bytes(self.c_metadata.key)
 
   @property
   def value(self):
-    return self._value.bytes()
+    return _slice_bytes(self.c_metadata.value)
 
   def __len__(self):
     return 2
@@ -445,6 +426,9 @@
   def __iter__(self):
     return iter((self.key, self.value))
 
+  def __dealloc__(self):  # Unref the key and value slices that __cinit__ stored in c_metadata.
+    grpc_slice_unref(self.c_metadata.key)
+    grpc_slice_unref(self.c_metadata.value)
 
 cdef class _MetadataIterator:
 
@@ -466,34 +450,27 @@
     else:
       raise StopIteration
 
-cdef grpc_slice _copy_slice(grpc_slice slice) nogil:
-  cdef void *start = grpc_slice_start_ptr(slice)
-  cdef size_t length = grpc_slice_length(slice)
-  return grpc_slice_from_copied_buffer(<const char *>start, length)
 
 cdef class Metadata:
 
-  def __cinit__(self, metadata):
+  def __cinit__(self, metadata_iterable):  # NOTE(review): parameter renamed from `metadata`; keyword callers would break - confirm none exist
     with nogil:
       grpc_init()
       grpc_metadata_array_init(&self.c_metadata_array)
-    self.owns_metadata_slices = False
-    self.metadata = list(metadata)
-    for metadatum in self.metadata:
+    metadata = list(metadata_iterable)  # local list only; no longer stored on self
+    for metadatum in metadata:
       if not isinstance(metadatum, Metadatum):
         raise TypeError("expected list of Metadatum")
-    self.c_metadata_array.count = len(self.metadata)
-    self.c_metadata_array.capacity = len(self.metadata)
+    self.c_metadata_array.count = len(metadata)
+    self.c_metadata_array.capacity = len(metadata)
     with nogil:
       self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
           self.c_metadata_array.count*sizeof(grpc_metadata)
       )
     for i in range(self.c_metadata_array.count):
-      self.c_metadata_array.metadata[i] = (
-          (<Metadatum>self.metadata[i]).c_metadata)
+      (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])  # entry gets copies of the Metadatum's key/value
 
   def __dealloc__(self):
-    self._drop_slice_ownership()
     with nogil:
       # this frees the allocated memory for the grpc_metadata_array (although
       # it'd be nice if that were documented somewhere...)
@@ -507,30 +484,26 @@
   def __getitem__(self, size_t i):
     if i >= self.c_metadata_array.count:
       raise IndexError
-    return Metadatum(
-        key=Slice.bytes_from_slice(self.c_metadata_array.metadata[i].key),
-        value=Slice.bytes_from_slice(self.c_metadata_array.metadata[i].value))
+    key = _slice_bytes(self.c_metadata_array.metadata[i].key)
+    value = _slice_bytes(self.c_metadata_array.metadata[i].value)
+    return Metadatum(key=key, value=value)  # new Metadatum built from the bytes, not from this array's slices
 
   def __iter__(self):
     return _MetadataIterator(self)
 
   cdef void _claim_slice_ownership(self):
-    if self.owns_metadata_slices:
-      return
+    cdef grpc_metadata_array new_c_metadata_array  # replacement array that will hold the copied slices
+    grpc_metadata_array_init(&new_c_metadata_array)
+    new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
+        self.c_metadata_array.count*sizeof(grpc_metadata))
+    new_c_metadata_array.count = self.c_metadata_array.count  # NOTE(review): capacity is not assigned here, unlike in __cinit__ - confirm that is intended
     for i in range(self.c_metadata_array.count):
-      self.c_metadata_array.metadata[i].key = _copy_slice(
+      new_c_metadata_array.metadata[i].key = _copy_slice(  # copy goes into the new array; the source entry is not modified
           self.c_metadata_array.metadata[i].key)
-      self.c_metadata_array.metadata[i].value = _copy_slice(
+      new_c_metadata_array.metadata[i].value = _copy_slice(
           self.c_metadata_array.metadata[i].value)
-    self.owns_metadata_slices = True
-
-  cdef void _drop_slice_ownership(self):
-    if not self.owns_metadata_slices:
-      return
-    for i in range(self.c_metadata_array.count):
-      grpc_slice_unref(self.c_metadata_array.metadata[i].key)
-      grpc_slice_unref(self.c_metadata_array.metadata[i].value)
-    self.owns_metadata_slices = False
+    grpc_metadata_array_destroy(&self.c_metadata_array)  # NOTE(review): the old entries' slices are not unref'd before this - presumably owned elsewhere; confirm
+    self.c_metadata_array = new_c_metadata_array  # adopt the array of copies
 
 
 cdef class Operation:
@@ -538,7 +511,7 @@
   def __cinit__(self):
     grpc_init()
     self.references = []
-    self._received_status_details = Slice()
+    self._status_details = grpc_empty_slice()  # start with an empty slice; __dealloc__ unrefs this field
     self.is_valid = False
 
   @property
@@ -595,13 +568,13 @@
   def received_status_details(self):
     if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
       raise TypeError("self must be an operation receiving status details")
-    return self._received_status_details.bytes()
+    return _slice_bytes(self._status_details)
 
   @property
   def received_status_details_or_none(self):
     if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
       return None
-    return self._received_status_details.bytes()
+    return _slice_bytes(self._status_details)  # status-details slice as Python bytes
 
   @property
   def received_cancelled(self):
@@ -617,6 +590,7 @@
     return False if self._received_cancelled == 0 else True
 
   def __dealloc__(self):
+    grpc_slice_unref(self._status_details)  # drop this operation's reference to the status-details slice
     grpc_shutdown()
 
 def operation_send_initial_metadata(Metadata metadata, int flags):
@@ -657,10 +631,10 @@
   op.c_op.data.send_status_from_server.trailing_metadata = (
       metadata.c_metadata_array.metadata)
   op.c_op.data.send_status_from_server.status = code
-  cdef Slice details_slice = Slice.from_bytes(details)
-  op.c_op.data.send_status_from_server.status_details = &details_slice.c_slice
+  grpc_slice_unref(op._status_details)
+  op._status_details = _slice_from_bytes(details)
+  op.c_op.data.send_status_from_server.status_details = &op._status_details
   op.references.append(metadata)
-  op.references.append(details_slice)
   op.is_valid = True
   return op
 
@@ -696,7 +670,7 @@
   op.c_op.data.receive_status_on_client.status = (
       &op._received_status_code)
   op.c_op.data.receive_status_on_client.status_details = (
-      &op._received_status_details.c_slice)
+      &op._status_details)
   op.is_valid = True
   return op