Revert "Merge pull request #15746 from grpc/revert-15709-recv_trailing_metadata_ready2"

This reverts commit 3f9308ce1f8cb42c96901c1700f0b9dbb531f186, reversing
changes made to 92a0ae0b1081840d2c5a488f66bf6550c1a492f4.
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index f141aab..8a29c80 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -804,6 +804,7 @@
   // For intercepting recv_trailing_metadata.
   grpc_metadata_batch recv_trailing_metadata;
   grpc_transport_stream_stats collect_stats;
+  grpc_closure recv_trailing_metadata_ready;
   // For intercepting on_complete.
   grpc_closure on_complete;
 } subchannel_batch_data;
@@ -1179,35 +1180,24 @@
             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
             elem->channel_data, calld, num_batches, grpc_error_string(error));
   }
-  grpc_transport_stream_op_batch*
-      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
-  size_t num_batches = 0;
+  grpc_core::CallCombinerClosureList closures;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr) {
-      batches[num_batches++] = batch;
+      batch->handler_private.extra_arg = calld;
+      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                        fail_pending_batch_in_call_combiner, batch,
+                        grpc_schedule_on_exec_ctx);
+      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
+                   "pending_batches_fail");
       pending_batch_clear(calld, pending);
     }
   }
-  for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
-    grpc_transport_stream_op_batch* batch = batches[i];
-    batch->handler_private.extra_arg = calld;
-    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
-                      fail_pending_batch_in_call_combiner, batch,
-                      grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_START(calld->call_combiner,
-                             &batch->handler_private.closure,
-                             GRPC_ERROR_REF(error), "pending_batches_fail");
-  }
   if (yield_call_combiner) {
-    if (num_batches > 0) {
-      // Note: This will release the call combiner.
-      grpc_transport_stream_op_batch_finish_with_failure(
-          batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
-    } else {
-      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
-    }
+    closures.RunClosures(calld->call_combiner);
+  } else {
+    closures.RunClosuresWithoutYielding(calld->call_combiner);
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -1242,30 +1232,22 @@
             " pending batches on subchannel_call=%p",
             chand, calld, num_batches, calld->subchannel_call);
   }
-  grpc_transport_stream_op_batch*
-      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
-  size_t num_batches = 0;
+  grpc_core::CallCombinerClosureList closures;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr) {
-      batches[num_batches++] = batch;
+      batch->handler_private.extra_arg = calld->subchannel_call;
+      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                        resume_pending_batch_in_call_combiner, batch,
+                        grpc_schedule_on_exec_ctx);
+      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+                   "pending_batches_resume");
       pending_batch_clear(calld, pending);
     }
   }
-  for (size_t i = 1; i < num_batches; ++i) {
-    grpc_transport_stream_op_batch* batch = batches[i];
-    batch->handler_private.extra_arg = calld->subchannel_call;
-    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
-                      resume_pending_batch_in_call_combiner, batch,
-                      grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_START(calld->call_combiner,
-                             &batch->handler_private.closure, GRPC_ERROR_NONE,
-                             "pending_batches_resume");
-  }
-  GPR_ASSERT(num_batches > 0);
   // Note: This will release the call combiner.
-  grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+  closures.RunClosures(calld->call_combiner);
 }
 
 static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1280,7 +1262,10 @@
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
            nullptr) &&
       (!batch->recv_message ||
-       batch->payload->recv_message.recv_message_ready == nullptr)) {
+       batch->payload->recv_message.recv_message_ready == nullptr) &&
+      (!batch->recv_trailing_metadata ||
+       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+           nullptr)) {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
               calld);
@@ -1289,75 +1274,27 @@
   }
 }
 
-// Returns true if all ops in the pending batch have been completed.
-static bool pending_batch_is_completed(
-    pending_batch* pending, call_data* calld,
-    subchannel_call_retry_state* retry_state) {
-  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
-    return false;
+// Returns a pointer to the first pending batch for which predicate(batch)
+// returns true, or null if not found.
+template <typename Predicate>
+static pending_batch* pending_batch_find(grpc_call_element* elem,
+                                         const char* log_message,
+                                         Predicate predicate) {
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    pending_batch* pending = &calld->pending_batches[i];
+    grpc_transport_stream_op_batch* batch = pending->batch;
+    if (batch != nullptr && predicate(batch)) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
+                calld, log_message, i);
+      }
+      return pending;
+    }
   }
-  if (pending->batch->send_initial_metadata &&
-      !retry_state->completed_send_initial_metadata) {
-    return false;
-  }
-  if (pending->batch->send_message &&
-      retry_state->completed_send_message_count <
-          calld->send_messages->size()) {
-    return false;
-  }
-  if (pending->batch->send_trailing_metadata &&
-      !retry_state->completed_send_trailing_metadata) {
-    return false;
-  }
-  if (pending->batch->recv_initial_metadata &&
-      !retry_state->completed_recv_initial_metadata) {
-    return false;
-  }
-  if (pending->batch->recv_message &&
-      retry_state->completed_recv_message_count <
-          retry_state->started_recv_message_count) {
-    return false;
-  }
-  if (pending->batch->recv_trailing_metadata &&
-      !retry_state->completed_recv_trailing_metadata) {
-    return false;
-  }
-  return true;
-}
-
-// Returns true if any op in the batch was not yet started.
-static bool pending_batch_is_unstarted(
-    pending_batch* pending, call_data* calld,
-    subchannel_call_retry_state* retry_state) {
-  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
-    return false;
-  }
-  if (pending->batch->send_initial_metadata &&
-      !retry_state->started_send_initial_metadata) {
-    return true;
-  }
-  if (pending->batch->send_message &&
-      retry_state->started_send_message_count < calld->send_messages->size()) {
-    return true;
-  }
-  if (pending->batch->send_trailing_metadata &&
-      !retry_state->started_send_trailing_metadata) {
-    return true;
-  }
-  if (pending->batch->recv_initial_metadata &&
-      !retry_state->started_recv_initial_metadata) {
-    return true;
-  }
-  if (pending->batch->recv_message &&
-      retry_state->completed_recv_message_count ==
-          retry_state->started_recv_message_count) {
-    return true;
-  }
-  if (pending->batch->recv_trailing_metadata &&
-      !retry_state->started_recv_trailing_metadata) {
-    return true;
-  }
-  return false;
+  return nullptr;
 }
 
 //
@@ -1544,8 +1481,13 @@
 // subchannel_batch_data
 //
 
+// Creates a subchannel_batch_data object on the call's arena with the
+// specified refcount.  If set_on_complete is true, the batch's
+// on_complete callback will be set to point to on_complete();
+// otherwise, the batch's on_complete callback will be null.
 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
-                                                int refcount) {
+                                                int refcount,
+                                                bool set_on_complete) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   subchannel_call_retry_state* retry_state =
       static_cast<subchannel_call_retry_state*>(
@@ -1558,9 +1500,11 @@
       GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
   batch_data->batch.payload = &retry_state->batch_payload;
   gpr_ref_init(&batch_data->refs, refcount);
-  GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
-                    grpc_schedule_on_exec_ctx);
-  batch_data->batch.on_complete = &batch_data->on_complete;
+  if (set_on_complete) {
+    GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+                      grpc_schedule_on_exec_ctx);
+    batch_data->batch.on_complete = &batch_data->on_complete;
+  }
   GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
   return batch_data;
 }
@@ -1593,26 +1537,14 @@
 static void invoke_recv_initial_metadata_callback(void* arg,
                                                   grpc_error* error) {
   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
-  channel_data* chand =
-      static_cast<channel_data*>(batch_data->elem->channel_data);
-  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   // Find pending batch.
-  pending_batch* pending = nullptr;
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
-    if (batch != nullptr && batch->recv_initial_metadata &&
-        batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
-            nullptr) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
-                "pending batch at index %" PRIuPTR,
-                chand, calld, i);
-      }
-      pending = &calld->pending_batches[i];
-      break;
-    }
-  }
+  pending_batch* pending = pending_batch_find(
+      batch_data->elem, "invoking recv_initial_metadata_ready for",
+      [](grpc_transport_stream_op_batch* batch) {
+        return batch->recv_initial_metadata &&
+               batch->payload->recv_initial_metadata
+                       .recv_initial_metadata_ready != nullptr;
+      });
   GPR_ASSERT(pending != nullptr);
   // Return metadata.
   grpc_metadata_batch_move(
@@ -1648,10 +1580,19 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
+  retry_state->completed_recv_initial_metadata = true;
+  // If a retry was already dispatched, then we're not going to use the
+  // result of this recv_initial_metadata op, so do nothing.
+  if (retry_state->retry_dispatched) {
+    GRPC_CALL_COMBINER_STOP(
+        calld->call_combiner,
+        "recv_initial_metadata_ready after retry dispatched");
+    return;
+  }
   // If we got an error or a Trailers-Only response and have not yet gotten
-  // the recv_trailing_metadata on_complete callback, then defer
-  // propagating this callback back to the surface.  We can evaluate whether
-  // to retry when recv_trailing_metadata comes back.
+  // the recv_trailing_metadata_ready callback, then defer propagating this
+  // callback back to the surface.  We can evaluate whether to retry when
+  // recv_trailing_metadata comes back.
   if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
                     error != GRPC_ERROR_NONE) &&
                    !retry_state->completed_recv_trailing_metadata)) {
@@ -1676,9 +1617,9 @@
   }
   // Received valid initial metadata, so commit the call.
   retry_commit(elem, retry_state);
