Redesign of compression algorithm propagation based on metadata
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 918cb2d..ad42bbb 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -31,6 +31,7 @@
  *
  */
 
+#include <assert.h>
 #include <string.h>
 
 #include "src/core/channel/compress_filter.h"
@@ -42,16 +43,16 @@
 
 typedef struct call_data {
   gpr_slice_buffer slices;
+  grpc_linked_mdelem compression_algorithm_storage;
   int remaining_slice_bytes;
-  int no_compress;  /**< whether to skip compression for this specific call */
-
-  grpc_linked_mdelem compression_algorithm;
+  grpc_compression_algorithm compression_algorithm;
+  gpr_uint8 has_compression_algorithm;
 } call_data;
 
 typedef struct channel_data {
-  grpc_compression_algorithm compress_algorithm;
-  grpc_mdelem *compress_algorithm_md;
-  grpc_mdelem *no_compression_md;
+  grpc_mdstr *mdstr_compression_algorithm_key;
+  grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
+  grpc_compression_algorithm default_compression_algorithm;
 } channel_data;
 
 /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
@@ -70,14 +71,41 @@
   return did_compress;
 }
 
+/** For each \a md element from the incoming metadata, filter out the entry for
+ * "grpc-compression-algorithm", using its value to populate the call data's
+ * compression_algorithm field. */
+static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
+  grpc_call_element *elem = user_data;
+  call_data *calld = elem->call_data;
+  channel_data *channeld = elem->channel_data;
+
+  if (md->key == channeld->mdstr_compression_algorithm_key) {
+    assert(GPR_SLICE_LENGTH(md->value->slice) ==
+           sizeof(grpc_compression_algorithm));
+    memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice),
+           sizeof(grpc_compression_algorithm));
+    calld->has_compression_algorithm = 1;
+    return NULL;
+  }
+
+  return md;
+}
+
+static int skip_compression(channel_data *channeld, call_data *calld) {
+  if (calld->has_compression_algorithm &&
+      (calld->compression_algorithm == GRPC_COMPRESS_NONE)) {
+    return 1;
+  }
+  /* no per-call compression override */
+  return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
+}
+
 static void process_send_ops(grpc_call_element *elem,
                              grpc_stream_op_buffer *send_ops) {
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
   size_t i, j;
-  int begin_message_index = -1;
-  int metadata_op_index = -1;
-  grpc_mdelem *actual_compression_md;
+  int did_compress = 0;
 
   /* buffer up slices until we've processed all the expected ones (as given by
    * GRPC_OP_BEGIN_MESSAGE) */
@@ -85,73 +113,83 @@
     grpc_stream_op *sop = &send_ops->ops[i];
     switch (sop->type) {
       case GRPC_OP_BEGIN_MESSAGE:
-        begin_message_index = i;
         calld->remaining_slice_bytes = sop->data.begin_message.length;
-        calld->no_compress =
-            !!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS);
+        /* TODO(dgq): we may want to get rid of the flags mechanism to have
+         * exceptions to compression: we can rely solely on metadata to set NONE
+         * as the compression algorithm */
+        if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
+          calld->has_compression_algorithm = 1;  /* GPR_TRUE */
+          calld->compression_algorithm = GRPC_COMPRESS_NONE;
+        }
+        break;
+      case GRPC_OP_METADATA:
+        /* Parse incoming request for compression. If any, it'll be available at
+         * calld->compression_algorithm */
+        grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter,
+                                   elem);
+        if (!calld->has_compression_algorithm) {
+          /* If no algorithm was found in the metadata and we aren't
+           * exceptionally skipping compression, fall back to the channel
+           * default */
+          calld->compression_algorithm =
+              channeld->default_compression_algorithm;
+          calld->has_compression_algorithm = 1; /* GPR_TRUE */
+        }
         break;
       case GRPC_OP_SLICE:
-        if (calld->no_compress) continue;
+        if (skip_compression(channeld, calld)) continue;
         GPR_ASSERT(calld->remaining_slice_bytes > 0);
         /* add to calld->slices */
         gpr_slice_buffer_add(&calld->slices, sop->data.slice);
         calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice);
         if (calld->remaining_slice_bytes == 0) {
           /* compress */
-          if (!compress_send_sb(channeld->compress_algorithm, &calld->slices)) {
-            calld->no_compress = 1;  /* GPR_TRUE */
-          }
+          did_compress =
+              compress_send_sb(calld->compression_algorithm, &calld->slices);
+        }
+        break;
+      case GRPC_NO_OP:
+        ;  /* fallthrough */
+    }
+  }
+
+  /* We need to:
+   * - (OP_SLICE) If compression happened, replace the input slices with the
+   *   compressed ones.
+   * - (BEGIN_MESSAGE) Update the message info (size, flags).
+   * - (OP_METADATA) Convey the compression configuration */
+  for (i = 0, j = 0; i < send_ops->nops; ++i) {
+    grpc_stream_op *sop = &send_ops->ops[i];
+    switch (sop->type) {
+      case GRPC_OP_BEGIN_MESSAGE:
+        if (did_compress) {
+          sop->data.begin_message.length = calld->slices.length;
+          sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+        } else {
+          /* either because the user requested the exception or because compressing
+           * would have resulted in a larger output */
+          calld->compression_algorithm = GRPC_COMPRESS_NONE;
+          /* reset the flag compression bit */
+          sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
         }
         break;
       case GRPC_OP_METADATA:
-        /* Save the index of the first metadata op, to be processed after we
-         * know whether compression actually happened */
-        if (metadata_op_index < 0) metadata_op_index = i;
+        grpc_metadata_batch_add_head(
+            &(sop->data.metadata), &calld->compression_algorithm_storage,
+            grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+                                [calld->compression_algorithm]));
         break;
-      case GRPC_NO_OP:
-        ;  /* fallthrough, ignore */
-    }
-  }
-
-  if (metadata_op_index < 0 || begin_message_index < 0) { /* bail out */
-    return;
-  }
-
-  /* update both the metadata and the begin_message's flags */
-  if (calld->no_compress) {
-    /* either because the user requested the exception or because compressing
-     * would have resulted in a larger output */
-    channeld->compress_algorithm = GRPC_COMPRESS_NONE;
-    actual_compression_md = channeld->no_compression_md;
-    /* reset the flag compression bit */
-    send_ops->ops[begin_message_index].data.begin_message.flags &=
-        ~GRPC_WRITE_INTERNAL_COMPRESS;
-  } else {  /* DID compress */
-    actual_compression_md = channeld->compress_algorithm_md;
-    /* at this point, calld->slices contains the *compressed* slices from
-     * send_ops->ops[*]->data.slice. We now replace these input slices with the
-     * compressed ones. */
-    for (i = 0, j = 0; i < send_ops->nops; ++i) {
-      grpc_stream_op *sop = &send_ops->ops[i];
-      GPR_ASSERT(j < calld->slices.count);
-      switch (sop->type) {
-        case GRPC_OP_SLICE:
+      case GRPC_OP_SLICE:
+        if (did_compress) {
+          GPR_ASSERT(j < calld->slices.count);
           gpr_slice_unref(sop->data.slice);
           sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
-          break;
-        case GRPC_OP_BEGIN_MESSAGE:
-          sop->data.begin_message.length = calld->slices.length;
-          sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
-        case GRPC_NO_OP:
-        case GRPC_OP_METADATA:
-          ;  /* fallthrough, ignore */
-      }
+        }
+        break;
+      case GRPC_NO_OP:
+        ;  /* fallthrough */
     }
   }
-
-  grpc_metadata_batch_add_head(
-      &(send_ops->ops[metadata_op_index].data.metadata),
-      &calld->compression_algorithm, grpc_mdelem_ref(actual_compression_md));
 }
 
 /* Called either:
@@ -189,6 +227,7 @@
 
   /* initialize members */
   gpr_slice_buffer_init(&calld->slices);
+  calld->has_compression_algorithm = 0;
 
   if (initial_op) {
     if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
@@ -209,17 +248,23 @@
                               const grpc_channel_args *args, grpc_mdctx *mdctx,
                               int is_first, int is_last) {
   channel_data *channeld = elem->channel_data;
+  grpc_compression_algorithm algo_idx;
   const grpc_compression_level clevel =
       grpc_channel_args_get_compression_level(args);
-  const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE;
 
-  channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer(
-      mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel));
-  channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel);
+  channeld->default_compression_algorithm =
+      grpc_compression_algorithm_for_level(clevel);
 
-  channeld->no_compression_md = grpc_mdelem_from_string_and_buffer(
-      mdctx, "grpc-compression-level", (gpr_uint8 *)&none_alg,
-      sizeof(none_alg));
+  channeld->mdstr_compression_algorithm_key =
+      grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
+
+  for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+    channeld->mdelem_compression_algorithms[algo_idx] =
+        grpc_mdelem_from_metadata_strings(
+            mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
+            grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx,
+                                   sizeof(algo_idx)));
+  }
 
   /* The first and the last filters tend to be implemented differently to
      handle the case that there's no 'next' filter to call on the up or down
@@ -231,8 +276,13 @@
 /* Destructor for channel data */
 static void destroy_channel_elem(grpc_channel_element *elem) {
   channel_data *channeld = elem->channel_data;
-  grpc_mdelem_unref(channeld->compress_algorithm_md);
-  grpc_mdelem_unref(channeld->no_compression_md);
+  grpc_compression_algorithm algo_idx;
+
+  grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key);
+  for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
+       ++algo_idx) {
+    grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
+  }
 }
 
 const grpc_channel_filter grpc_compress_filter = {
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4125741..2b2d92c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -209,8 +209,8 @@
   /* Received call statuses from various sources */
   received_status status[STATUS_SOURCE_COUNT];
 
-  /** Compression level for the call */
-  grpc_compression_level compression_level;
+  /** Compression algorithm for the call */
+  grpc_compression_algorithm compression_algorithm;
 
   /* Contexts for various subsystems (security, tracing, ...). */
   grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -395,9 +395,9 @@
   }
 }
 
