Fix race condition in transport API

Specifically:

Previously, received initial metadata and trailing metadata had to be
published in lock-step. Consequences:
=> If we also wanted trailing metadata, initial metadata might not get
   processed until messages arrived.
=> The compression code then had no idea what codec to use.

To fix this, publish initial metadata as soon as it is ready, via its own
recv_initial_metadata_ready closure (this is a transport API change).

This requires changes to grpc_call to ensure that initial metadata is
processed before messages, since either callback may be delayed.

This exposed at least a few bugs in the C++ code where initial metadata was
never read.

I expect at least one more similar bug.
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index a8db32b..c8aaf31 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -107,8 +107,8 @@
   if (op->recv_initial_metadata) {
     /* substitute our callback for the op callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->finish_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->finish_recv;
   }
 }
 
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 43eee04..1aa2720 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -127,8 +127,8 @@
   if (op->recv_initial_metadata != NULL) {
     /* substitute our callback for the higher callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->hc_on_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->hc_on_recv;
   }
 }
 
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index bb75323..370f8db 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -186,8 +186,8 @@
   if (op->recv_initial_metadata) {
     /* substitute our callback for the higher callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->hs_on_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->hs_on_recv;
   }
 }
 
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 3ad9fd9..81297c8 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -241,10 +241,8 @@
                         grpc_subchannel_call_holder *holder) {
   size_t i;
   for (i = 0; i < holder->waiting_ops_count; i++) {
-    grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false,
-                          NULL);
-    grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
-                          false, NULL);
+    grpc_transport_stream_op_finish_with_failure(exec_ctx,
+                                                 &holder->waiting_ops[i]);
   }
   holder->waiting_ops_count = 0;
 }
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 4c78711..3d8e5e8 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -176,8 +176,8 @@
   if (op->recv_initial_metadata != NULL) {
     /* substitute our callback for the higher callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->auth_on_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->auth_on_recv;
     calld->transport_op = *op;
   }
 }
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9495e74..1b117aa 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -159,6 +159,9 @@
   uint8_t receiving_message;
   uint8_t received_final_op;
 
+  /* have we received initial metadata */
+  bool has_initial_md_been_received;
+
   batch_control active_batches[MAX_CONCURRENT_BATCHES];
 
   /* first idx: is_receiving, second idx: is_trailing */
@@ -200,6 +203,7 @@
   gpr_slice receiving_slice;
   grpc_closure receiving_slice_ready;
   grpc_closure receiving_stream_ready;
+  grpc_closure receiving_initial_metadata_ready;
   uint32_t test_only_last_message_flags;
 
   union {
@@ -212,6 +216,11 @@
       int *cancelled;
     } server;
   } final_op;
+
+  struct {
+    void *bctlp;
+    bool success;
+  } saved_receiving_stream_ready_ctx;
 };
 
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -993,6 +1002,94 @@
   }
 }
 
