Revert "move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops"
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 98391a1..ea6775a 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -817,7 +817,6 @@
   // For intercepting recv_trailing_metadata.
   grpc_metadata_batch recv_trailing_metadata;
   grpc_transport_stream_stats collect_stats;
-  grpc_closure recv_trailing_metadata_ready;
   // For intercepting on_complete.
   grpc_closure on_complete;
 } subchannel_batch_data;
@@ -1193,24 +1192,35 @@
             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
             elem->channel_data, calld, num_batches, grpc_error_string(error));
   }
-  grpc_core::CallCombinerClosureList closures;
+  grpc_transport_stream_op_batch*
+      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+  size_t num_batches = 0;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr) {
-      batch->handler_private.extra_arg = calld;
-      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
-                        fail_pending_batch_in_call_combiner, batch,
-                        grpc_schedule_on_exec_ctx);
-      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
-                   "pending_batches_fail");
+      batches[num_batches++] = batch;
       pending_batch_clear(calld, pending);
     }
   }
+  for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
+    grpc_transport_stream_op_batch* batch = batches[i];
+    batch->handler_private.extra_arg = calld;
+    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                      fail_pending_batch_in_call_combiner, batch,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &batch->handler_private.closure,
+                             GRPC_ERROR_REF(error), "pending_batches_fail");
+  }
   if (yield_call_combiner) {
-    closures.RunClosures(calld->call_combiner);
-  } else {
-    closures.RunClosuresWithoutYielding(calld->call_combiner);
+    if (num_batches > 0) {
+      // Note: This will release the call combiner.
+      grpc_transport_stream_op_batch_finish_with_failure(
+          batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
+    } else {
+      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
+    }
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -1245,22 +1255,30 @@
             " pending batches on subchannel_call=%p",
             chand, calld, num_batches, calld->subchannel_call);
   }
-  grpc_core::CallCombinerClosureList closures;
+  grpc_transport_stream_op_batch*
+      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+  size_t num_batches = 0;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr) {
-      batch->handler_private.extra_arg = calld->subchannel_call;
-      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
-                        resume_pending_batch_in_call_combiner, batch,
-                        grpc_schedule_on_exec_ctx);
-      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
-                   "pending_batches_resume");
+      batches[num_batches++] = batch;
       pending_batch_clear(calld, pending);
     }
   }
+  for (size_t i = 1; i < num_batches; ++i) {
+    grpc_transport_stream_op_batch* batch = batches[i];
+    batch->handler_private.extra_arg = calld->subchannel_call;
+    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                      resume_pending_batch_in_call_combiner, batch,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &batch->handler_private.closure, GRPC_ERROR_NONE,
+                             "pending_batches_resume");
+  }
+  GPR_ASSERT(num_batches > 0);
   // Note: This will release the call combiner.
-  closures.RunClosures(calld->call_combiner);
+  grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
 }
 
 static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1275,10 +1293,7 @@
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
            nullptr) &&
       (!batch->recv_message ||
-       batch->payload->recv_message.recv_message_ready == nullptr) &&
-      (!batch->recv_trailing_metadata ||
-       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
-           nullptr)) {
+       batch->payload->recv_message.recv_message_ready == nullptr)) {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
               calld);
@@ -1287,27 +1302,75 @@
   }
 }
 
-// Returns a pointer to the first pending batch for which predicate(batch)
-// returns true, or null if not found.
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
-                                         const char* log_message,
-                                         Predicate predicate) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    pending_batch* pending = &calld->pending_batches[i];
-    grpc_transport_stream_op_batch* batch = pending->batch;
-    if (batch != nullptr && predicate(batch)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
-                calld, log_message, i);
-      }
-      return pending;
-    }
+// Returns true if all ops in the pending batch have been completed.
+static bool pending_batch_is_completed(
+    pending_batch* pending, call_data* calld,
+    subchannel_call_retry_state* retry_state) {
+  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+    return false;
   }