+  // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
   invoke_recv_initial_metadata_callback(batch_data, error);
-  GRPC_ERROR_UNREF(error);
 }
 
 //
@@ -1688,25 +1629,13 @@
 // Invokes recv_message_ready for a subchannel batch.
 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
-  channel_data* chand =
-      static_cast<channel_data*>(batch_data->elem->channel_data);
-  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   // Find pending op.
-  pending_batch* pending = nullptr;
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
-    if (batch != nullptr && batch->recv_message &&
-        batch->payload->recv_message.recv_message_ready != nullptr) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: invoking recv_message_ready for "
-                "pending batch at index %" PRIuPTR,
-                chand, calld, i);
-      }
-      pending = &calld->pending_batches[i];
-      break;
-    }
-  }
+  pending_batch* pending = pending_batch_find(
+      batch_data->elem, "invoking recv_message_ready for",
+      [](grpc_transport_stream_op_batch* batch) {
+        return batch->recv_message &&
+               batch->payload->recv_message.recv_message_ready != nullptr;
+      });
   GPR_ASSERT(pending != nullptr);
   // Return payload.
   *pending->batch->payload->recv_message.recv_message =
@@ -1738,10 +1667,18 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
+  ++retry_state->completed_recv_message_count;
+  // If a retry was already dispatched, then we're not going to use the
+  // result of this recv_message op, so do nothing.
+  if (retry_state->retry_dispatched) {
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+                            "recv_message_ready after retry dispatched");
+    return;
+  }
   // If we got an error or the payload was nullptr and we have not yet gotten
-  // the recv_trailing_metadata on_complete callback, then defer
-  // propagating this callback back to the surface.  We can evaluate whether
-  // to retry when recv_trailing_metadata comes back.
+  // the recv_trailing_metadata_ready callback, then defer propagating this
+  // callback back to the surface.  We can evaluate whether to retry when
+  // recv_trailing_metadata comes back.
   if (GPR_UNLIKELY(
           (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
           !retry_state->completed_recv_trailing_metadata)) {
@@ -1764,133 +1701,268 @@
   }
   // Received a valid message, so commit the call.
   retry_commit(elem, retry_state);
+  // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
   invoke_recv_message_callback(batch_data, error);
-  GRPC_ERROR_UNREF(error);
 }
 
 //
-// list of closures to execute in call combiner
+// recv_trailing_metadata handling
 //
 
-// Represents a closure that needs to run in the call combiner as part of
-// starting or completing a batch.
-typedef struct {
-  grpc_closure* closure;
-  grpc_error* error;
-  const char* reason;
-  bool free_reason = false;
-} closure_to_execute;
+// Sets *status and *server_pushback_md based on batch_data and error.
+static void get_call_status(subchannel_batch_data* batch_data,
+                            grpc_error* error, grpc_status_code* status,
+                            grpc_mdelem** server_pushback_md) {
+  grpc_call_element* elem = batch_data->elem;
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  if (error != GRPC_ERROR_NONE) {
+    grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
+                          nullptr);
+  } else {
+    grpc_metadata_batch* md_batch =
+        batch_data->batch.payload->recv_trailing_metadata
+            .recv_trailing_metadata;
+    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+    *status =
+        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+    if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+      *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
 
-static void execute_closures_in_call_combiner(grpc_call_element* elem,
-                                              const char* caller,
-                                              closure_to_execute* closures,
-                                              size_t num_closures) {
+// Adds recv_trailing_metadata_ready closure to closures.
+static void add_closure_for_recv_trailing_metadata_ready(
+    grpc_call_element* elem, subchannel_batch_data* batch_data,
+    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+  // Find pending batch.
+  pending_batch* pending = pending_batch_find(
+      elem, "invoking recv_trailing_metadata for",
+      [](grpc_transport_stream_op_batch* batch) {
+        return batch->recv_trailing_metadata &&
+               batch->payload->recv_trailing_metadata
+                       .recv_trailing_metadata_ready != nullptr;
+      });
+  // If we generated the recv_trailing_metadata op internally via
+  // start_internal_recv_trailing_metadata(), then there will be no
+  // pending batch.
+  if (pending == nullptr) {
+    GRPC_ERROR_UNREF(error);
+    return;
+  }
+  // Return metadata.
+  grpc_metadata_batch_move(
+      &batch_data->recv_trailing_metadata,
+      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
+  // Add closure.
+  closures->Add(pending->batch->payload->recv_trailing_metadata
+                    .recv_trailing_metadata_ready,
+                error, "recv_trailing_metadata_ready for pending batch");
+  // Update bookkeeping.
+  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+      nullptr;
+  maybe_clear_pending_batch(elem, pending);
+}
+
+// Adds any necessary closures for deferred recv_initial_metadata and
+// recv_message callbacks to closures.
+static void add_closures_for_deferred_recv_callbacks(
+    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
+    grpc_core::CallCombinerClosureList* closures) {
+  if (batch_data->batch.recv_trailing_metadata) {
+    // Add closure for deferred recv_initial_metadata_ready.
+    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
+                     nullptr)) {
+      GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+                        invoke_recv_initial_metadata_callback,
+                        retry_state->recv_initial_metadata_ready_deferred_batch,
+                        grpc_schedule_on_exec_ctx);
+      closures->Add(&batch_data->recv_initial_metadata_ready,
+                    retry_state->recv_initial_metadata_error,
+                    "resuming recv_initial_metadata_ready");
+      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+    }
+    // Add closure for deferred recv_message_ready.
+    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
+                     nullptr)) {
+      GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+                        invoke_recv_message_callback,
+                        retry_state->recv_message_ready_deferred_batch,
+                        grpc_schedule_on_exec_ctx);
+      closures->Add(&batch_data->recv_message_ready,
+                    retry_state->recv_message_error,
+                    "resuming recv_message_ready");
+      retry_state->recv_message_ready_deferred_batch = nullptr;
+    }
+  }
+}
+
+// Returns true if any op in the batch was not yet started.
+// Only looks at send ops, since recv ops are always started immediately.
+static bool pending_batch_is_unstarted(
+    pending_batch* pending, call_data* calld,
+    subchannel_call_retry_state* retry_state) {
+  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+    return false;
+  }
+  if (pending->batch->send_initial_metadata &&
+      !retry_state->started_send_initial_metadata) {
+    return true;
+  }
+  if (pending->batch->send_message &&
+      retry_state->started_send_message_count < calld->send_messages->size()) {
+    return true;
+  }
+  if (pending->batch->send_trailing_metadata &&
+      !retry_state->started_send_trailing_metadata) {
+    return true;
+  }
+  return false;
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures.
+static void add_closures_to_fail_unstarted_pending_batches(
+    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  // Note that the call combiner will be yielded for each closure that
-  // we schedule.  We're already running in the call combiner, so one of
-  // the closures can be scheduled directly, but the others will
-  // have to re-enter the call combiner.
-  if (num_closures > 0) {
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
-              calld, caller, closures[0].reason);
-    }
-    GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
-    if (closures[0].free_reason) {
-      gpr_free(const_cast<char*>(closures[0].reason));
-    }
-    for (size_t i = 1; i < num_closures; ++i) {
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    pending_batch* pending = &calld->pending_batches[i];
+    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
       if (grpc_client_channel_trace.enabled()) {
         gpr_log(GPR_INFO,
-                "chand=%p calld=%p: %s starting closure in call combiner: %s",
-                chand, calld, caller, closures[i].reason);
+                "chand=%p calld=%p: failing unstarted pending batch at index "
+                "%" PRIuPTR,
+                chand, calld, i);
       }