+static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
+                                  bool success) { /* success is not read here */
+  grpc_call *call = bctl->call;
+  if (call->receiving_stream == NULL) { /* no byte stream to read */
+    *call->receiving_buffer = NULL;
+    call->receiving_message = 0;
+    if (gpr_unref(&bctl->steps_to_complete)) {
+      post_batch_completion(exec_ctx, bctl);
+    }
+  } else if (call->receiving_stream->length >
+             grpc_channel_get_max_message_length(call->channel)) {
+    cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
+                       "Max message size exceeded");
+    grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+    call->receiving_stream = NULL;
+    *call->receiving_buffer = NULL;
+    call->receiving_message = 0;
+    if (gpr_unref(&bctl->steps_to_complete)) {
+      post_batch_completion(exec_ctx, bctl);
+    }
+  } else { /* length is within the channel's max message length */
+    call->test_only_last_message_flags = call->receiving_stream->flags;
+    if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+        (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+      *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
+          NULL, 0, call->compression_algorithm);
+    } else {
+      *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
+    }
+    grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
+                      bctl);
+    continue_receiving_slices(exec_ctx, bctl);
+    /* early out: this path does not unref steps_to_complete here */
+    return;
+  }
+}
+
+static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+                                   bool success) {
+  batch_control *bctl = bctlp;
+  grpc_call *call = bctl->call;
+  /* Defer message processing until initial metadata has been received. */
+  gpr_mu_lock(&bctl->call->mu);
+  if (bctl->call->has_initial_md_been_received) {
+    gpr_mu_unlock(&bctl->call->mu);
+    process_data_after_md(exec_ctx, bctlp, success);
+  } else { /* stash args; receiving_initial_metadata_ready re-enqueues us */
+    call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
+    call->saved_receiving_stream_ready_ctx.success = success;
+    gpr_mu_unlock(&bctl->call->mu);
+  }
+}
+
+static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
+                                             void *bctlp, bool success) {
+  batch_control *bctl = bctlp;
+  grpc_call *call = bctl->call;
+  /* Filter the received initial metadata and mark it received, under mu. */
+  gpr_mu_lock(&call->mu);
+
+  grpc_metadata_batch *md =
+      &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+  grpc_metadata_batch_filter(md, recv_initial_filter, call);
+  call->has_initial_md_been_received = true;
+  /* Non-client call, deadline != inf_future: set the deadline alarm. */
+  if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+          0 &&
+      !call->is_client) {
+    GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+    set_deadline_alarm(exec_ctx, call, md->deadline);
+    GPR_TIMER_END("set_deadline_alarm", 0);
+  }
+  /* Re-enqueue receiving_stream_ready if it ran before this callback. */
+  if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+    grpc_closure *saved_rsr_closure = grpc_closure_create(
+        receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
+    grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
+                          call->saved_receiving_stream_ready_ctx.success, NULL);
+    call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+  }
+
+  gpr_mu_unlock(&call->mu);
+  /* This callback counts as one of the batch's steps_to_complete. */
+  if (gpr_unref(&bctl->steps_to_complete)) {
+    post_batch_completion(exec_ctx, bctl);
+  }
+}
+
 static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
   batch_control *bctl = bctlp;
   grpc_call *call = bctl->call;
@@ -1011,19 +1108,6 @@
     grpc_metadata_batch_destroy(
         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
   }
-  if (bctl->recv_initial_metadata) {
-    grpc_metadata_batch *md =
-        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
-    grpc_metadata_batch_filter(md, recv_initial_filter, call);
-
-    if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
-            0 &&
-        !call->is_client) {
-      GPR_TIMER_BEGIN("set_deadline_alarm", 0);
-      set_deadline_alarm(exec_ctx, call, md->deadline);
-      GPR_TIMER_END("set_deadline_alarm", 0);
-    }
-  }
   if (bctl->recv_final_op) {
     grpc_metadata_batch *md =
         &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
@@ -1065,45 +1149,6 @@
   }
 }
 
-static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
-                                   bool success) {
-  batch_control *bctl = bctlp;
-  grpc_call *call = bctl->call;
-
-  if (call->receiving_stream == NULL) {
-    *call->receiving_buffer = NULL;
-    call->receiving_message = 0;
-    if (gpr_unref(&bctl->steps_to_complete)) {
-      post_batch_completion(exec_ctx, bctl);
-    }
-  } else if (call->receiving_stream->length >
-             grpc_channel_get_max_message_length(call->channel)) {
-    cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
-                       "Max message size exceeded");
-    grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
-    call->receiving_stream = NULL;
-    *call->receiving_buffer = NULL;
-    call->receiving_message = 0;
-    if (gpr_unref(&bctl->steps_to_complete)) {
-      post_batch_completion(exec_ctx, bctl);
-    }
-  } else {
-    call->test_only_last_message_flags = call->receiving_stream->flags;
-    if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
-        (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
-      *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
-          NULL, 0, call->compression_algorithm);
-    } else {
-      *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
-    }
-    grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
-                      bctl);
-    continue_receiving_slices(exec_ctx, bctl);
-    /* early out */
-    return;
-  }
-}
-
 static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
                                         grpc_call *call, const grpc_op *ops,
                                         size_t nops, void *notify_tag,