-  return nullptr;
+  if (pending->batch->send_initial_metadata &&
+      !retry_state->completed_send_initial_metadata) {
+    return false;
+  }
+  if (pending->batch->send_message &&
+      retry_state->completed_send_message_count <
+          calld->send_messages->size()) {
+    return false;
+  }
+  if (pending->batch->send_trailing_metadata &&
+      !retry_state->completed_send_trailing_metadata) {
+    return false;
+  }
+  if (pending->batch->recv_initial_metadata &&
+      !retry_state->completed_recv_initial_metadata) {
+    return false;
+  }
+  if (pending->batch->recv_message &&
+      retry_state->completed_recv_message_count <
+          retry_state->started_recv_message_count) {
+    return false;
+  }
+  if (pending->batch->recv_trailing_metadata &&
+      !retry_state->completed_recv_trailing_metadata) {
+    return false;
+  }
+  return true;
+}
+
+// Returns true if any op in the batch was not yet started.
+static bool pending_batch_is_unstarted(
+    pending_batch* pending, call_data* calld,
+    subchannel_call_retry_state* retry_state) {
+  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+    return false;
+  }
+  if (pending->batch->send_initial_metadata &&
+      !retry_state->started_send_initial_metadata) {
+    return true;
+  }
+  if (pending->batch->send_message &&
+      retry_state->started_send_message_count < calld->send_messages->size()) {
+    return true;
+  }
+  if (pending->batch->send_trailing_metadata &&
+      !retry_state->started_send_trailing_metadata) {
+    return true;
+  }
+  if (pending->batch->recv_initial_metadata &&
+      !retry_state->started_recv_initial_metadata) {
+    return true;
+  }
+  if (pending->batch->recv_message &&
+      retry_state->completed_recv_message_count ==
+          retry_state->started_recv_message_count) {
+    return true;
+  }
+  if (pending->batch->recv_trailing_metadata &&
+      !retry_state->started_recv_trailing_metadata) {
+    return true;
+  }
+  return false;
 }
 
 //
@@ -1494,13 +1557,8 @@
 // subchannel_batch_data
 //
 
-// Creates a subchannel_batch_data object on the call's arena with the
-// specified refcount.  If set_on_complete is true, the batch's
-// on_complete callback will be set to point to on_complete();
-// otherwise, the batch's on_complete callback will be null.
 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
-                                                int refcount,
-                                                bool set_on_complete) {
+                                                int refcount) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   subchannel_call_retry_state* retry_state =
       static_cast<subchannel_call_retry_state*>(
@@ -1513,11 +1571,9 @@
       GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
   batch_data->batch.payload = &retry_state->batch_payload;
   gpr_ref_init(&batch_data->refs, refcount);
-  if (set_on_complete) {
-    GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
-                      grpc_schedule_on_exec_ctx);
-    batch_data->batch.on_complete = &batch_data->on_complete;
-  }
+  GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+                    grpc_schedule_on_exec_ctx);
+  batch_data->batch.on_complete = &batch_data->on_complete;
   GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
   return batch_data;
 }
@@ -1550,14 +1606,26 @@
 static void invoke_recv_initial_metadata_callback(void* arg,
                                                   grpc_error* error) {
   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+  channel_data* chand =
+      static_cast<channel_data*>(batch_data->elem->channel_data);
+  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   // Find pending batch.
-  pending_batch* pending = pending_batch_find(
-      batch_data->elem, "invoking recv_initial_metadata_ready for",
-      [](grpc_transport_stream_op_batch* batch) {
-        return batch->recv_initial_metadata &&
-               batch->payload->recv_initial_metadata
-                       .recv_initial_metadata_ready != nullptr;
-      });
+  pending_batch* pending = nullptr;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+    if (batch != nullptr && batch->recv_initial_metadata &&
+        batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
+            nullptr) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
+                "pending batch at index %" PRIuPTR,
+                chand, calld, i);
+      }
+      pending = &calld->pending_batches[i];
+      break;
+    }
+  }
   GPR_ASSERT(pending != nullptr);
   // Return metadata.
   grpc_metadata_batch_move(
@@ -1593,19 +1661,10 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
-  retry_state->completed_recv_initial_metadata = true;
-  // If a retry was already dispatched, then we're not going to use the
-  // result of this recv_initial_metadata op, so do nothing.
-  if (retry_state->retry_dispatched) {
-    GRPC_CALL_COMBINER_STOP(
-        calld->call_combiner,
-        "recv_initial_metadata_ready after retry dispatched");
-    return;
-  }
   // If we got an error or a Trailers-Only response and have not yet gotten
-  // the recv_trailing_metadata_ready callback, then defer propagating this
-  // callback back to the surface.  We can evaluate whether to retry when
-  // recv_trailing_metadata comes back.
+  // the recv_trailing_metadata on_complete callback, then defer
+  // propagating this callback back to the surface.  We can evaluate whether
+  // to retry when recv_trailing_metadata comes back.
   if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
                     error != GRPC_ERROR_NONE) &&
                    !retry_state->completed_recv_trailing_metadata)) {
@@ -1630,9 +1689,9 @@
   }
   // Received valid initial metadata, so commit the call.
   retry_commit(elem, retry_state);
-  // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
   invoke_recv_initial_metadata_callback(batch_data, error);
+  GRPC_ERROR_UNREF(error);
 }
 
 //
@@ -1642,13 +1701,25 @@
 // Invokes recv_message_ready for a subchannel batch.
 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+  channel_data* chand =
