Progress towards transport using metadata batches
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 110a4b5..63fbf67 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -292,6 +292,12 @@
   stream_link links[STREAM_LIST_COUNT];
   gpr_uint8 included[STREAM_LIST_COUNT];
 
+  /* incoming metadata */
+  grpc_linked_mdelem *incoming_metadata;
+  size_t incoming_metadata_count;
+  size_t incoming_metadata_capacity;
+  gpr_timespec incoming_deadline;
+
   /* sops from application */
   grpc_stream_op_buffer outgoing_sopb;
   /* sops that have passed flow control to be written */
@@ -593,6 +599,10 @@
   s->cancelled = 0;
   s->allow_window_updates = 0;
   s->published_close = 0;
+  s->incoming_metadata_count = 0;
+  s->incoming_metadata_capacity = 0;
+  s->incoming_metadata = NULL;
+  s->incoming_deadline = gpr_inf_future;
   memset(&s->links, 0, sizeof(s->links));
   memset(&s->included, 0, sizeof(s->included));
   grpc_sopb_init(&s->outgoing_sopb);
@@ -1057,6 +1067,14 @@
   }
 }
 
+static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
+  if (s->incoming_metadata_count == s->incoming_metadata_capacity) { /* array full, or nothing allocated yet */
+    s->incoming_metadata_capacity = GPR_MAX(8, s->incoming_metadata_capacity * 2); /* at least 8 slots, otherwise double */
+    s->incoming_metadata = gpr_realloc(s->incoming_metadata, s->incoming_metadata_capacity * sizeof(*s->incoming_metadata));
+  }
+  s->incoming_metadata[s->incoming_metadata_count++].md = elem; /* only md is written; prev/next are left untouched here */
+}
+
 static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
                                 grpc_status_code local_status,
                                 grpc_chttp2_error_code error_code,
@@ -1076,9 +1094,7 @@
       stream_list_join(t, s, CANCELLED);
 
       gpr_ltoa(local_status, buffer);
-      grpc_sopb_add_metadata(
-          &s->parser.incoming_sopb,
-          grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+      add_incoming_metadata(t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
 
       stream_list_join(t, s, PENDING_CALLBACKS);
     }
@@ -1254,11 +1270,10 @@
       }
       grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
     }
-    grpc_sopb_add_deadline(&s->parser.incoming_sopb,
-                           gpr_time_add(gpr_now(), *cached_timeout));
+    s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
     grpc_mdelem_unref(md);
   } else {
-    grpc_sopb_add_metadata(&s->parser.incoming_sopb, md);
+    add_incoming_metadata(t, s, md);
   }
 }
 
@@ -1434,6 +1449,37 @@
   return window_update < MAX_WINDOW - window;
 }
 
+static void free_md(void *p, grpc_op_error result) { /* callback registered via grpc_sopb_add_flow_ctl_cb */
+  gpr_free(p); /* release the block passed as callback data; result is not consulted */
+}
+
+/* Publish the metadata buffered on t->incoming_stream as one batch op, then reset the buffer. */
+static void add_metadata_batch(transport *t) {
+  grpc_metadata_batch b;
+  stream *s = t->incoming_stream;
+  size_t n = s->incoming_metadata_count;
+  size_t i;
+
+  /* n == 0 must be guarded: the array may still be NULL and n - 1 would wrap around */
+  b.list.head = n ? &s->incoming_metadata[0] : NULL;
+  b.list.tail = n ? &s->incoming_metadata[n - 1] : NULL;
+  b.garbage.head = b.garbage.tail = NULL;
+  b.deadline = s->incoming_deadline;
+  for (i = 0; i < n; i++) { /* chain the array slots into a doubly linked list */
+    s->incoming_metadata[i].prev = i ? &s->incoming_metadata[i - 1] : NULL;
+    s->incoming_metadata[i].next = i + 1 < n ? &s->incoming_metadata[i + 1] : NULL;
+  }
+
+  grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
+  if (n) { /* b points into the array, so hand the array to free_md, queued after b */
+    grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, s->incoming_metadata);
+  }
+  s->incoming_deadline = gpr_inf_future; /* the next batch starts with no deadline */
+  s->incoming_metadata = NULL;
+  s->incoming_metadata_count = 0;
+  s->incoming_metadata_capacity = 0;
+}
+
 static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
   grpc_chttp2_parse_state st;
   size_t i;
@@ -1448,8 +1494,7 @@
         stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
       }
       if (st.metadata_boundary) {
-        grpc_sopb_add_metadata_boundary(
-            &t->incoming_stream->parser.incoming_sopb);
+        add_metadata_batch(t);
         stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
       }
       if (st.ack_settings) {