-      GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
-                               closures[i].error, closures[i].reason);
-      if (closures[i].free_reason) {
-        gpr_free(const_cast<char*>(closures[i].reason));
-      }
+      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
+                    "failing on_complete for pending batch");
+      pending->batch->on_complete = nullptr;
+      maybe_clear_pending_batch(elem, pending);
     }
-  } else {
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
-              calld, caller);
-    }
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
   }
+  GRPC_ERROR_UNREF(error);
+}
+
+// Runs necessary closures upon completion of a call attempt.
+static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
+                                            grpc_error* error) {
+  grpc_call_element* elem = batch_data->elem;
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  subchannel_call_retry_state* retry_state =
+      static_cast<subchannel_call_retry_state*>(
+          grpc_connected_subchannel_call_get_parent_data(
+              batch_data->subchannel_call));
+  // Construct list of closures to execute.
+  grpc_core::CallCombinerClosureList closures;
+  // First, add closure for recv_trailing_metadata_ready.
+  add_closure_for_recv_trailing_metadata_ready(
+      elem, batch_data, GRPC_ERROR_REF(error), &closures);
+  // If there are deferred recv_initial_metadata_ready or recv_message_ready
+  // callbacks, add them to closures.
+  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+  // Add closures to fail any pending batches that have not yet been started.
+  add_closures_to_fail_unstarted_pending_batches(
+      elem, retry_state, GRPC_ERROR_REF(error), &closures);
+  // Don't need batch_data anymore.
+  batch_data_unref(batch_data);
+  // Schedule all of the closures identified above.
+  // Note: This will release the call combiner.
+  closures.RunClosures(calld->call_combiner);
+  GRPC_ERROR_UNREF(error);
+}
+
+// Intercepts recv_trailing_metadata_ready callback for retries.
+// Commits the call and returns the trailing metadata up the stack.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+  grpc_call_element* elem = batch_data->elem;
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  if (grpc_client_channel_trace.enabled()) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
+            chand, calld, grpc_error_string(error));
+  }
+  subchannel_call_retry_state* retry_state =
+      static_cast<subchannel_call_retry_state*>(
+          grpc_connected_subchannel_call_get_parent_data(
+              batch_data->subchannel_call));
+  retry_state->completed_recv_trailing_metadata = true;
+  // Get the call's status and check for server pushback metadata.
+  grpc_status_code status = GRPC_STATUS_OK;
+  grpc_mdelem* server_pushback_md = nullptr;
+  get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+                  &server_pushback_md);
+  if (grpc_client_channel_trace.enabled()) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+            calld, grpc_status_code_to_string(status));
+  }
+  // Check if we should retry.
+  if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+    // Unref batch_data for deferred recv_initial_metadata_ready or
+    // recv_message_ready callbacks, if any.
+    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+      batch_data_unref(batch_data);
+      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+    }
+    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+      batch_data_unref(batch_data);
+      GRPC_ERROR_UNREF(retry_state->recv_message_error);
+    }
+    batch_data_unref(batch_data);
+    return;
+  }
+  // Not retrying, so commit the call.
+  retry_commit(elem, retry_state);
+  // Run any necessary closures.
+  run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
 }
 
 //
 // on_complete callback handling
 //
 
-// Updates retry_state to reflect the ops completed in batch_data.
-static void update_retry_state_for_completed_batch(
-    subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state) {
-  if (batch_data->batch.send_initial_metadata) {
-    retry_state->completed_send_initial_metadata = true;
+// Adds the on_complete closure for the pending batch completed in
+// batch_data to closures.
+static void add_closure_for_completed_pending_batch(
+    grpc_call_element* elem, subchannel_batch_data* batch_data,
+    subchannel_call_retry_state* retry_state, grpc_error* error,
+    grpc_core::CallCombinerClosureList* closures) {
+  pending_batch* pending = pending_batch_find(
+      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+        // Match the pending batch with the same set of send ops as the
+        // subchannel batch we've just completed.
+        return batch->on_complete != nullptr &&
+               batch_data->batch.send_initial_metadata ==
+                   batch->send_initial_metadata &&
+               batch_data->batch.send_message == batch->send_message &&
+               batch_data->batch.send_trailing_metadata ==
+                   batch->send_trailing_metadata;
+      });
+  // If batch_data is a replay batch, then there will be no pending
+  // batch to complete.
+  if (pending == nullptr) {
+    GRPC_ERROR_UNREF(error);
+    return;
   }
-  if (batch_data->batch.send_message) {
-    ++retry_state->completed_send_message_count;
-  }
-  if (batch_data->batch.send_trailing_metadata) {
-    retry_state->completed_send_trailing_metadata = true;
-  }
-  if (batch_data->batch.recv_initial_metadata) {
-    retry_state->completed_recv_initial_metadata = true;
-  }
-  if (batch_data->batch.recv_message) {
-    ++retry_state->completed_recv_message_count;
-  }
-  if (batch_data->batch.recv_trailing_metadata) {
-    retry_state->completed_recv_trailing_metadata = true;
-  }
-}
-
-// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures, updating *num_closures as needed.
-static void add_closures_for_deferred_recv_callbacks(
-    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
-    closure_to_execute* closures, size_t* num_closures) {
-  if (batch_data->batch.recv_trailing_metadata) {
-    // Add closure for deferred recv_initial_metadata_ready.
-    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
-                     nullptr)) {
-      closure_to_execute* closure = &closures[(*num_closures)++];
-      closure->closure = GRPC_CLOSURE_INIT(
-          &batch_data->recv_initial_metadata_ready,
-          invoke_recv_initial_metadata_callback,
-          retry_state->recv_initial_metadata_ready_deferred_batch,
-          grpc_schedule_on_exec_ctx);
-      closure->error = retry_state->recv_initial_metadata_error;
-      closure->reason = "resuming recv_initial_metadata_ready";
-      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
-    }
-    // Add closure for deferred recv_message_ready.
-    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
-                     nullptr)) {
-      closure_to_execute* closure = &closures[(*num_closures)++];
-      closure->closure = GRPC_CLOSURE_INIT(
-          &batch_data->recv_message_ready, invoke_recv_message_callback,
-          retry_state->recv_message_ready_deferred_batch,
-          grpc_schedule_on_exec_ctx);
-      closure->error = retry_state->recv_message_error;
-      closure->reason = "resuming recv_message_ready";
-      retry_state->recv_message_ready_deferred_batch = nullptr;
-    }
-  }
+  // Add closure.
+  closures->Add(pending->batch->on_complete, error,
+                "on_complete for pending batch");
+  pending->batch->on_complete = nullptr;
+  maybe_clear_pending_batch(elem, pending);
 }
 
 // If there are any cached ops to replay or pending ops to start on the
 // subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches(), updating *num_closures as needed.
+// start_retriable_subchannel_batches().
 static void add_closures_for_replay_or_pending_send_ops(
     grpc_call_element* elem, subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state, closure_to_execute* closures,
-    size_t* num_closures) {
+    subchannel_call_retry_state* retry_state,
+    grpc_core::CallCombinerClosureList* closures) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   bool have_pending_send_message_ops =
@@ -1916,95 +1988,14 @@
               "chand=%p calld=%p: starting next batch for pending send op(s)",
               chand, calld);
     }