+      static_cast<channel_data*>(batch_data->elem->channel_data);
+  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   // Find pending op.
-  pending_batch* pending = pending_batch_find(
-      batch_data->elem, "invoking recv_message_ready for",
-      [](grpc_transport_stream_op_batch* batch) {
-        return batch->recv_message &&
-               batch->payload->recv_message.recv_message_ready != nullptr;
-      });
+  pending_batch* pending = nullptr;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+    if (batch != nullptr && batch->recv_message &&
+        batch->payload->recv_message.recv_message_ready != nullptr) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: invoking recv_message_ready for "
+                "pending batch at index %" PRIuPTR,
+                chand, calld, i);
+      }
+      pending = &calld->pending_batches[i];
+      break;
+    }
+  }
   GPR_ASSERT(pending != nullptr);
   // Return payload.
   *pending->batch->payload->recv_message.recv_message =
@@ -1680,18 +1751,10 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
-  ++retry_state->completed_recv_message_count;
-  // If a retry was already dispatched, then we're not going to use the
-  // result of this recv_message op, so do nothing.
-  if (retry_state->retry_dispatched) {
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
-                            "recv_message_ready after retry dispatched");
-    return;
-  }
   // If we got an error or the payload was nullptr and we have not yet gotten
-  // the recv_trailing_metadata_ready callback, then defer propagating this
-  // callback back to the surface.  We can evaluate whether to retry when
-  // recv_trailing_metadata comes back.
+  // the recv_trailing_metadata on_complete callback, then defer
+  // propagating this callback back to the surface.  We can evaluate whether
+  // to retry when recv_trailing_metadata comes back.
   if (GPR_UNLIKELY(
           (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
           !retry_state->completed_recv_trailing_metadata)) {
@@ -1714,241 +1777,133 @@
   }
   // Received a valid message, so commit the call.
   retry_commit(elem, retry_state);
-  // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
   invoke_recv_message_callback(batch_data, error);
-}
-
-//
-// recv_trailing_metadata handling
-//
-
-// Adds recv_trailing_metadata_ready closure to closures.
-static void add_closure_for_recv_trailing_metadata_ready(
-    grpc_call_element* elem, subchannel_batch_data* batch_data,
-    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
-  // Find pending batch.
-  pending_batch* pending = pending_batch_find(
-      elem, "invoking recv_trailing_metadata for",
-      [](grpc_transport_stream_op_batch* batch) {
-        return batch->recv_trailing_metadata &&
-               batch->payload->recv_trailing_metadata
-                       .recv_trailing_metadata_ready != nullptr;
-      });
-  // If we generated the recv_trailing_metadata op internally via
-  // start_internal_recv_trailing_metadata(), then there will be no
-  // pending batch.
-  if (pending == nullptr) {
-    GRPC_ERROR_UNREF(error);
-    return;
-  }
-  // Return metadata.
-  grpc_metadata_batch_move(
-      &batch_data->recv_trailing_metadata,
-      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
-  // Add closure.
-  closures->Add(pending->batch->payload->recv_trailing_metadata
-                    .recv_trailing_metadata_ready,
-                error, "recv_trailing_metadata_ready for pending batch");
-  // Update bookkeeping.
-  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-      nullptr;
-  maybe_clear_pending_batch(elem, pending);
-}
-
-// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures.
-static void add_closures_for_deferred_recv_callbacks(
-    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
-    grpc_core::CallCombinerClosureList* closures) {
-  if (batch_data->batch.recv_trailing_metadata) {
-    // Add closure for deferred recv_initial_metadata_ready.
-    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
-                     nullptr)) {
-      GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
-                        invoke_recv_initial_metadata_callback,
-                        retry_state->recv_initial_metadata_ready_deferred_batch,
-                        grpc_schedule_on_exec_ctx);
-      closures->Add(&batch_data->recv_initial_metadata_ready,
-                    retry_state->recv_initial_metadata_error,
-                    "resuming recv_initial_metadata_ready");
-      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
-    }
-    // Add closure for deferred recv_message_ready.
-    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
-                     nullptr)) {
-      GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
-                        invoke_recv_message_callback,
-                        retry_state->recv_message_ready_deferred_batch,
-                        grpc_schedule_on_exec_ctx);
-      closures->Add(&batch_data->recv_message_ready,
-                    retry_state->recv_message_error,
-                    "resuming recv_message_ready");
-      retry_state->recv_message_ready_deferred_batch = nullptr;
-    }
-  }
-}
-
-// Returns true if any op in the batch was not yet started.
-// Only looks at send ops, since recv ops are always started immediately.
-static bool pending_batch_is_unstarted(
-    pending_batch* pending, call_data* calld,
-    subchannel_call_retry_state* retry_state) {
-  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
-    return false;
-  }
-  if (pending->batch->send_initial_metadata &&
-      !retry_state->started_send_initial_metadata) {
-    return true;
-  }
-  if (pending->batch->send_message &&
-      retry_state->started_send_message_count < calld->send_messages->size()) {
-    return true;
-  }
-  if (pending->batch->send_trailing_metadata &&
-      !retry_state->started_send_trailing_metadata) {
-    return true;
-  }
-  return false;
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures.
-static void add_closures_to_fail_unstarted_pending_batches(
-    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
-    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    pending_batch* pending = &calld->pending_batches[i];
-    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: failing unstarted pending batch at index "
-                "%" PRIuPTR,
-                chand, calld, i);
-      }
-      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
-                    "failing on_complete for pending batch");
-      pending->batch->on_complete = nullptr;
-      maybe_clear_pending_batch(elem, pending);
-    }
-  }
   GRPC_ERROR_UNREF(error);
 }
 