@@ -1273,9 +1318,14 @@
         }
         call->received_initial_metadata = 1;
         call->buffered_metadata[0] = op->data.recv_initial_metadata;
+        grpc_closure_init(&call->receiving_initial_metadata_ready,
+                          receiving_initial_metadata_ready, bctl);
         bctl->recv_initial_metadata = 1;
         stream_op.recv_initial_metadata =
             &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+        stream_op.recv_initial_metadata_ready =
+            &call->receiving_initial_metadata_ready;
+        num_completion_callbacks_needed++;
         break;
       case GRPC_OP_RECV_MESSAGE:
         /* Flag validation: currently allow no flags */
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 705996c..537069e 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -78,8 +78,7 @@
   } else if (op->recv_trailing_metadata != NULL) {
     fill_metadata(elem, op->recv_trailing_metadata);
   }
-  grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
-  grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+  grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
 }
 
 static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 42cffcc..fb5e0d4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -596,8 +596,8 @@
 
   if (op->recv_initial_metadata != NULL) {
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv_initial_metadata = op->on_complete;
-    op->on_complete = &calld->server_on_recv_initial_metadata;
+    calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
   }
 }
 
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index c611496..0e1e2c4 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -385,7 +385,7 @@
   grpc_closure *send_trailing_metadata_finished;
 
   grpc_metadata_batch *recv_initial_metadata;
-  grpc_closure *recv_initial_metadata_finished;
+  grpc_closure *recv_initial_metadata_ready;
   grpc_byte_stream **recv_message;
   grpc_closure *recv_message_ready;
   grpc_metadata_batch *recv_trailing_metadata;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 9298573..617d988 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -544,7 +544,7 @@
   GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
   GPR_ASSERT(s->global.send_message_finished == NULL);
   GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
-  GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
+  GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL);
   GPR_ASSERT(s->global.recv_message_ready == NULL);
   GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
   grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
@@ -863,9 +863,9 @@
   }
 
   if (op->recv_initial_metadata != NULL) {
-    GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
-    stream_global->recv_initial_metadata_finished =
-        add_closure_barrier(on_complete);
+    GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL);
+    stream_global->recv_initial_metadata_ready =
+        op->recv_initial_metadata_ready;
     stream_global->recv_initial_metadata = op->recv_initial_metadata;
     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
   }
@@ -1009,13 +1009,14 @@
   grpc_byte_stream *bs;
   while (
       grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
-    if (stream_global->recv_initial_metadata_finished != NULL &&
+    if (stream_global->recv_initial_metadata_ready != NULL &&
         stream_global->published_initial_metadata) {
       grpc_chttp2_incoming_metadata_buffer_publish(
           &stream_global->received_initial_metadata,
           stream_global->recv_initial_metadata);
-      grpc_chttp2_complete_closure_step(
-          exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
+      grpc_exec_ctx_enqueue(
+          exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL);
+      stream_global->recv_initial_metadata_ready = NULL;
     }
     if (stream_global->recv_message_ready != NULL) {
       if (stream_global->incoming_frames.head != NULL) {
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 08d6856..6e154b6 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -126,6 +126,7 @@
 void grpc_transport_stream_op_finish_with_failure(
     grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
   grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+  grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL);
   grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
 }
 
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f5cac77..8902c5d 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -92,6 +92,8 @@
 
   /** Receive initial metadata from the stream, into provided metadata batch. */
   grpc_metadata_batch *recv_initial_metadata;
+  /** Should be enqueued when initial metadata is ready to be processed. */
+  grpc_closure *recv_initial_metadata_ready;
 
   /** Receive message data from the stream, into provided byte stream. */
   grpc_byte_stream **recv_message;
@@ -103,7 +105,8 @@
   grpc_metadata_batch *recv_trailing_metadata;
 
   /** Should be enqueued when all requested operations (excluding recv_message
-     which has its own closure) in a given batch have been completed. */
+      and recv_initial_metadata which have their own closures) in a given batch
+      have been completed. */
   grpc_closure *on_complete;
 
   /** If != GRPC_STATUS_OK, cancel this stream */