Initial attempt at adding support for max metadata size.  Currently, the
code does not seem to be properly causing the RPC to fail when the max
size is exceeded.
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 4c73730..b5203b6 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -152,6 +152,8 @@
    channel). If this parameter is specified and the underlying is not an SSL
    channel, it will just be ignored. */
 #define GRPC_SSL_TARGET_NAME_OVERRIDE_ARG "grpc.ssl_target_name_override"
+/* Maximum metadata size */
+#define GRPC_ARG_MAX_METADATA_SIZE "grpc.max_metadata_size"
 
 /** Result of a grpc call. If the caller satisfies the prerequisites of a
     particular operation, the grpc_call_error returned will be GRPC_CALL_OK.
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 01507f5..c24950a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -56,6 +56,8 @@
 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
 #define MAX_WINDOW 0x7fffffffu
 
+#define DEFAULT_MAX_METADATA_SIZE 16 * 1024
+
 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
 
 int grpc_http_trace = 0;
@@ -250,6 +252,7 @@
   t->global.ping_counter = 1;
   t->global.pings.next = t->global.pings.prev = &t->global.pings;
   t->parsing.is_client = is_client;
+  t->parsing.max_metadata_size = DEFAULT_MAX_METADATA_SIZE;
   t->parsing.deframe_state =
       is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
   t->writing.is_client = is_client;
@@ -372,6 +375,18 @@
               &t->writing.hpack_compressor,
               (uint32_t)channel_args->args[i].value.integer);
         }
+      } else if (0 == strcmp(channel_args->args[i].key,
+                             GRPC_ARG_MAX_METADATA_SIZE)) {
+        if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
+          gpr_log(GPR_ERROR, "%s: must be an integer",
+                  GRPC_ARG_MAX_METADATA_SIZE);
+        } else if (channel_args->args[i].value.integer < 0) {
+          gpr_log(GPR_ERROR, "%s: must be non-negative",
+                  GRPC_ARG_MAX_METADATA_SIZE);
+        } else {
+          t->parsing.max_metadata_size =
+              (uint32_t)channel_args->args[i].value.integer;
+        }
       }
     }
   }
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index db21744..3e463a7 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -65,6 +65,7 @@
         gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
   }
   buffer->elems[buffer->count++].md = elem;
+  buffer->size += GRPC_MDELEM_LENGTH(elem);
 }
 
 void grpc_chttp2_incoming_metadata_buffer_set_deadline(
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index 17ecf8e..7db5db8 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -42,6 +42,7 @@
   size_t capacity;
   gpr_timespec deadline;
   int published;
+  size_t size;  /* total size of metadata */
 } grpc_chttp2_incoming_metadata_buffer;
 
 /** assumes everything initially zeroed */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 98cd38a..d547a6e 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -229,6 +229,9 @@
   /** is this transport a client? (boolean) */
   uint8_t is_client;
 
+  /** max metadata size */
+  uint32_t max_metadata_size;
+
   /** were settings updated? */
   uint8_t settings_updated;
   /** was a settings ack received? */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e827a43..0cf4d87 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -624,8 +624,15 @@
         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
     GRPC_MDELEM_UNREF(md);
   } else {
-    grpc_chttp2_incoming_metadata_buffer_add(
-        &stream_parsing->metadata_buffer[0], md);
+    const size_t new_size = stream_parsing->metadata_buffer[0].size +
+                            GRPC_MDELEM_LENGTH(md);
+    if (new_size > transport_parsing->max_metadata_size) {
+      stream_parsing->seen_error = 1;
+      GRPC_MDELEM_UNREF(md);
+    } else {
+      grpc_chttp2_incoming_metadata_buffer_add(
+          &stream_parsing->metadata_buffer[0], md);
+    }
   }
 
   grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
@@ -652,8 +659,15 @@
     stream_parsing->seen_error = 1;
   }
 
-  grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->metadata_buffer[1],
-                                           md);
+  const size_t new_size = stream_parsing->metadata_buffer[1].size +
+                          GRPC_MDELEM_LENGTH(md);
+  if (new_size > transport_parsing->max_metadata_size) {
+    stream_parsing->seen_error = 1;
+    GRPC_MDELEM_UNREF(md);
+  } else {
+    grpc_chttp2_incoming_metadata_buffer_add(
+        &stream_parsing->metadata_buffer[1], md);
+  }
 
   grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
 
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 713d9e6..277c257 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -146,6 +146,8 @@
 const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
 
 #define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice))