-    closure_to_execute* closure = &closures[(*num_closures)++];
-    closure->closure = GRPC_CLOSURE_INIT(
-        &batch_data->batch.handler_private.closure,
-        start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
-    closure->error = GRPC_ERROR_NONE;
-    closure->reason = "starting next batch for send_* op(s)";
+    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
+                      start_retriable_subchannel_batches, elem,
+                      grpc_schedule_on_exec_ctx);
+    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
+                  "starting next batch for send_* op(s)");
   }
 }
 
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures, updating *num_closures as needed.
-static void add_closures_for_completed_pending_batches(
-    grpc_call_element* elem, subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state, grpc_error* error,
-    closure_to_execute* closures, size_t* num_closures) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    pending_batch* pending = &calld->pending_batches[i];
-    if (pending_batch_is_completed(pending, calld, retry_state)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
-                chand, calld, i);
-      }
-      // Copy the trailing metadata to return it to the surface.
-      if (batch_data->batch.recv_trailing_metadata) {
-        grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
-                                 pending->batch->payload->recv_trailing_metadata
-                                     .recv_trailing_metadata);
-      }
-      closure_to_execute* closure = &closures[(*num_closures)++];
-      closure->closure = pending->batch->on_complete;
-      closure->error = GRPC_ERROR_REF(error);
-      closure->reason = "on_complete for pending batch";
-      pending->batch->on_complete = nullptr;
-      maybe_clear_pending_batch(elem, pending);
-    }
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures, updating
-// *num_closures as needed.
-static void add_closures_to_fail_unstarted_pending_batches(
-    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
-    grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    pending_batch* pending = &calld->pending_batches[i];
-    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: failing unstarted pending batch at index "
-                "%" PRIuPTR,
-                chand, calld, i);
-      }
-      if (pending->batch->recv_initial_metadata) {
-        closure_to_execute* closure = &closures[(*num_closures)++];
-        closure->closure = pending->batch->payload->recv_initial_metadata
-                               .recv_initial_metadata_ready;
-        closure->error = GRPC_ERROR_REF(error);
-        closure->reason =
-            "failing recv_initial_metadata_ready for pending batch";
-        pending->batch->payload->recv_initial_metadata
-            .recv_initial_metadata_ready = nullptr;
-      }
-      if (pending->batch->recv_message) {
-        *pending->batch->payload->recv_message.recv_message = nullptr;
-        closure_to_execute* closure = &closures[(*num_closures)++];
-        closure->closure =
-            pending->batch->payload->recv_message.recv_message_ready;
-        closure->error = GRPC_ERROR_REF(error);
-        closure->reason = "failing recv_message_ready for pending batch";
-        pending->batch->payload->recv_message.recv_message_ready = nullptr;
-      }
-      closure_to_execute* closure = &closures[(*num_closures)++];
-      closure->closure = pending->batch->on_complete;
-      closure->error = GRPC_ERROR_REF(error);
-      closure->reason = "failing on_complete for pending batch";
-      pending->batch->on_complete = nullptr;
-      maybe_clear_pending_batch(elem, pending);
-    }
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
 // Callback used to intercept on_complete from subchannel calls.
 // Called only when retries are enabled.
 static void on_complete(void* arg, grpc_error* error) {
@@ -2022,136 +2013,49 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
-  // If we have previously completed recv_trailing_metadata, then the
-  // call is finished.
-  bool call_finished = retry_state->completed_recv_trailing_metadata;
-  // Record whether we were already committed before receiving this callback.
-  const bool previously_committed = calld->retry_committed;
   // Update bookkeeping in retry_state.
-  update_retry_state_for_completed_batch(batch_data, retry_state);
-  if (call_finished) {
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
-              calld);
-    }
-  } else {
-    // Check if this batch finished the call, and if so, get its status.
-    // The call is finished if either (a) this callback was invoked with
-    // an error or (b) we receive status.
-    grpc_status_code status = GRPC_STATUS_OK;
-    grpc_mdelem* server_pushback_md = nullptr;
-    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {  // Case (a).
-      call_finished = true;
-      grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
-                            nullptr);
-    } else if (batch_data->batch.recv_trailing_metadata) {  // Case (b).
-      call_finished = true;
-      grpc_metadata_batch* md_batch =
-          batch_data->batch.payload->recv_trailing_metadata
-              .recv_trailing_metadata;
-      GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
-      status = grpc_get_status_code_from_metadata(
-          md_batch->idx.named.grpc_status->md);
-      if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
-        server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
-      }
-    }
-    // If the call just finished, check if we should retry.
-    if (call_finished) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
-                calld, grpc_status_code_to_string(status));
-      }
-      if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
-        // Unref batch_data for deferred recv_initial_metadata_ready or
-        // recv_message_ready callbacks, if any.
-        if (batch_data->batch.recv_trailing_metadata &&
-            retry_state->recv_initial_metadata_ready_deferred_batch !=
-                nullptr) {
-          batch_data_unref(batch_data);
-          GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
-        }
-        if (batch_data->batch.recv_trailing_metadata &&
-            retry_state->recv_message_ready_deferred_batch != nullptr) {
-          batch_data_unref(batch_data);
-          GRPC_ERROR_UNREF(retry_state->recv_message_error);
-        }
-        // Track number of pending subchannel send batches and determine if
-        // this was the last one.
-        bool last_callback_complete = false;
-        if (batch_data->batch.send_initial_metadata ||
-            batch_data->batch.send_message ||
-            batch_data->batch.send_trailing_metadata) {
-          --calld->num_pending_retriable_subchannel_send_batches;
-          last_callback_complete =
-              calld->num_pending_retriable_subchannel_send_batches == 0;
-        }
-        batch_data_unref(batch_data);
-        // If we just completed the last subchannel send batch, unref the
-        // call stack.
-        if (last_callback_complete) {
-          GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
-        }
-        return;
-      }
-      // Not retrying, so commit the call.
-      retry_commit(elem, retry_state);
-    }
+  if (batch_data->batch.send_initial_metadata) {
+    retry_state->completed_send_initial_metadata = true;
   }
-  // If we were already committed before receiving this callback, free
-  // cached data for send ops that we've just completed.  (If the call has
-  // just now finished, the call to retry_commit() above will have freed all
-  // cached send ops, so we don't need to do it here.)
-  if (previously_committed) {
+  if (batch_data->batch.send_message) {
+    ++retry_state->completed_send_message_count;
+  }
+  if (batch_data->batch.send_trailing_metadata) {
+    retry_state->completed_send_trailing_metadata = true;
+  }
+  // If the call is committed, free cached data for send ops that we've just
+  // completed.
+  if (calld->retry_committed) {
     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
   }
-  // Call not being retried.
   // Construct list of closures to execute.
-  // Max number of closures is number of pending batches plus one for
-  // each of:
-  // - recv_initial_metadata_ready (either deferred or unstarted)
-  // - recv_message_ready (either deferred or unstarted)
-  // - starting a new batch for pending send ops
-  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
-  size_t num_closures = 0;
-  // If there are deferred recv_initial_metadata_ready or recv_message_ready
-  // callbacks, add them to closures.
-  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
-                                           &num_closures);
-  // Find pending batches whose ops are now complete and add their
-  // on_complete callbacks to closures.
-  add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
-                                             GRPC_ERROR_REF(error), closures,
-                                             &num_closures);
-  // Add closures to handle any pending batches that have not yet been started.
-  // If the call is finished, we fail these batches; otherwise, we add a
-  // callback to start_retriable_subchannel_batches() to start them on
-  // the subchannel call.
-  if (call_finished) {
-    add_closures_to_fail_unstarted_pending_batches(
-        elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
-  } else {
-    add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
-                                                closures, &num_closures);
+  grpc_core::CallCombinerClosureList closures;
+  // If a retry was already dispatched, that means we saw
+  // recv_trailing_metadata before this, so we do nothing here.
+  // Otherwise, invoke the callback to return the result to the surface.
+  if (!retry_state->retry_dispatched) {
+    // Add closure for the completed pending batch, if any.
+    add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
+                                            GRPC_ERROR_REF(error), &closures);
+    // If needed, add a callback to start any replay or pending send ops on
+    // the subchannel call.
+    if (!retry_state->completed_recv_trailing_metadata) {
+      add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+                                                  &closures);
+    }
   }
   // Track number of pending subchannel send batches and determine if this
   // was the last one.
-  bool last_callback_complete = false;
-  if (batch_data->batch.send_initial_metadata ||
-      batch_data->batch.send_message ||
-      batch_data->batch.send_trailing_metadata) {
-    --calld->num_pending_retriable_subchannel_send_batches;
-    last_callback_complete =
-        calld->num_pending_retriable_subchannel_send_batches == 0;
-  }
+  --calld->num_pending_retriable_subchannel_send_batches;
+  const bool last_send_batch_complete =
+      calld->num_pending_retriable_subchannel_send_batches == 0;
   // Don't need batch_data anymore.
   batch_data_unref(batch_data);
   // Schedule all of the closures identified above.
   // Note: This yeilds the call combiner.
-  execute_closures_in_call_combiner(elem, "on_complete", closures,
-                                    num_closures);
-  // If we just completed the last subchannel send batch, unref the call stack.
-  if (last_callback_complete) {
+  closures.RunClosures(calld->call_combiner);
+  // If this was the last subchannel send batch, unref the call stack.
+  if (last_send_batch_complete) {
     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
   }
 }
@@ -2172,27 +2076,22 @@
 
 // Adds a closure to closures that will execute batch in the call combiner.
 static void add_closure_for_subchannel_batch(
-    call_data* calld, grpc_transport_stream_op_batch* batch,
-    closure_to_execute* closures, size_t* num_closures) {
+    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
+    grpc_core::CallCombinerClosureList* closures) {
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
   batch->handler_private.extra_arg = calld->subchannel_call;
   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                     start_batch_in_call_combiner, batch,
                     grpc_schedule_on_exec_ctx);
