WIP in *_end2end_test.cc. Tests pass. Fixed leaks and introduced concept of compression request thru MD
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 5e10875..88954e2 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -38,6 +38,7 @@
 #include <memory>
 #include <string>
 
+#include <grpc/compression.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
 #include <grpc++/config.h>
@@ -107,6 +108,17 @@
     creds_ = creds;
   }
 
+  grpc_compression_level get_compression_level() const {
+    return compression_level_;
+  }
+  void set_compression_level(grpc_compression_level level);
+
+  grpc_compression_algorithm get_compression_algorithm() const {
+    return compression_algorithm_;
+  }
+  void set_compression_algorithm(grpc_compression_algorithm algorithm);
+
+
   void TryCancel();
 
  private:
@@ -157,6 +169,9 @@
   std::multimap<grpc::string, grpc::string> send_initial_metadata_;
   std::multimap<grpc::string, grpc::string> recv_initial_metadata_;
   std::multimap<grpc::string, grpc::string> trailing_metadata_;
+
+  grpc_compression_level compression_level_;
+  grpc_compression_algorithm compression_algorithm_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 326b6a1..a2f0a2f 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -36,6 +36,7 @@
 
 #include <map>
 
+#include <grpc/compression.h>
 #include <grpc/support/time.h>
 #include <grpc++/config.h>
 #include <grpc++/time.h>
@@ -97,6 +98,16 @@
     return client_metadata_;
   }
 
+  grpc_compression_level get_compression_level() const {
+    return compression_level_;
+  }
+  void set_compression_level(grpc_compression_level level);
+
+  grpc_compression_algorithm get_compression_algorithm() const {
+    return compression_algorithm_;
+  }
+  void set_compression_algorithm(grpc_compression_algorithm algorithm);
+
  private:
   friend class ::grpc::Server;
   template <class W, class R>
@@ -142,6 +153,9 @@
   std::multimap<grpc::string, grpc::string> client_metadata_;
   std::multimap<grpc::string, grpc::string> initial_metadata_;
   std::multimap<grpc::string, grpc::string> trailing_metadata_;
+
+  grpc_compression_level compression_level_;
+  grpc_compression_algorithm compression_algorithm_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc/compression.h b/include/grpc/compression.h
index 1cff5d2..dd7e1d0 100644
--- a/include/grpc/compression.h
+++ b/include/grpc/compression.h
@@ -34,6 +34,10 @@
 #ifndef GRPC_COMPRESSION_H
 #define GRPC_COMPRESSION_H
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /** To be used in channel arguments */
 #define GRPC_COMPRESSION_LEVEL_ARG "grpc.compression_level"
 
@@ -76,4 +80,8 @@
 grpc_compression_algorithm grpc_compression_algorithm_for_level(
     grpc_compression_level level);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* GRPC_COMPRESSION_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index f5fe87d..6100a90 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -50,7 +50,8 @@
 } call_data;
 
 typedef struct channel_data {
-  grpc_mdstr *mdstr_compression_algorithm_key;
+  grpc_mdstr *mdstr_request_compression_algorithm_key;
+  grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
   grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
   grpc_compression_algorithm default_compression_algorithm;
 } channel_data;
@@ -72,14 +73,14 @@
 }
 
 /** For each \a md element from the incoming metadata, filter out the entry for
- * "grpc-compression-algorithm", using its value to populate the call data's
+ * "grpc-encoding", using its value to populate the call data's
  * compression_algorithm field. */
 static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
 