+#define GRPC_MDELEM_LENGTH(e) (GRPC_MDSTR_LENGTH(e->key) + \
+                               GRPC_MDSTR_LENGTH(e->value))
 
 int grpc_mdstr_is_legal_header(grpc_mdstr *s);
 int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s);
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index 77afe58..5f1a332 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -107,6 +107,14 @@
   return has_metadata(array->metadata, array->count, key, value);
 }
 
+int contains_metadata_key(grpc_metadata_array *array, const char *key) {
+  for (size_t i = 0; i < array->count; ++i) {
+    if (strcmp(array->metadata[i].key, key) == 0)
+      return 1;
+  }
+  return 0;
+}
+
 static gpr_slice merge_slices(gpr_slice *slices, size_t nslices) {
   size_t i;
   size_t len = 0;
diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h
index b3e07c4..a540659 100644
--- a/test/core/end2end/cq_verifier.h
+++ b/test/core/end2end/cq_verifier.h
@@ -62,5 +62,6 @@
 int byte_buffer_eq_string(grpc_byte_buffer *byte_buffer, const char *string);
 int contains_metadata(grpc_metadata_array *array, const char *key,
                       const char *value);
+int contains_metadata_key(grpc_metadata_array *array, const char *key);
 
 #endif /* GRPC_TEST_CORE_END2END_CQ_VERIFIER_H */
diff --git a/test/core/end2end/tests/large_metadata.c b/test/core/end2end/tests/large_metadata.c
index 0e5d6b4..f09b55a 100644
--- a/test/core/end2end/tests/large_metadata.c
+++ b/test/core/end2end/tests/large_metadata.c
@@ -98,7 +98,8 @@
 }
 
 /* Request with a large amount of metadata.*/
-static void test_request_with_large_metadata(grpc_end2end_test_config config) {
+static void test_request_with_large_metadata(grpc_end2end_test_config config,
+                                             int allow_large_metadata) {
   grpc_call *c;
   grpc_call *s;
   gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
@@ -106,8 +107,16 @@
       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
   gpr_timespec deadline = five_seconds_time();
   grpc_metadata meta;
-  grpc_end2end_test_fixture f =
-      begin_test(config, "test_request_with_large_metadata", NULL, NULL);
+  const char *test_name = allow_large_metadata
+                          ? "test_request_with_large_metadata_allowed"
+                          : "test_request_with_large_metadata_not_allowed";
+  const size_t large_size = 64 * 1024;
+  grpc_arg arg = { GRPC_ARG_INTEGER, GRPC_ARG_MAX_METADATA_SIZE,
+                   { .integer=(int)large_size + 1024 } };
+  grpc_channel_args args = { 1, &arg };
+  grpc_channel_args* use_args = allow_large_metadata ? &args : NULL;
+  grpc_end2end_test_fixture f = begin_test(
+      config, test_name, use_args, use_args);
   cq_verifier *cqv = cq_verifier_create(f.cq);
   grpc_op ops[6];
   grpc_op *op;
@@ -121,7 +130,6 @@
   char *details = NULL;
   size_t details_capacity = 0;
   int was_cancelled = 2;
-  const size_t large_size = 64 * 1024;
 
   c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
                                "/foo", "foo.test.google.fr", deadline, NULL);
@@ -214,13 +222,19 @@
   cq_expect_completion(cqv, tag(1), 1);
   cq_verify(cqv);
 
+// FIXME: why is this assert passing with allow_large_metadata=false?
   GPR_ASSERT(status == GRPC_STATUS_OK);
   GPR_ASSERT(0 == strcmp(details, "xyz"));
   GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
   GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));
   GPR_ASSERT(was_cancelled == 0);
-  GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, "hello world"));
-  GPR_ASSERT(contains_metadata(&request_metadata_recv, "key", meta.value));
+  if (allow_large_metadata) {
+    GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, "hello world"));
+    GPR_ASSERT(contains_metadata(&request_metadata_recv, "key", meta.value));
+  } else {
+    GPR_ASSERT(request_payload_recv == NULL);
+    GPR_ASSERT(!contains_metadata_key(&request_metadata_recv, "key"));
+  }
 
   gpr_free(details);
   grpc_metadata_array_destroy(&initial_metadata_recv);
@@ -243,7 +257,8 @@
 }
 
 void large_metadata(grpc_end2end_test_config config) {
-  test_request_with_large_metadata(config);
+  test_request_with_large_metadata(config, 1);
+  test_request_with_large_metadata(config, 0);
 }
 
 void large_metadata_pre_init(void) {}