-  closure_to_execute* closure = &closures[(*num_closures)++];
-  closure->closure = &batch->handler_private.closure;
-  closure->error = GRPC_ERROR_NONE;
-  // If the tracer is enabled, we log a more detailed message, which
-  // requires dynamic allocation.  This will be freed in
-  // start_retriable_subchannel_batches().
   if (grpc_client_channel_trace.enabled()) {
     char* batch_str = grpc_transport_stream_op_batch_string(batch);
-    gpr_asprintf(const_cast<char**>(&closure->reason),
-                 "starting batch in call combiner: %s", batch_str);
+    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
+            calld, batch_str);
     gpr_free(batch_str);
-    closure->free_reason = true;
-  } else {
-    closure->reason = "start_subchannel_batch";
   }
+  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+                "start_subchannel_batch");
 }
 
 // Adds retriable send_initial_metadata op to batch_data.
@@ -2328,9 +2227,13 @@
   grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
       &batch_data->recv_trailing_metadata;
-  batch_data->batch.collect_stats = true;
-  batch_data->batch.payload->collect_stats.collect_stats =
+  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
       &batch_data->collect_stats;
+  GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+                    recv_trailing_metadata_ready, batch_data,
+                    grpc_schedule_on_exec_ctx);
+  batch_data->batch.payload->recv_trailing_metadata
+      .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
 }
 
 // Helper function used to start a recv_trailing_metadata batch.  This
@@ -2351,9 +2254,11 @@
           grpc_connected_subchannel_call_get_parent_data(
               calld->subchannel_call));
   // Create batch_data with 2 refs, since this batch will be unreffed twice:
-  // once when the subchannel batch returns, and again when we actually get
-  // a recv_trailing_metadata op from the surface.
-  subchannel_batch_data* batch_data = batch_data_create(elem, 2);
+  // once for the recv_trailing_metadata_ready callback when the subchannel
+  // batch returns, and again when we actually get a recv_trailing_metadata
+  // op from the surface.
+  subchannel_batch_data* batch_data =
+      batch_data_create(elem, 2, false /* set_on_complete */);
   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
   retry_state->recv_trailing_metadata_internal_batch = batch_data;
   // Note: This will release the call combiner.
@@ -2378,7 +2283,7 @@
               "send_initial_metadata op",
               chand, calld);
     }
-    replay_batch_data = batch_data_create(elem, 1);
+    replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
     add_retriable_send_initial_metadata_op(calld, retry_state,
                                            replay_batch_data);
   }
@@ -2395,7 +2300,8 @@
               chand, calld);
     }
     if (replay_batch_data == nullptr) {
-      replay_batch_data = batch_data_create(elem, 1);
+      replay_batch_data =
+          batch_data_create(elem, 1, true /* set_on_complete */);
     }
     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
   }
@@ -2414,7 +2320,8 @@
               chand, calld);
     }
     if (replay_batch_data == nullptr) {
-      replay_batch_data = batch_data_create(elem, 1);
+      replay_batch_data =
+          batch_data_create(elem, 1, true /* set_on_complete */);
     }
     add_retriable_send_trailing_metadata_op(calld, retry_state,
                                             replay_batch_data);
@@ -2426,7 +2333,7 @@
 // *num_batches as needed.
 static void add_subchannel_batches_for_pending_batches(
     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
-    closure_to_execute* closures, size_t* num_closures) {
+    grpc_core::CallCombinerClosureList* closures) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
@@ -2482,13 +2389,11 @@
         if (retry_state->completed_recv_trailing_metadata) {
           subchannel_batch_data* batch_data =
               retry_state->recv_trailing_metadata_internal_batch;
-          closure_to_execute* closure = &closures[(*num_closures)++];
-          closure->closure = &batch_data->on_complete;
           // Batches containing recv_trailing_metadata always succeed.
-          closure->error = GRPC_ERROR_NONE;
-          closure->reason =
-              "re-executing on_complete for recv_trailing_metadata "
-              "to propagate internally triggered result";
+          closures->Add(
+              &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+              "re-executing recv_trailing_metadata_ready to propagate "
+              "internally triggered result");
         } else {
           batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
         }
@@ -2500,14 +2405,19 @@
     if (calld->method_params == nullptr ||
         calld->method_params->retry_policy() == nullptr ||
         calld->retry_committed) {
-      add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
+      add_closure_for_subchannel_batch(elem, batch, closures);
       pending_batch_clear(calld, pending);
       continue;
     }
     // Create batch with the right number of callbacks.
-    const int num_callbacks =
-        1 + batch->recv_initial_metadata + batch->recv_message;
-    subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
+    const bool has_send_ops = batch->send_initial_metadata ||
+                              batch->send_message ||
+                              batch->send_trailing_metadata;
+    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
+                              batch->recv_message +
+                              batch->recv_trailing_metadata;
+    subchannel_batch_data* batch_data = batch_data_create(
+        elem, num_callbacks, has_send_ops /* set_on_complete */);
     // Cache send ops if needed.
     maybe_cache_send_ops_for_batch(calld, pending);
     // send_initial_metadata.
@@ -2534,11 +2444,9 @@
     }
     // recv_trailing_metadata.
     if (batch->recv_trailing_metadata) {
-      GPR_ASSERT(batch->collect_stats);
       add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
     }
-    add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
-                                     num_closures);
+    add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
     // Track number of pending subchannel send batches.
     // If this is the first one, take a ref to the call stack.
     if (batch->send_initial_metadata || batch->send_message ||
@@ -2566,15 +2474,13 @@
           grpc_connected_subchannel_call_get_parent_data(
               calld->subchannel_call));
   // Construct list of closures to execute, one for each pending batch.
-  // We can start up to 6 batches.
-  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
-  size_t num_closures = 0;
+  grpc_core::CallCombinerClosureList closures;
   // Replay previously-returned send_* ops if needed.
   subchannel_batch_data* replay_batch_data =
       maybe_create_subchannel_batch_for_replay(elem, retry_state);
   if (replay_batch_data != nullptr) {
-    add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
-                                     &num_closures);
+    add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
+                                     &closures);
     // Track number of pending subchannel send batches.
     // If this is the first one, take a ref to the call stack.
     if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2583,17 +2489,16 @@
     ++calld->num_pending_retriable_subchannel_send_batches;
   }
   // Now add pending batches.
-  add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
-                                             &num_closures);
+  add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
   // Start batches on subchannel call.
   if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p: starting %" PRIuPTR
             " retriable batches on subchannel_call=%p",
-            chand, calld, num_closures, calld->subchannel_call);
+            chand, calld, closures.size(), calld->subchannel_call);
   }
-  execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
-                                    closures, num_closures);
+  // Note: This will yield the call combiner.
+  closures.RunClosures(calld->call_combiner);
 }
 
 //
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index caa5472..456f9b8 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,21 +128,25 @@
   }
 }
 
-// Callback run when the call is complete.
-static void on_complete(void* arg, grpc_error* error) {
+// Callback run when we receive trailing metadata.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
   grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
   cancel_timer_if_needed(deadline_state);
-  // Invoke the next callback.
-  GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
+  // Invoke the original callback.
+  GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
+                   GRPC_ERROR_REF(error));
 }
 
-// Inject our own on_complete callback into op.
-static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
-                                  grpc_transport_stream_op_batch* op) {
-  deadline_state->next_on_complete = op->on_complete;
-  GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
+// Inject our own recv_trailing_metadata_ready callback into op.
+static void inject_recv_trailing_metadata_ready(
+    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
+  deadline_state->original_recv_trailing_metadata_ready =
+      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+  GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
+                    recv_trailing_metadata_ready, deadline_state,
                     grpc_schedule_on_exec_ctx);
-  op->on_complete = &deadline_state->on_complete;
+  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+      &deadline_state->recv_trailing_metadata_ready;
 }
 
 // Callback and associated state for starting the timer after call stack
@@ -226,7 +230,7 @@
     // Make sure we know when the call is complete, so that we can cancel
     // the timer.
     if (op->recv_trailing_metadata) {
-      inject_on_complete_cb(deadline_state, op);
+      inject_recv_trailing_metadata_ready(deadline_state, op);
     }
   }
 }
@@ -322,7 +326,7 @@
     // the client never sends trailing metadata, because this is the
     // hook that tells us when the call is complete on the server side.
     if (op->recv_trailing_metadata) {
-      inject_on_complete_cb(&calld->base.deadline_state, op);
+      inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
     }
   }
   // Chain to next filter.
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 13207cb..1d797f4 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -37,12 +37,12 @@
   grpc_deadline_timer_state timer_state;
   grpc_timer timer;
   grpc_closure timer_callback;