-// Intercepts recv_trailing_metadata_ready callback for retries.
-// Commits the call and returns the trailing metadata up the stack.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
-  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
-  grpc_call_element* elem = batch_data->elem;
+//
+// list of closures to execute in call combiner
+//
+
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+  grpc_closure* closure;
+  grpc_error* error;
+  const char* reason;
+  bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+                                              const char* caller,
+                                              closure_to_execute* closures,
+                                              size_t num_closures) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
-            chand, calld, grpc_error_string(error));
-  }
-  subchannel_call_retry_state* retry_state =
-      static_cast<subchannel_call_retry_state*>(
-          grpc_connected_subchannel_call_get_parent_data(
-              batch_data->subchannel_call));
-  retry_state->completed_recv_trailing_metadata = true;
-  // Get the call's status and check for server pushback metadata.
-  grpc_status_code status = GRPC_STATUS_OK;
-  grpc_mdelem* server_pushback_md = nullptr;
-  if (error != GRPC_ERROR_NONE) {
-    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
-                          nullptr);
+  // Note that the call combiner will be yielded for each closure that
+  // we schedule.  We're already running in the call combiner, so one of
+  // the closures can be scheduled directly, but the others will
+  // have to re-enter the call combiner.
+  if (num_closures > 0) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
+              calld, caller, closures[0].reason);
+    }
+    GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+    if (closures[0].free_reason) {
+      gpr_free(const_cast<char*>(closures[0].reason));
+    }
+    for (size_t i = 1; i < num_closures; ++i) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: %s starting closure in call combiner: %s",
+                chand, calld, caller, closures[i].reason);
+      }
+      GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+                               closures[i].error, closures[i].reason);
+      if (closures[i].free_reason) {
+        gpr_free(const_cast<char*>(closures[i].reason));
+      }
+    }
   } else {
-    grpc_metadata_batch* md_batch =
-        batch_data->batch.payload->recv_trailing_metadata
-            .recv_trailing_metadata;
-    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
-    status =
-        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
-    if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
-      server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
+              calld, caller);
     }
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
   }
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
-            calld, grpc_status_code_to_string(status));
-  }
-  // Check if we should retry.
-  if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
-    // Unref batch_data for deferred recv_initial_metadata_ready or
-    // recv_message_ready callbacks, if any.
-    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
-      batch_data_unref(batch_data);
-      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
-    }
-    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
-      batch_data_unref(batch_data);
-      GRPC_ERROR_UNREF(retry_state->recv_message_error);
-    }
-    batch_data_unref(batch_data);
-    return;
-  }
-  // Not retrying, so commit the call.
-  retry_commit(elem, retry_state);
-  // Construct list of closures to execute.
-  grpc_core::CallCombinerClosureList closures;
-  // First, add closure for recv_trailing_metadata_ready.
-  add_closure_for_recv_trailing_metadata_ready(
-      elem, batch_data, GRPC_ERROR_REF(error), &closures);
-  // If there are deferred recv_initial_metadata_ready or recv_message_ready
-  // callbacks, add them to closures.
-  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
-  // Add closures to fail any pending batches that have not yet been started.
-  add_closures_to_fail_unstarted_pending_batches(
-      elem, retry_state, GRPC_ERROR_REF(error), &closures);
-  // Don't need batch_data anymore.
-  batch_data_unref(batch_data);
-  // Schedule all of the closures identified above.
-  // Note: This will release the call combiner.
-  closures.RunClosures(calld->call_combiner);
 }
 
 //
 // on_complete callback handling
 //
 
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures.
-static void add_closure_for_completed_pending_batch(
-    grpc_call_element* elem, subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state, grpc_error* error,
-    grpc_core::CallCombinerClosureList* closures) {
-  pending_batch* pending = pending_batch_find(
-      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
-        return batch->on_complete != nullptr &&
-               batch_data->batch.send_initial_metadata ==
-                   batch->send_initial_metadata &&
-               batch_data->batch.send_message == batch->send_message &&
-               batch_data->batch.send_trailing_metadata ==
-                   batch->send_trailing_metadata;
-      });
-  // If batch_data is a replay batch, then there will be no pending
-  // batch to complete.
-  if (pending == nullptr) {
-    GRPC_ERROR_UNREF(error);
-    return;
+// Updates retry_state to reflect the ops completed in batch_data.
+static void update_retry_state_for_completed_batch(
+    subchannel_batch_data* batch_data,
+    subchannel_call_retry_state* retry_state) {
+  if (batch_data->batch.send_initial_metadata) {
+    retry_state->completed_send_initial_metadata = true;
   }
-  // Add closure.
-  closures->Add(pending->batch->on_complete, error,
-                "on_complete for pending batch");
-  pending->batch->on_complete = nullptr;
-  maybe_clear_pending_batch(elem, pending);
+  if (batch_data->batch.send_message) {
+    ++retry_state->completed_send_message_count;
+  }
+  if (batch_data->batch.send_trailing_metadata) {
+    retry_state->completed_send_trailing_metadata = true;
+  }
+  if (batch_data->batch.recv_initial_metadata) {
+    retry_state->completed_recv_initial_metadata = true;
+  }
+  if (batch_data->batch.recv_message) {
+    ++retry_state->completed_recv_message_count;
+  }
+  if (batch_data->batch.recv_trailing_metadata) {
+    retry_state->completed_recv_trailing_metadata = true;
+  }
+}
+
+// Adds any necessary closures for deferred recv_initial_metadata and
+// recv_message callbacks to closures, updating *num_closures as needed.
+static void add_closures_for_deferred_recv_callbacks(
+    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
+    closure_to_execute* closures, size_t* num_closures) {
+  if (batch_data->batch.recv_trailing_metadata) {
+    // Add closure for deferred recv_initial_metadata_ready.
+    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
+                     nullptr)) {
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = GRPC_CLOSURE_INIT(
+          &batch_data->recv_initial_metadata_ready,
+          invoke_recv_initial_metadata_callback,
+          retry_state->recv_initial_metadata_ready_deferred_batch,
+          grpc_schedule_on_exec_ctx);
+      closure->error = retry_state->recv_initial_metadata_error;
+      closure->reason = "resuming recv_initial_metadata_ready";
+      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+    }
+    // Add closure for deferred recv_message_ready.
+    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
+                     nullptr)) {
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = GRPC_CLOSURE_INIT(
+          &batch_data->recv_message_ready, invoke_recv_message_callback,
+          retry_state->recv_message_ready_deferred_batch,
+          grpc_schedule_on_exec_ctx);
+      closure->error = retry_state->recv_message_error;
+      closure->reason = "resuming recv_message_ready";
+      retry_state->recv_message_ready_deferred_batch = nullptr;
+    }
+  }
 }
 
 // If there are any cached ops to replay or pending ops to start on the
 // subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches().