-static void set_compression_level(grpc_call *call,
-                                         grpc_compression_level clevel) {
-  call->compression_level = clevel;
+static void set_compression_algorithm(grpc_call *call,
+                                      grpc_compression_algorithm algo) {
+  call->compression_algorithm = algo;
 }
 
 static void set_status_details(grpc_call *call, status_source source,
@@ -651,12 +651,10 @@
   /* some aliases for readability */
   gpr_slice *slices = call->incoming_message.slices;
   const size_t nslices = call->incoming_message.count;
-  const grpc_compression_algorithm compression_algorithm =
-      grpc_compression_algorithm_for_level(call->compression_level);
 
-  if (call->compression_level > GRPC_COMPRESS_LEVEL_NONE) {
-    byte_buffer = grpc_raw_compressed_byte_buffer_create(slices, nslices,
-                                                         compression_algorithm);
+  if (call->compression_algorithm > GRPC_COMPRESS_NONE) {
+    byte_buffer = grpc_raw_compressed_byte_buffer_create(
+        slices, nslices, call->compression_algorithm);
   } else {
     byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
   }
@@ -683,12 +681,11 @@
    * compression level should already be present in the call, as parsed off its
    * corresponding metadata. */
   if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
-      (call->compression_level == GRPC_COMPRESS_LEVEL_NONE)) {
+      (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
     char *message = NULL;
     gpr_asprintf(
         &message, "Invalid compression algorithm (%s) for compressed message.",
-        grpc_compression_algorithm_name(
-            grpc_compression_algorithm_for_level(call->compression_level)));
+        grpc_compression_algorithm_name(call->compression_algorithm));
     cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1);
   }
   /* stash away parameters, and prepare for incoming slices */
@@ -1183,8 +1180,8 @@
     } else if (key == grpc_channel_get_message_string(call->channel)) {
       set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
     } else if (key ==
-               grpc_channel_get_compresssion_level_string(call->channel)) {
-      set_compression_level(call, decode_compression(md));
+               grpc_channel_get_compresssion_algorithm_string(call->channel)) {
+      set_compression_algorithm(call, decode_compression(md));
     } else {
       dest = &call->buffered_metadata[is_trailing];
       if (dest->count == dest->capacity) {
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 6353a83..e841a5d 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -64,7 +64,7 @@
   grpc_mdctx *metadata_context;
   /** mdstr for the grpc-status key */
   grpc_mdstr *grpc_status_string;
-  grpc_mdstr *grpc_compression_level_string;
+  grpc_mdstr *grpc_compression_algorithm_string;
   grpc_mdstr *grpc_message_string;
   grpc_mdstr *path_string;
   grpc_mdstr *authority_string;
@@ -99,8 +99,8 @@
   gpr_ref_init(&channel->refs, 1 + is_client);
   channel->metadata_context = mdctx;
   channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
-  channel->grpc_compression_level_string =
-      grpc_mdstr_from_string(mdctx, "grpc-compression-level");
+  channel->grpc_compression_algorithm_string =
+      grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
   channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
   for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
     char buf[GPR_LTOA_MIN_BUFSIZE];
@@ -202,7 +202,7 @@
     grpc_mdelem_unref(channel->grpc_status_elem[i]);
   }
   grpc_mdstr_unref(channel->grpc_status_string);
-  grpc_mdstr_unref(channel->grpc_compression_level_string);
+  grpc_mdstr_unref(channel->grpc_compression_algorithm_string);
   grpc_mdstr_unref(channel->grpc_message_string);
   grpc_mdstr_unref(channel->path_string);
   grpc_mdstr_unref(channel->authority_string);
@@ -261,8 +261,8 @@
   return channel->grpc_status_string;
 }
 
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
-  return channel->grpc_compression_level_string;
+grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) {
+  return channel->grpc_compression_algorithm_string;
 }
 
 grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index f838129..f4df06a 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,8 @@
 grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
                                                  int status_code);
 grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+    grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
 gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
 
diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c
index fe41780..8cc4cb7 100644
--- a/test/core/end2end/tests/request_with_compressed_payload.c
+++ b/test/core/end2end/tests/request_with_compressed_payload.c
@@ -251,10 +251,10 @@
   config.tear_down_data(&f);
 }
 
-static void test_invoke_request_with_excepcionally_uncompressed_payload(
+static void test_invoke_request_with_exceptionally_uncompressed_payload(
     grpc_end2end_test_config config) {
   request_with_payload_template(
-      config, "test_invoke_request_with_excepcionally_uncompressed_payload",
+      config, "test_invoke_request_with_exceptionally_uncompressed_payload",
       GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE);
 }
 
@@ -276,7 +276,7 @@
 
 
 void grpc_end2end_tests(grpc_end2end_test_config config) {
-  test_invoke_request_with_excepcionally_uncompressed_payload(config);
+  test_invoke_request_with_exceptionally_uncompressed_payload(config);
   test_invoke_request_with_compressed_payload(config);
   test_invoke_request_with_uncompressed_payload(config);
 }