-  // Closure to invoke when the call is complete.
+  // Closure to invoke when we receive trailing metadata.
   // We use this to cancel the timer.
-  grpc_closure on_complete;
-  // The original on_complete closure, which we chain to after our own
-  // closure is invoked.
-  grpc_closure* next_on_complete;
+  grpc_closure recv_trailing_metadata_ready;
+  // The original recv_trailing_metadata_ready closure, which we chain to
+  // after our own closure is invoked.
+  grpc_closure* original_recv_trailing_metadata_ready;
 } grpc_deadline_state;
 
 //
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index fe7f2ba..4585892 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -56,8 +56,8 @@
   grpc_closure recv_initial_metadata_ready;
   // State for handling recv_trailing_metadata ops.
   grpc_metadata_batch* recv_trailing_metadata;
-  grpc_closure* original_recv_trailing_metadata_on_complete;
-  grpc_closure recv_trailing_metadata_on_complete;
+  grpc_closure* original_recv_trailing_metadata_ready;
+  grpc_closure recv_trailing_metadata_ready;
   // State for handling send_message ops.
   grpc_transport_stream_op_batch* send_message_batch;
   size_t send_message_bytes_read;
@@ -154,8 +154,7 @@
   GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
 }
 
-static void recv_trailing_metadata_on_complete(void* user_data,
-                                               grpc_error* error) {
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   if (error == GRPC_ERROR_NONE) {
@@ -164,7 +163,7 @@
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
+  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
 }
 
 static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -313,8 +312,10 @@
     /* substitute our callback for the higher callback */
     calld->recv_trailing_metadata =
         batch->payload->recv_trailing_metadata.recv_trailing_metadata;
-    calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
-    batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+    calld->original_recv_trailing_metadata_ready =
+        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+        &calld->recv_trailing_metadata_ready;
   }
 
   grpc_error* error = GRPC_ERROR_NONE;
@@ -421,8 +422,8 @@
   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                     recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
-                    recv_trailing_metadata_on_complete, elem,
+  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+                    recv_trailing_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);
   GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
                     elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 1a924e6..365a247 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1148,12 +1148,10 @@
   }
 }
 
-/* Flag that this closure barrier wants stats to be updated before finishing */
-#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
 /* Flag that this closure barrier may be covering a write in a pollset, and so
    we should not complete this closure until we can prove that the write got
    scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
 /* First bit of the reference count, stored in the high order bits (with the low
    bits being used for flags defined above) */
 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@@ -1205,10 +1203,6 @@
         grpc_error_add_child(closure->error_data.error, error);
   }
   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
-    if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
-      grpc_transport_move_stats(&s->stats, s->collecting_stats);
-      s->collecting_stats = nullptr;
-    }
     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
       GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1350,9 +1344,14 @@
   }
 
   grpc_closure* on_complete = op->on_complete;
+  // TODO(roth): This is a hack needed because we use data inside of the
+  // closure itself to do the barrier calculation (i.e., to ensure that
+  // we don't schedule the closure until all ops in the batch have been
+  // completed).  This can go away once we move to a new C++ closure API
+  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete =
-        GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
+    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+                                    nullptr, grpc_schedule_on_exec_ctx);
   }
 
   /* use final_data as a barrier until enqueue time; the inital counter is
@@ -1360,12 +1359,6 @@
   on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
   on_complete->error_data.error = GRPC_ERROR_NONE;
 
-  if (op->collect_stats) {
-    GPR_ASSERT(s->collecting_stats == nullptr);
-    s->collecting_stats = op_payload->collect_stats.collect_stats;
-    on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
-  }
-
   if (op->cancel_stream) {
     GRPC_STATS_INC_HTTP2_OP_CANCEL();
     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1599,8 +1592,11 @@
 
   if (op->recv_trailing_metadata) {
     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
+    GPR_ASSERT(s->collecting_stats == nullptr);
+    s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
-    s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
+    s->recv_trailing_metadata_finished =
+        op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
     s->recv_trailing_metadata =
         op_payload->recv_trailing_metadata.recv_trailing_metadata;
     s->final_metadata_requested = true;
@@ -1959,11 +1955,12 @@
     }
     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
         s->recv_trailing_metadata_finished != nullptr) {
+      grpc_transport_move_stats(&s->stats, s->collecting_stats);
+      s->collecting_stats = nullptr;
       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
                                                    s->recv_trailing_metadata);
-      grpc_chttp2_complete_closure_step(
-          t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
-          "recv_trailing_metadata_finished");
+      null_then_run_closure(&s->recv_trailing_metadata_finished,
+                            GRPC_ERROR_NONE);
     }
   }
 }
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 5d614c1..436997a 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,6 +925,10 @@
       result = false;
     }
     /* Check if every op that was asked for is done. */
+    /* TODO(muxi): We should not consider the recv ops here, since they
+     * have their own callbacks.  We should invoke a batch's on_complete
+     * as soon as all of the batch's send ops are complete, even if
+     * there are still recv ops pending. */
     else if (curr_op->send_initial_metadata &&
              !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
       CRONET_LOG(GPR_DEBUG, "Because");
@@ -1280,12 +1284,20 @@
              op_can_be_run(stream_op, s, &oas->state,
                            OP_RECV_TRAILING_METADATA)) {
     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
-    if (oas->s->state.rs.trailing_metadata_valid) {
+    grpc_error* error = GRPC_ERROR_NONE;
+    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+      error = GRPC_ERROR_REF(stream_state->cancel_error);
+    } else if (stream_state->state_op_done[OP_FAILED]) {
+      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
+    } else if (oas->s->state.rs.trailing_metadata_valid) {
       grpc_chttp2_incoming_metadata_buffer_publish(
           &oas->s->state.rs.trailing_metadata,
           stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
       stream_state->rs.trailing_metadata_valid = false;
     }
+    GRPC_CLOSURE_SCHED(
+        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+        error);
     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
     result = ACTION_TAKEN_NO_CALLBACK;
   } else if (stream_op->cancel_stream &&
@@ -1398,6 +1410,11 @@
       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                          GRPC_ERROR_CANCELLED);
     }
+    if (op->recv_trailing_metadata) {
+      GRPC_CLOSURE_SCHED(
+          op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+          GRPC_ERROR_CANCELLED);
+    }
     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
     return;
   }
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 6e5aa5a..ac83c7c 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,7 +120,6 @@
   struct inproc_stream* stream_list_next;
 } inproc_stream;
 
-static grpc_closure do_nothing_closure;
 static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
 static void op_state_machine(void* arg, grpc_error* error);
 
@@ -373,6 +372,10 @@
                                          const char* msg) {
   int is_sm = static_cast<int>(op == s->send_message_op);
   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
+  // TODO(vjpai): We should not consider the recv ops here, since they
+  // have their own callbacks.  We should invoke a batch's on_complete
+  // as soon as all of the batch's send ops are complete, even if there
+  // are still recv ops pending.
   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
   int is_rm = static_cast<int>(op == s->recv_message_op);
   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -496,6 +499,11 @@
     s->send_trailing_md_op = nullptr;
   }
   if (s->recv_trailing_md_op) {
+    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
+               s, error);
+    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+                           .recv_trailing_metadata_ready,
+                       GRPC_ERROR_REF(error));
     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
                s, error);
     complete_if_batch_end_locked(
@@ -639,6 +647,12 @@
       s->trailing_md_sent = true;
       if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
         INPROC_LOG(GPR_INFO,
+                   "op_state_machine %p scheduling trailing-metadata-ready", s);
+        GRPC_CLOSURE_SCHED(
+            s->recv_trailing_md_op->payload->recv_trailing_metadata
+                .recv_trailing_metadata_ready,
+            GRPC_ERROR_NONE);
+        INPROC_LOG(GPR_INFO,
                    "op_state_machine %p scheduling trailing-md-on-complete", s);
         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                            GRPC_ERROR_NONE);