+// start_retriable_subchannel_batches(), updating *num_closures as needed.
 static void add_closures_for_replay_or_pending_send_ops(
     grpc_call_element* elem, subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state,
-    grpc_core::CallCombinerClosureList* closures) {
+    subchannel_call_retry_state* retry_state, closure_to_execute* closures,
+    size_t* num_closures) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   bool have_pending_send_message_ops =
@@ -1974,14 +1929,95 @@
               "chand=%p calld=%p: starting next batch for pending send op(s)",
               chand, calld);
     }
-    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
-                      start_retriable_subchannel_batches, elem,
-                      grpc_schedule_on_exec_ctx);
-    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
-                  "starting next batch for send_* op(s)");
+    closure_to_execute* closure = &closures[(*num_closures)++];
+    closure->closure = GRPC_CLOSURE_INIT(
+        &batch_data->batch.handler_private.closure,
+        start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
+    closure->error = GRPC_ERROR_NONE;
+    closure->reason = "starting next batch for send_* op(s)";
   }
 }
 
+// For any pending batch completed in batch_data, adds the necessary
+// completion closures to closures, updating *num_closures as needed.
+static void add_closures_for_completed_pending_batches(
+    grpc_call_element* elem, subchannel_batch_data* batch_data,
+    subchannel_call_retry_state* retry_state, grpc_error* error,
+    closure_to_execute* closures, size_t* num_closures) {
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    pending_batch* pending = &calld->pending_batches[i];
+    if (pending_batch_is_completed(pending, calld, retry_state)) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
+                chand, calld, i);
+      }
+      // Copy the trailing metadata to return it to the surface.
+      if (batch_data->batch.recv_trailing_metadata) {
+        grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
+                                 pending->batch->payload->recv_trailing_metadata
+                                     .recv_trailing_metadata);
+      }
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = pending->batch->on_complete;
+      closure->error = GRPC_ERROR_REF(error);
+      closure->reason = "on_complete for pending batch";
+      pending->batch->on_complete = nullptr;
+      maybe_clear_pending_batch(elem, pending);
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures, updating
+// *num_closures as needed.
+static void add_closures_to_fail_unstarted_pending_batches(
+    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+    grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    pending_batch* pending = &calld->pending_batches[i];
+    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: failing unstarted pending batch at index "
+                "%" PRIuPTR,
+                chand, calld, i);
+      }
+      if (pending->batch->recv_initial_metadata) {
+        closure_to_execute* closure = &closures[(*num_closures)++];
+        closure->closure = pending->batch->payload->recv_initial_metadata
+                               .recv_initial_metadata_ready;
+        closure->error = GRPC_ERROR_REF(error);
+        closure->reason =
+            "failing recv_initial_metadata_ready for pending batch";
+        pending->batch->payload->recv_initial_metadata
+            .recv_initial_metadata_ready = nullptr;
+      }
+      if (pending->batch->recv_message) {
+        *pending->batch->payload->recv_message.recv_message = nullptr;
+        closure_to_execute* closure = &closures[(*num_closures)++];
+        closure->closure =
+            pending->batch->payload->recv_message.recv_message_ready;
+        closure->error = GRPC_ERROR_REF(error);
+        closure->reason = "failing recv_message_ready for pending batch";
+        pending->batch->payload->recv_message.recv_message_ready = nullptr;
+      }
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = pending->batch->on_complete;
+      closure->error = GRPC_ERROR_REF(error);
+      closure->reason = "failing on_complete for pending batch";
+      pending->batch->on_complete = nullptr;
+      maybe_clear_pending_batch(elem, pending);
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
 // Callback used to intercept on_complete from subchannel calls.
 // Called only when retries are enabled.
 static void on_complete(void* arg, grpc_error* error) {
@@ -1999,48 +2035,135 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
+  // If we have previously completed recv_trailing_metadata, then the
+  // call is finished.
+  bool call_finished = retry_state->completed_recv_trailing_metadata;
+  // Record whether we were already committed before receiving this callback.
+  const bool previously_committed = calld->retry_committed;
   // Update bookkeeping in retry_state.
-  if (batch_data->batch.send_initial_metadata) {
-    retry_state->completed_send_initial_metadata = true;
+  update_retry_state_for_completed_batch(batch_data, retry_state);
+  if (call_finished) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
+              calld);
+    }
+  } else {
+    // Check if this batch finished the call, and if so, get its status.
+    // The call is finished if either (a) this callback was invoked with
+    // an error or (b) we receive status.
+    grpc_status_code status = GRPC_STATUS_OK;
+    grpc_mdelem* server_pushback_md = nullptr;
+    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {  // Case (a).
+      call_finished = true;
+      grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
+                            nullptr);
+    } else if (batch_data->batch.recv_trailing_metadata) {  // Case (b).
+      call_finished = true;
+      grpc_metadata_batch* md_batch =
+          batch_data->batch.payload->recv_trailing_metadata
+              .recv_trailing_metadata;
+      GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+      status = grpc_get_status_code_from_metadata(
+          md_batch->idx.named.grpc_status->md);
+      if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+        server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+      }
+    }
+    // If the call just finished, check if we should retry.
+    if (call_finished) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+                calld, grpc_status_code_to_string(status));
+      }
+      if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+        // Unref batch_data for deferred recv_initial_metadata_ready or
+        // recv_message_ready callbacks, if any.
+        if (batch_data->batch.recv_trailing_metadata &&
+            retry_state->recv_initial_metadata_ready_deferred_batch !=
+                nullptr) {
+          batch_data_unref(batch_data);
+          GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+        }
+        if (batch_data->batch.recv_trailing_metadata &&
+            retry_state->recv_message_ready_deferred_batch != nullptr) {
+          batch_data_unref(batch_data);
+          GRPC_ERROR_UNREF(retry_state->recv_message_error);
+        }
+        // Track number of pending subchannel send batches and determine if
+        // this was the last one.
+        bool last_callback_complete = false;
+        if (batch_data->batch.send_initial_metadata ||
+            batch_data->batch.send_message ||
+            batch_data->batch.send_trailing_metadata) {
+          --calld->num_pending_retriable_subchannel_send_batches;
+          last_callback_complete =
+              calld->num_pending_retriable_subchannel_send_batches == 0;
+        }
+        batch_data_unref(batch_data);
+        // If we just completed the last subchannel send batch, unref the
+        // call stack.
+        if (last_callback_complete) {
+          GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+        }
+        return;
+      }
+      // Not retrying, so commit the call.
+      retry_commit(elem, retry_state);
+    }
   }