-  if (md->key == channeld->mdstr_compression_algorithm_key) {
+  if (md->key == channeld->mdstr_request_compression_algorithm_key) {
     const char *md_c_str = grpc_mdstr_as_c_string(md->value);
     if (!grpc_compression_algorithm_parse(md_c_str,
                                           &calld->compression_algorithm)) {
@@ -184,7 +185,6 @@
         break;
       case GRPC_OP_SLICE:
         if (did_compress) {
-          gpr_slice_unref(sop->data.slice);
           if (j < calld->slices.count) {
             sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
           }
@@ -259,7 +259,10 @@
   channeld->default_compression_algorithm =
       grpc_compression_algorithm_for_level(clevel);
 
-  channeld->mdstr_compression_algorithm_key =
+  channeld->mdstr_request_compression_algorithm_key =
+      grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+
+  channeld->mdstr_outgoing_compression_algorithm_key =
       grpc_mdstr_from_string(mdctx, "grpc-encoding");
 
   for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
@@ -267,7 +270,8 @@
     GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
     channeld->mdelem_compression_algorithms[algo_idx] =
         grpc_mdelem_from_metadata_strings(
-            mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
+            mdctx,
+            grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
             grpc_mdstr_from_string(mdctx, algorith_name));
   }
 
@@ -283,7 +287,8 @@
   channel_data *channeld = elem->channel_data;
   grpc_compression_algorithm algo_idx;
 
-  grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key);
+  grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
+  grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
   for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
        ++algo_idx) {
     grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
index ea66796..3a196eb 100644
--- a/src/core/channel/compress_filter.h
+++ b/src/core/channel/compress_filter.h
@@ -36,6 +36,8 @@
 
 #include "src/core/channel/channel_stack.h"
 
+#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "internal:grpc-encoding-request"
+
 /** Message-level compression filter.
  *
  * See <grpc/compression.h> for the available compression levels.
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 37dadec..5f489c0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1243,7 +1243,7 @@
     } else if (key == grpc_channel_get_message_string(call->channel)) {
       set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
     } else if (key ==
-               grpc_channel_get_compresssion_algorithm_string(call->channel)) {
+               grpc_channel_get_compression_algorithm_string(call->channel)) {
       set_compression_algorithm(call, decode_compression(md));
     } else {
       dest = &call->buffered_metadata[is_trailing];
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d3dcb22..cab99e7 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -273,7 +273,7 @@
   return channel->grpc_status_string;
 }
 
-grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
     grpc_channel *channel) {
   return channel->grpc_compression_algorithm_string;
 }
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 8d0fe81..66924ad 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,7 @@
 grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
                                                  int status_code);
 grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
     grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
 gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index be46c54..cfa869e 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -244,10 +244,7 @@
   if (grpc_channel_args_is_census_enabled(args)) {
     filters[n++] = &grpc_client_census_filter;
     } */
-  if (grpc_channel_args_get_compression_level(args) >
-      GRPC_COMPRESS_LEVEL_NONE) {
-    filters[n++] = &grpc_compress_filter;
-  }
+  filters[n++] = &grpc_compress_filter;
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
   channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 72cdd49..0eba554 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -34,9 +34,12 @@
 #include <grpc++/client_context.h>
 
 #include <grpc/grpc.h>
+#include <grpc/support/string_util.h>
 #include <grpc++/credentials.h>
 #include <grpc++/time.h>
 
+#include "src/core/channel/compress_filter.h"
+
 namespace grpc {
 
 ClientContext::ClientContext()
@@ -75,6 +78,24 @@
   }
 }
 
+void ClientContext::set_compression_level(grpc_compression_level level) {
+  const grpc_compression_algorithm algorithm_for_level =
+      grpc_compression_algorithm_for_level(level);
+  set_compression_algorithm(algorithm_for_level);
+}
+
+void ClientContext::set_compression_algorithm(
+    grpc_compression_algorithm algorithm) {
+  char* algorithm_name = NULL;
+  if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
+    gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
+            algorithm);
+    abort();
+  }
+  GPR_ASSERT(algorithm_name != NULL);
+  AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
+}
+
 void ClientContext::TryCancel() {
   if (call_) {
     grpc_call_cancel(call_);
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 268e4f6..337f680 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -103,7 +103,9 @@
       : byte_count_(0), backup_count_(0) {
     grpc_byte_buffer_reader_init(&reader_, buffer);
   }
-  ~GrpcBufferReader() GRPC_OVERRIDE {}
+  ~GrpcBufferReader() GRPC_OVERRIDE {
+    grpc_byte_buffer_reader_destroy(&reader_);
+  }
 
   bool Next(const void** data, int* size) GRPC_OVERRIDE {
     if (backup_count_ > 0) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 699895a..087e28d 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -39,6 +39,8 @@
 #include <grpc++/impl/sync.h>
 #include <grpc++/time.h>
 
+#include "src/core/channel/compress_filter.h"
+
 namespace grpc {
 
 // CompletionOp
@@ -146,4 +148,22 @@
   return completion_op_ && completion_op_->CheckCancelled(cq_);
 }
 
+void ServerContext::set_compression_level(grpc_compression_level level) {
+  const grpc_compression_algorithm algorithm_for_level =
+      grpc_compression_algorithm_for_level(level);
+  set_compression_algorithm(algorithm_for_level);
+}
+
+void ServerContext::set_compression_algorithm(
+    grpc_compression_algorithm algorithm) {
+  char* algorithm_name = NULL;
+  if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
+    gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
+            algorithm);
+    abort();
+  }
+  GPR_ASSERT(algorithm_name != NULL);
+  AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
+}
+
 }  // namespace grpc
diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c
index ca16bc7..a605745 100644
--- a/test/core/end2end/tests/request_with_compressed_payload.c
+++ b/test/core/end2end/tests/request_with_compressed_payload.c
@@ -45,6 +45,7 @@
 
 #include "test/core/end2end/cq_verifier.h"
 #include "src/core/channel/channel_args.h"
+#include "src/core/channel/compress_filter.h"
 
 enum { TIMEOUT = 200000 };
 
@@ -240,6 +241,7 @@
 
   cq_verifier_destroy(cqv);
 
+  gpr_slice_unref(request_payload_slice);
   grpc_byte_buffer_destroy(request_payload);
   grpc_byte_buffer_destroy(request_payload_recv);
 
@@ -279,13 +281,13 @@
   grpc_metadata gzip_compression_override;
   grpc_metadata none_compression_override;
 
-  gzip_compression_override.key = "grpc-encoding";
+  gzip_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY;
   gzip_compression_override.value = "gzip";
   gzip_compression_override.value_length = 4;
   memset(&gzip_compression_override.internal_data, 0,
          sizeof(gzip_compression_override.internal_data));
 
-  none_compression_override.key = "grpc-encoding";
+  none_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY;
   none_compression_override.value = "none";
   none_compression_override.value_length = 4;
   memset(&none_compression_override.internal_data, 0,
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 45ba8b0..49070a7 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -226,10 +226,11 @@
                     int num_rpcs) {
   EchoRequest request;
   EchoResponse response;
-  request.set_message("Hello");
+  request.set_message("Hello hello hello hello");
 
   for (int i = 0; i < num_rpcs; ++i) {
     ClientContext context;
+    context.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
     Status s = stub->Echo(&context, request, &response);
     EXPECT_EQ(response.message(), request.message());
     EXPECT_TRUE(s.ok());
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index b9d47b3..e9d86cc 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -227,6 +227,7 @@
   GenericServerContext srv_ctx;
   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
 
+  cli_ctx.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
   send_request.set_message("Hello");
   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
       generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));