@@ -711,6 +725,12 @@
   }
   if (s->recv_trailing_md_op && s->t->is_client && other &&
       other->send_message_op) {
+    INPROC_LOG(GPR_INFO,
+               "op_state_machine %p scheduling trailing-metadata-ready %p", s,
+               GRPC_ERROR_NONE);
+    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+                           .recv_trailing_metadata_ready,
+                       GRPC_ERROR_NONE);
     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
   }
   if (s->to_read_trailing_md_filled) {
@@ -766,6 +786,10 @@
         INPROC_LOG(GPR_INFO,
                    "op_state_machine %p scheduling trailing-md-on-complete %p",
                    s, new_err);
+        GRPC_CLOSURE_SCHED(
+            s->recv_trailing_md_op->payload->recv_trailing_metadata
+                .recv_trailing_metadata_ready,
+            GRPC_ERROR_REF(new_err));
         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                            GRPC_ERROR_REF(new_err));
         s->recv_trailing_md_op = nullptr;
@@ -859,6 +883,9 @@
     // couldn't complete that because we hadn't yet sent out trailing
     // md, now's the chance
     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+      GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+                             .recv_trailing_metadata_ready,
+                         GRPC_ERROR_REF(s->cancel_self_error));
       complete_if_batch_end_locked(
           s, s->cancel_self_error, s->recv_trailing_md_op,
           "cancel_stream scheduling trailing-md-on-complete");
@@ -873,6 +900,8 @@
   return ret;
 }
 
+static void do_nothing(void* arg, grpc_error* error) {}
+
 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                               grpc_transport_stream_op_batch* op) {
   INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -892,8 +921,14 @@
   }
   grpc_error* error = GRPC_ERROR_NONE;
   grpc_closure* on_complete = op->on_complete;
+  // TODO(roth): This is a hack needed because we use data inside of the
+  // closure itself to do the barrier calculation (i.e., to ensure that
+  // we don't schedule the closure until all ops in the batch have been
+  // completed).  This can go away once we move to a new C++ closure API
+  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete = &do_nothing_closure;
+    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+                                    nullptr, grpc_schedule_on_exec_ctx);
   }
 
   if (op->cancel_stream) {
@@ -1026,6 +1061,15 @@
         GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                            GRPC_ERROR_REF(error));
       }
+      if (op->recv_trailing_metadata) {
+        INPROC_LOG(
+            GPR_INFO,
+            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
+            s, error);
+        GRPC_CLOSURE_SCHED(
+            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+            GRPC_ERROR_REF(error));
+      }
     }
     INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
                error);
@@ -1129,12 +1173,8 @@
 /*******************************************************************************
  * GLOBAL INIT AND DESTROY
  */
-static void do_nothing(void* arg, grpc_error* error) {}
-
 void grpc_inproc_transport_init(void) {
   grpc_core::ExecCtx exec_ctx;
-  GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
-                    grpc_schedule_on_exec_ctx);
   g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
 
   grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index ddd3029..e2ea334 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,6 +51,7 @@
   callback_state on_complete[6];  // Max number of pending batches.
   callback_state recv_initial_metadata_ready;
   callback_state recv_message_ready;
+  callback_state recv_trailing_metadata_ready;
 } call_data;
 
 static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -111,6 +112,12 @@
     intercept_callback(calld, state, false, "recv_message_ready",
                        &batch->payload->recv_message.recv_message_ready);
   }
+  if (batch->recv_trailing_metadata) {
+    callback_state* state = &calld->recv_trailing_metadata_ready;
+    intercept_callback(
+        calld, state, false, "recv_trailing_metadata_ready",
+        &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
+  }
   if (batch->cancel_stream) {
     // There can be more than one cancellation batch in flight at any
     // given time, so we can't just pick out a fixed index into
@@ -121,7 +128,7 @@
         static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
     intercept_callback(calld, state, true, "on_complete (cancel_stream)",
                        &batch->on_complete);
-  } else {
+  } else if (batch->on_complete != nullptr) {
     callback_state* state = get_state_for_batch(calld, batch);
     intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
   }
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index f36f6cb..0d2586e 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -99,6 +99,8 @@
   void push_back(T&& value) { emplace_back(std::move(value)); }
 
   size_t size() const { return size_; }
+  bool empty() const { return size_ == 0; }
+
   size_t capacity() const { return capacity_; }
 
   void clear() {
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 0ccd08e..641fa18 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,6 +26,7 @@
 #include <grpc/support/atm.h>
 
 #include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/iomgr/closure.h"
 
 // A simple, lock-free mechanism for serializing activity related to a
@@ -109,4 +110,83 @@
 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                                grpc_error* error);
 
+namespace grpc_core {
+
+// Helper for running a list of closures in a call combiner.
+//
+// Each callback running in the call combiner will eventually be
+// returned to the surface, at which point the surface will yield the
+// call combiner.  So when we are running in the call combiner and have
+// more than one callback to return to the surface, we need to re-enter
+// the call combiner for all but one of those callbacks.
+class CallCombinerClosureList {
+ public:
+  CallCombinerClosureList() {}
+
+  // Adds a closure to the list.  The closure must eventually result in
+  // the call combiner being yielded.
+  void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
+    closures_.emplace_back(closure, error, reason);
+  }
+
+  // Runs all closures in the call combiner and yields the call combiner.
+  //
+  // All but one of the closures in the list will be scheduled via
+  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
+  // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
+  // yielding the call combiner.  If the list is empty, then the call
+  // combiner will be yielded immediately.
+  void RunClosures(grpc_call_combiner* call_combiner) {
+    if (closures_.empty()) {
+      GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
+      return;
+    }
+    for (size_t i = 1; i < closures_.size(); ++i) {
+      auto& closure = closures_[i];
+      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+                               closure.reason);
+    }
+    if (grpc_call_combiner_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "CallCombinerClosureList executing closure while already "
+              "holding call_combiner %p: closure=%p error=%s reason=%s",
+              call_combiner, closures_[0].closure,
+              grpc_error_string(closures_[0].error), closures_[0].reason);
+    }
+    // This will release the call combiner.
+    GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
+    closures_.clear();
+  }
+
+  // Runs all closures in the call combiner, but does NOT yield the call
+  // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
+  void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
+    for (size_t i = 0; i < closures_.size(); ++i) {
+      auto& closure = closures_[i];
+      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+                               closure.reason);
+    }
+    closures_.clear();
+  }
+
+  size_t size() const { return closures_.size(); }
+
+ private:
+  struct CallCombinerClosure {
+    grpc_closure* closure;
+    grpc_error* error;
+    const char* reason;
+
+    CallCombinerClosure(grpc_closure* closure, grpc_error* error,
+                        const char* reason)
+        : closure(closure), error(error), reason(reason) {}
+  };
+
+  // There are generally a maximum of 6 closures to run in the call
+  // combiner, one for each pending op.
+  InlinedVector<CallCombinerClosure, 6> closures_;
+};
+
+}  // namespace grpc_core
+
 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 34a4944..f14c723 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,9 +283,10 @@
     if (c->scheduled) {
       gpr_log(GPR_ERROR,
               "Closure already scheduled. (closure: %p, created: [%s:%d], "
-              "previously scheduled at: [%s: %d] run?: %s",
+              "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
+              "run?: %s",
               c, c->file_created, c->line_created, c->file_initiated,
-              c->line_initiated, c->run ? "true" : "false");
+              c->line_initiated, file, line, c->run ? "true" : "false");
       abort();
     }
     c->scheduled = true;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 1cf8ea9..8b224b6 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,6 +233,7 @@
   grpc_closure receiving_slice_ready;
   grpc_closure receiving_stream_ready;
   grpc_closure receiving_initial_metadata_ready;
+  grpc_closure receiving_trailing_metadata_ready;
   uint32_t test_only_last_message_flags;
 
   grpc_closure release_call;