-  if (batch_data->batch.send_message) {
-    ++retry_state->completed_send_message_count;
-  }
-  if (batch_data->batch.send_trailing_metadata) {
-    retry_state->completed_send_trailing_metadata = true;
-  }
-  // If the call is committed, free cached data for send ops that we've just
-  // completed.
-  if (calld->retry_committed) {
+  // If we were already committed before receiving this callback, free
+  // cached data for send ops that we've just completed.  (If the call has
+  // just now finished, the call to retry_commit() above will have freed all
+  // cached send ops, so we don't need to do it here.)
+  if (previously_committed) {
     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
   }
+  // Call not being retried.
   // Construct list of closures to execute.
-  grpc_core::CallCombinerClosureList closures;
-  // If a retry was already dispatched, that means we saw
-  // recv_trailing_metadata before this, so we do nothing here.
-  // Otherwise, invoke the callback to return the result to the surface.
-  if (!retry_state->retry_dispatched) {
-    // Add closure for the completed pending batch, if any.
-    add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
-                                            GRPC_ERROR_REF(error), &closures);
-    // If needed, add a callback to start_retriable_subchannel_batches() to
-    // start any replay or pending send ops on the subchannel call.
-    if (!retry_state->completed_recv_trailing_metadata) {
-      add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
-                                                  &closures);
-    }
+  // Max number of closures is number of pending batches plus one for
+  // each of:
+  // - recv_initial_metadata_ready (either deferred or unstarted)
+  // - recv_message_ready (either deferred or unstarted)
+  // - starting a new batch for pending send ops
+  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
+  size_t num_closures = 0;
+  // If there are deferred recv_initial_metadata_ready or recv_message_ready
+  // callbacks, add them to closures.
+  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
+                                           &num_closures);
+  // Find pending batches whose ops are now complete and add their
+  // on_complete callbacks to closures.
+  add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
+                                             GRPC_ERROR_REF(error), closures,
+                                             &num_closures);
+  // Add closures to handle any pending batches that have not yet been started.
+  // If the call is finished, we fail these batches; otherwise, we add a
+  // callback to start_retriable_subchannel_batches() to start them on
+  // the subchannel call.
+  if (call_finished) {
+    add_closures_to_fail_unstarted_pending_batches(
+        elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
+  } else {
+    add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+                                                closures, &num_closures);
   }
   // Track number of pending subchannel send batches and determine if this
   // was the last one.
-  --calld->num_pending_retriable_subchannel_send_batches;
-  const bool last_callback_complete =
-      calld->num_pending_retriable_subchannel_send_batches == 0;
+  bool last_callback_complete = false;
+  if (batch_data->batch.send_initial_metadata ||
+      batch_data->batch.send_message ||
+      batch_data->batch.send_trailing_metadata) {
+    --calld->num_pending_retriable_subchannel_send_batches;
+    last_callback_complete =
+        calld->num_pending_retriable_subchannel_send_batches == 0;
+  }
   // Don't need batch_data anymore.
   batch_data_unref(batch_data);
   // Schedule all of the closures identified above.
   // Note: This yeilds the call combiner.
-  closures.RunClosures(calld->call_combiner);
-  // If this was the last subchannel send batch, unref the call stack.
+  execute_closures_in_call_combiner(elem, "on_complete", closures,
+                                    num_closures);
+  // If we just completed the last subchannel send batch, unref the call stack.
   if (last_callback_complete) {
     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
   }
@@ -2062,22 +2185,27 @@
 
 // Adds a closure to closures that will execute batch in the call combiner.
 static void add_closure_for_subchannel_batch(
-    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
-    grpc_core::CallCombinerClosureList* closures) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
+    call_data* calld, grpc_transport_stream_op_batch* batch,
+    closure_to_execute* closures, size_t* num_closures) {
   batch->handler_private.extra_arg = calld->subchannel_call;
   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                     start_batch_in_call_combiner, batch,
                     grpc_schedule_on_exec_ctx);
+  closure_to_execute* closure = &closures[(*num_closures)++];
+  closure->closure = &batch->handler_private.closure;
+  closure->error = GRPC_ERROR_NONE;
+  // If the tracer is enabled, we log a more detailed message, which
+  // requires dynamic allocation.  This will be freed in
+  // start_retriable_subchannel_batches().
   if (grpc_client_channel_trace.enabled()) {
     char* batch_str = grpc_transport_stream_op_batch_string(batch);
-    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
-            calld, batch_str);
+    gpr_asprintf(const_cast<char**>(&closure->reason),
+                 "starting batch in call combiner: %s", batch_str);
     gpr_free(batch_str);
+    closure->free_reason = true;
+  } else {
+    closure->reason = "start_subchannel_batch";
   }
-  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
-                "start_subchannel_batch");
 }
 
 // Adds retriable send_initial_metadata op to batch_data.
@@ -2213,13 +2341,9 @@
   grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
       &batch_data->recv_trailing_metadata;
-  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
+  batch_data->batch.collect_stats = true;
+  batch_data->batch.payload->collect_stats.collect_stats =
       &batch_data->collect_stats;
-  GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
-                    recv_trailing_metadata_ready, batch_data,
-                    grpc_schedule_on_exec_ctx);
-  batch_data->batch.payload->recv_trailing_metadata
-      .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
 }
 
 // Helper function used to start a recv_trailing_metadata batch.  This
@@ -2240,11 +2364,9 @@
           grpc_connected_subchannel_call_get_parent_data(
               calld->subchannel_call));
   // Create batch_data with 2 refs, since this batch will be unreffed twice:
-  // once for the recv_trailing_metadata_ready callback when the subchannel
-  // batch returns, and again when we actually get a recv_trailing_metadata
-  // op from the surface.
-  subchannel_batch_data* batch_data =
-      batch_data_create(elem, 2, false /* set_on_complete */);
+  // once when the subchannel batch returns, and again when we actually get
+  // a recv_trailing_metadata op from the surface.
+  subchannel_batch_data* batch_data = batch_data_create(elem, 2);
   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
   retry_state->recv_trailing_metadata_internal_batch = batch_data;
   // Note: This will release the call combiner.
@@ -2269,7 +2391,7 @@
               "send_initial_metadata op",
               chand, calld);
     }
-    replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
+    replay_batch_data = batch_data_create(elem, 1);
     add_retriable_send_initial_metadata_op(calld, retry_state,
                                            replay_batch_data);
   }
@@ -2286,8 +2408,7 @@
               chand, calld);
     }
     if (replay_batch_data == nullptr) {
-      replay_batch_data =
-          batch_data_create(elem, 1, true /* set_on_complete */);
+      replay_batch_data = batch_data_create(elem, 1);
     }
     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
   }
@@ -2306,8 +2427,7 @@
               chand, calld);
     }
     if (replay_batch_data == nullptr) {
-      replay_batch_data =
-          batch_data_create(elem, 1, true /* set_on_complete */);
+      replay_batch_data = batch_data_create(elem, 1);
     }
     add_retriable_send_trailing_metadata_op(calld, retry_state,
                                             replay_batch_data);
@@ -2319,7 +2439,7 @@
 // *num_batches as needed.
 static void add_subchannel_batches_for_pending_batches(
     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
-    grpc_core::CallCombinerClosureList* closures) {
+    closure_to_execute* closures, size_t* num_closures) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
@@ -2375,11 +2495,13 @@
         if (retry_state->completed_recv_trailing_metadata) {
           subchannel_batch_data* batch_data =
               retry_state->recv_trailing_metadata_internal_batch;
+          closure_to_execute* closure = &closures[(*num_closures)++];
+          closure->closure = &batch_data->on_complete;
           // Batches containing recv_trailing_metadata always succeed.
-          closures->Add(
-              &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
-              "re-executing recv_trailing_metadata_ready to propagate "
-              "internally triggered result");
+          closure->error = GRPC_ERROR_NONE;
+          closure->reason =
+              "re-executing on_complete for recv_trailing_metadata "
+              "to propagate internally triggered result";
         } else {
           batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
         }
@@ -2391,19 +2513,14 @@
     if (calld->method_params == nullptr ||
         calld->method_params->retry_policy() == nullptr ||
         calld->retry_committed) {
-      add_closure_for_subchannel_batch(elem, batch, closures);
+      add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
       pending_batch_clear(calld, pending);
       continue;
     }
     // Create batch with the right number of callbacks.
-    const bool has_send_ops = batch->send_initial_metadata ||
-                              batch->send_message ||
-                              batch->send_trailing_metadata;
-    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
-                              batch->recv_message +
-                              batch->recv_trailing_metadata;
-    subchannel_batch_data* batch_data = batch_data_create(
-        elem, num_callbacks, has_send_ops /* set_on_complete */);
+    const int num_callbacks =
+        1 + batch->recv_initial_metadata + batch->recv_message;
+    subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
     // Cache send ops if needed.
     maybe_cache_send_ops_for_batch(calld, pending);
     // send_initial_metadata.
@@ -2430,9 +2547,11 @@
     }
     // recv_trailing_metadata.
     if (batch->recv_trailing_metadata) {
+      GPR_ASSERT(batch->collect_stats);
       add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
     }
-    add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
+    add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+                                     num_closures);
     // Track number of pending subchannel send batches.
     // If this is the first one, take a ref to the call stack.
     if (batch->send_initial_metadata || batch->send_message ||
@@ -2460,13 +2579,15 @@
           grpc_connected_subchannel_call_get_parent_data(
               calld->subchannel_call));
   // Construct list of closures to execute, one for each pending batch.
-  grpc_core::CallCombinerClosureList closures;
+  // We can start up to 6 batches.
+  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+  size_t num_closures = 0;
   // Replay previously-returned send_* ops if needed.
   subchannel_batch_data* replay_batch_data =
       maybe_create_subchannel_batch_for_replay(elem, retry_state);
   if (replay_batch_data != nullptr) {
-    add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
-                                     &closures);
+    add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+                                     &num_closures);
     // Track number of pending subchannel send batches.
     // If this is the first one, take a ref to the call stack.
     if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2475,16 +2596,17 @@
     ++calld->num_pending_retriable_subchannel_send_batches;
   }
   // Now add pending batches.
-  add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
+  add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+                                             &num_closures);
   // Start batches on subchannel call.
   if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p: starting %" PRIuPTR
             " retriable batches on subchannel_call=%p",
-            chand, calld, closures.size(), calld->subchannel_call);
+            chand, calld, num_closures, calld->subchannel_call);
   }
-  // Note: This will yield the call combiner.
-  closures.RunClosures(calld->call_combiner);
+  execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+                                    closures, num_closures);
 }
 
 //