@@ -270,8 +271,17 @@
 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
 
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
-#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
+/* Given a size, round up to the next multiple of sizeof(void*) */
+#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
+  (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
+
+#define CALL_STACK_FROM_CALL(call)   \
+  (grpc_call_stack*)((char*)(call) + \
+                     ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+#define CALL_FROM_CALL_STACK(call_stack) \
+  (grpc_call*)(((char*)(call_stack)) -   \
+               ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+
 #define CALL_ELEM_FROM_CALL(call, idx) \
   grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
 #define CALL_FROM_TOP_ELEM(top_elem) \
@@ -342,8 +352,9 @@
   size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
   GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
   gpr_arena* arena = gpr_arena_create(initial_size);
-  call = static_cast<grpc_call*>(gpr_arena_alloc(
-      arena, sizeof(grpc_call) + channel_stack->call_stack_size));
+  call = static_cast<grpc_call*>(
+      gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
+                                 channel_stack->call_stack_size));
   gpr_ref_init(&call->ext_ref, 1);
   call->arena = arena;
   grpc_call_combiner_init(&call->call_combiner);
@@ -1209,7 +1220,6 @@
 
   if (bctl->op.send_initial_metadata) {
     grpc_metadata_batch_destroy(
-
         &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
   }
   if (bctl->op.send_message) {
@@ -1217,14 +1227,9 @@
   }
   if (bctl->op.send_trailing_metadata) {
     grpc_metadata_batch_destroy(
-
         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
   }
   if (bctl->op.recv_trailing_metadata) {
-    grpc_metadata_batch* md =
-        &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-    recv_trailing_filter(call, md);
-
     /* propagate cancellation to any interested children */
     gpr_atm_rel_store(&call->received_final_op_atm, 1);
     parent_call* pc = get_parent_call(call);
@@ -1246,7 +1251,6 @@
       }
       gpr_mu_unlock(&pc->child_list_mu);
     }
-
     if (call->is_client) {
       get_final_status(call, set_status_value_directly,
                        call->final_op.client.status,
@@ -1256,7 +1260,6 @@
       get_final_status(call, set_cancelled_value,
                        call->final_op.server.cancelled, nullptr, nullptr);
     }
-
     GRPC_ERROR_UNREF(error);
     error = GRPC_ERROR_NONE;
   }
@@ -1538,6 +1541,19 @@
   finish_batch_step(bctl);
 }
 
+static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
+  batch_control* bctl = static_cast<batch_control*>(bctlp);
+  grpc_call* call = bctl->call;
+  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
+  add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+  if (error == GRPC_ERROR_NONE) {
+    grpc_metadata_batch* md =
+        &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+    recv_trailing_filter(call, md);
+  }
+  finish_batch_step(bctl);
+}
+
 static void finish_batch(void* bctlp, grpc_error* error) {
   batch_control* bctl = static_cast<batch_control*>(bctlp);
   grpc_call* call = bctl->call;
@@ -1558,7 +1574,8 @@
   size_t i;
   const grpc_op* op;
   batch_control* bctl;
-  int num_completion_callbacks_needed = 1;
+  bool has_send_ops = false;
+  int num_recv_ops = 0;
   grpc_call_error error = GRPC_CALL_OK;
   grpc_transport_stream_op_batch* stream_op;
   grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1664,6 +1681,7 @@
           stream_op_payload->send_initial_metadata.peer_string =
               &call->peer_string;
         }
+        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_MESSAGE: {
@@ -1693,6 +1711,7 @@
             &op->data.send_message.send_message->data.raw.slice_buffer, flags);
         stream_op_payload->send_message.send_message.reset(
             call->sending_stream.get());
+        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1713,6 +1732,7 @@
         call->sent_final_op = true;
         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1777,6 +1797,7 @@
         }
         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+        has_send_ops = true;
         break;
       }
       case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1804,7 +1825,7 @@
           stream_op_payload->recv_initial_metadata.peer_string =
               &call->peer_string;
         }
-        num_completion_callbacks_needed++;
+        ++num_recv_ops;
         break;
       }
       case GRPC_OP_RECV_MESSAGE: {
@@ -1826,7 +1847,7 @@
                           grpc_schedule_on_exec_ctx);
         stream_op_payload->recv_message.recv_message_ready =
             &call->receiving_stream_ready;
-        num_completion_callbacks_needed++;
+        ++num_recv_ops;
         break;
       }
       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1852,11 +1873,16 @@
         call->final_op.client.error_string =
             op->data.recv_status_on_client.error_string;
         stream_op->recv_trailing_metadata = true;
-        stream_op->collect_stats = true;
         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op_payload->collect_stats.collect_stats =
+        stream_op_payload->recv_trailing_metadata.collect_stats =
             &call->final_info.stats.transport_stream_stats;
+        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+                          receiving_trailing_metadata_ready, bctl,
+                          grpc_schedule_on_exec_ctx);
+        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+            &call->receiving_trailing_metadata_ready;
+        ++num_recv_ops;
         break;
       }
       case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1877,11 +1903,16 @@
         call->final_op.server.cancelled =
             op->data.recv_close_on_server.cancelled;
         stream_op->recv_trailing_metadata = true;
-        stream_op->collect_stats = true;
         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op_payload->collect_stats.collect_stats =
+        stream_op_payload->recv_trailing_metadata.collect_stats =
             &call->final_info.stats.transport_stream_stats;
+        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+                          receiving_trailing_metadata_ready, bctl,
+                          grpc_schedule_on_exec_ctx);
+        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+            &call->receiving_trailing_metadata_ready;
+        ++num_recv_ops;
         break;
       }
     }
@@ -1891,13 +1922,15 @@
   if (!is_notify_tag_closure) {
     GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
   }
-  gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+  gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
 
-  GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
-                    grpc_schedule_on_exec_ctx);
-  stream_op->on_complete = &bctl->finish_batch;
+  if (has_send_ops) {
+    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+                      grpc_schedule_on_exec_ctx);
+    stream_op->on_complete = &bctl->finish_batch;
+  }
+
   gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
-
   execute_batch(call, stream_op, &bctl->start_batch);
 
 done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 039d603..cbdb77c 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,21 +212,32 @@
   if (batch->send_message) {
     batch->payload->send_message.send_message.reset();
   }
-  if (batch->recv_message) {
-    GRPC_CALL_COMBINER_START(
-        call_combiner, batch->payload->recv_message.recv_message_ready,
-        GRPC_ERROR_REF(error), "failing recv_message_ready");
-  }
-  if (batch->recv_initial_metadata) {
-    GRPC_CALL_COMBINER_START(
-        call_combiner,
-        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
-        GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
-  }
-  GRPC_CLOSURE_SCHED(batch->on_complete, error);
   if (batch->cancel_stream) {
     GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
   }
+  // Construct a list of closures to execute.
+  grpc_core::CallCombinerClosureList closures;
+  if (batch->recv_initial_metadata) {
+    closures.Add(
+        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
+        GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
+  }
+  if (batch->recv_message) {
+    closures.Add(batch->payload->recv_message.recv_message_ready,
+                 GRPC_ERROR_REF(error), "failing recv_message_ready");
+  }
+  if (batch->recv_trailing_metadata) {
+    closures.Add(
+        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+        GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
+  }
+  if (batch->on_complete != nullptr) {
+    closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
+                 "failing on_complete");
+  }
+  // Execute closures.
+  closures.RunClosures(call_combiner);
+  GRPC_ERROR_UNREF(error);
 }
 
 typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index b2e252d..585b9df 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,9 +122,15 @@
 /* Transport stream op: a set of operations to perform on a transport
    against a single stream */
 typedef struct grpc_transport_stream_op_batch {
-  /** Should be enqueued when all requested operations (excluding recv_message
-      and recv_initial_metadata which have their own closures) in a given batch
-      have been completed. */
+  /** Should be scheduled when all of the non-recv operations in the batch
+      are complete.
+
+      The recv ops (recv_initial_metadata, recv_message, and
+      recv_trailing_metadata) each have their own callbacks.  If a batch
+      contains both recv ops and non-recv ops, on_complete should be
+      scheduled as soon as the non-recv ops are complete, regardless of
+      whether or not the recv ops are complete.  If a batch contains
+      only recv ops, on_complete can be null. */
   grpc_closure* on_complete;
 
   /** Values for the stream op (fields set are determined by flags above) */
@@ -149,9 +155,6 @@
    */
   bool recv_trailing_metadata : 1;
 
-  /** Collect any stats into provided buffer, zero internal stat counters */
-  bool collect_stats : 1;
-
   /** Cancel this stream with the provided error */
   bool cancel_stream : 1;
 
@@ -219,11 +222,10 @@
 
   struct {
     grpc_metadata_batch* recv_trailing_metadata;
-  } recv_trailing_metadata;
-
-  struct {
     grpc_transport_stream_stats* collect_stats;
-  } collect_stats;
+    /** Should be enqueued when initial metadata is ready to be processed. */
+    grpc_closure* recv_trailing_metadata_ready;
+  } recv_trailing_metadata;
 
   /** Forcefully close this stream.
       The HTTP2 semantics should be:
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 25ab492..8c7db64 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,13 +120,6 @@
     gpr_strvec_add(&b, tmp);
   }
 
-  if (op->collect_stats) {
-    gpr_strvec_add(&b, gpr_strdup(" "));
-    gpr_asprintf(&tmp, "COLLECT_STATS:%p",
-                 op->payload->collect_stats.collect_stats);
-    gpr_strvec_add(&b, tmp);
-  }
-
   out = gpr_strvec_flatten(&b, nullptr);
   gpr_strvec_destroy(&b);