Allocate retry payload fields once per subchannel call instead of once per batch.
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 34ea97e..520431e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -794,6 +794,15 @@
   // The batch to use in the subchannel call.
   // Its payload field points to subchannel_call_retry_state.batch_payload.
   grpc_transport_stream_op_batch batch;
+  // For intercepting on_complete.
+  grpc_closure on_complete;
+} subchannel_batch_data;
+
+// Retry state associated with a subchannel call.
+// Stored in the parent_data of the subchannel call object.
+typedef struct {
+  // subchannel_batch_data.batch.payload points to this.
+  grpc_transport_stream_op_batch_payload batch_payload;
   // For send_initial_metadata.
   // Note that we need to make a copy of the initial metadata for each
   // subchannel call instead of just referring to the copy in call_data,
@@ -818,15 +827,6 @@
   grpc_metadata_batch recv_trailing_metadata;
   grpc_transport_stream_stats collect_stats;
   grpc_closure recv_trailing_metadata_ready;
-  // For intercepting on_complete.
-  grpc_closure on_complete;
-} subchannel_batch_data;
-
-// Retry state associated with a subchannel call.
-// Stored in the parent_data of the subchannel call object.
-typedef struct {
-  // subchannel_batch_data.batch.payload points to this.
-  grpc_transport_stream_op_batch_payload batch_payload;
   // These fields indicate which ops have been started and completed on
   // this subchannel call.
   size_t started_send_message_count;
@@ -1524,17 +1524,21 @@
 
 static void batch_data_unref(subchannel_batch_data* batch_data) {
   if (gpr_unref(&batch_data->refs)) {
-    if (batch_data->send_initial_metadata_storage != nullptr) {
-      grpc_metadata_batch_destroy(&batch_data->send_initial_metadata);
+    subchannel_call_retry_state* retry_state =
+        static_cast<subchannel_call_retry_state*>(
+            grpc_connected_subchannel_call_get_parent_data(
+                batch_data->subchannel_call));
+    if (batch_data->batch.send_initial_metadata) {
+      grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
     }
-    if (batch_data->send_trailing_metadata_storage != nullptr) {
-      grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata);
+    if (batch_data->batch.send_trailing_metadata) {
+      grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
     }
     if (batch_data->batch.recv_initial_metadata) {
-      grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata);
+      grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
     }
     if (batch_data->batch.recv_trailing_metadata) {
-      grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata);
+      grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
     }
     GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
     call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
@@ -1560,8 +1564,12 @@
       });
   GPR_ASSERT(pending != nullptr);
   // Return metadata.
+  subchannel_call_retry_state* retry_state =
+      static_cast<subchannel_call_retry_state*>(
+          grpc_connected_subchannel_call_get_parent_data(
+              batch_data->subchannel_call));
   grpc_metadata_batch_move(
-      &batch_data->recv_initial_metadata,
+      &retry_state->recv_initial_metadata,
       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
   // Update bookkeeping.
   // Note: Need to do this before invoking the callback, since invoking
@@ -1606,7 +1614,7 @@
   // the recv_trailing_metadata_ready callback, then defer propagating this
   // callback back to the surface.  We can evaluate whether to retry when
   // recv_trailing_metadata comes back.
-  if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
+  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                     error != GRPC_ERROR_NONE) &&
                    !retry_state->completed_recv_trailing_metadata)) {
     if (grpc_client_channel_trace.enabled()) {
@@ -1651,8 +1659,12 @@
       });
   GPR_ASSERT(pending != nullptr);
   // Return payload.
+  subchannel_call_retry_state* retry_state =
+      static_cast<subchannel_call_retry_state*>(
+          grpc_connected_subchannel_call_get_parent_data(
+              batch_data->subchannel_call));
   *pending->batch->payload->recv_message.recv_message =
-      std::move(batch_data->recv_message);
+      std::move(retry_state->recv_message);
   // Update bookkeeping.
   // Note: Need to do this before invoking the callback, since invoking
   // the callback will result in yielding the call combiner.
@@ -1693,7 +1705,7 @@
   // callback back to the surface.  We can evaluate whether to retry when
   // recv_trailing_metadata comes back.
   if (GPR_UNLIKELY(
-          (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
+          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
           !retry_state->completed_recv_trailing_metadata)) {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_INFO,
@@ -1766,8 +1778,12 @@
     return;
   }
   // Return metadata.
+  subchannel_call_retry_state* retry_state =
+      static_cast<subchannel_call_retry_state*>(
+          grpc_connected_subchannel_call_get_parent_data(
+              batch_data->subchannel_call));
   grpc_metadata_batch_move(
-      &batch_data->recv_trailing_metadata,
+      &retry_state->recv_trailing_metadata,
       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
   // Add closure.
   closures->Add(pending->batch->payload->recv_trailing_metadata
@@ -1788,11 +1804,11 @@
     // Add closure for deferred recv_initial_metadata_ready.
     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
                      nullptr)) {
-      GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+      GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                         invoke_recv_initial_metadata_callback,
                         retry_state->recv_initial_metadata_ready_deferred_batch,
                         grpc_schedule_on_exec_ctx);
-      closures->Add(&batch_data->recv_initial_metadata_ready,
+      closures->Add(&retry_state->recv_initial_metadata_ready,
                     retry_state->recv_initial_metadata_error,
                     "resuming recv_initial_metadata_ready");
       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
@@ -1800,11 +1816,11 @@
     // Add closure for deferred recv_message_ready.
     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
                      nullptr)) {
-      GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+      GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
                         invoke_recv_message_callback,
                         retry_state->recv_message_ready_deferred_batch,
                         grpc_schedule_on_exec_ctx);
-      closures->Add(&batch_data->recv_message_ready,
+      closures->Add(&retry_state->recv_message_ready,
                     retry_state->recv_message_error,
                     "resuming recv_message_ready");
       retry_state->recv_message_ready_deferred_batch = nullptr;
@@ -2120,28 +2136,28 @@
   //
   // If we've already completed one or more attempts, add the
   // grpc-retry-attempts header.
-  batch_data->send_initial_metadata_storage =
+  retry_state->send_initial_metadata_storage =
       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
           calld->arena, sizeof(grpc_linked_mdelem) *
                             (calld->send_initial_metadata.list.count +
                              (calld->num_attempts_completed > 0))));
   grpc_metadata_batch_copy(&calld->send_initial_metadata,
-                           &batch_data->send_initial_metadata,
-                           batch_data->send_initial_metadata_storage);
-  if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named
+                           &retry_state->send_initial_metadata,
+                           retry_state->send_initial_metadata_storage);
+  if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
                        .grpc_previous_rpc_attempts != nullptr)) {
-    grpc_metadata_batch_remove(
-        &batch_data->send_initial_metadata,
-        batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts);
+    grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
+                               retry_state->send_initial_metadata.idx.named
+                                   .grpc_previous_rpc_attempts);
   }
   if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
     grpc_mdelem retry_md = grpc_mdelem_from_slices(
         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
         *retry_count_strings[calld->num_attempts_completed - 1]);
     grpc_error* error = grpc_metadata_batch_add_tail(
-        &batch_data->send_initial_metadata,
-        &batch_data->send_initial_metadata_storage[calld->send_initial_metadata
-                                                       .list.count],
+        &retry_state->send_initial_metadata,
+        &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
+                                                        .list.count],
         retry_md);
     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
@@ -2152,7 +2168,7 @@
   retry_state->started_send_initial_metadata = true;
   batch_data->batch.send_initial_metadata = true;
   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
-      &batch_data->send_initial_metadata;
+      &retry_state->send_initial_metadata;
   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
       calld->send_initial_metadata_flags;
   batch_data->batch.payload->send_initial_metadata.peer_string =
@@ -2173,10 +2189,10 @@
   grpc_core::ByteStreamCache* cache =
       (*calld->send_messages)[retry_state->started_send_message_count];
   ++retry_state->started_send_message_count;
-  batch_data->send_message.Init(cache);
+  retry_state->send_message.Init(cache);
   batch_data->batch.send_message = true;
   batch_data->batch.payload->send_message.send_message.reset(
-      batch_data->send_message.get());
+      retry_state->send_message.get());
 }
 
 // Adds retriable send_trailing_metadata op to batch_data.
@@ -2186,17 +2202,17 @@
   // We need to make a copy of the metadata batch for each attempt, since
   // the filters in the subchannel stack may modify this batch, and we don't
   // want those modifications to be passed forward to subsequent attempts.
-  batch_data->send_trailing_metadata_storage =
+  retry_state->send_trailing_metadata_storage =
       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
           calld->arena, sizeof(grpc_linked_mdelem) *
                             calld->send_trailing_metadata.list.count));
   grpc_metadata_batch_copy(&calld->send_trailing_metadata,
-                           &batch_data->send_trailing_metadata,
-                           batch_data->send_trailing_metadata_storage);
+                           &retry_state->send_trailing_metadata,
+                           retry_state->send_trailing_metadata_storage);
   retry_state->started_send_trailing_metadata = true;
   batch_data->batch.send_trailing_metadata = true;
   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
-      &batch_data->send_trailing_metadata;
+      &retry_state->send_trailing_metadata;
 }
 
 // Adds retriable recv_initial_metadata op to batch_data.
@@ -2205,16 +2221,16 @@
     subchannel_batch_data* batch_data) {
   retry_state->started_recv_initial_metadata = true;
   batch_data->batch.recv_initial_metadata = true;
-  grpc_metadata_batch_init(&batch_data->recv_initial_metadata);
+  grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
-      &batch_data->recv_initial_metadata;
+      &retry_state->recv_initial_metadata;
   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
-      &batch_data->trailing_metadata_available;
-  GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+      &retry_state->trailing_metadata_available;
+  GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                     recv_initial_metadata_ready, batch_data,
                     grpc_schedule_on_exec_ctx);
   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
-      &batch_data->recv_initial_metadata_ready;
+      &retry_state->recv_initial_metadata_ready;
 }
 
 // Adds retriable recv_message op to batch_data.
@@ -2224,11 +2240,11 @@
   ++retry_state->started_recv_message_count;
   batch_data->batch.recv_message = true;
   batch_data->batch.payload->recv_message.recv_message =
-      &batch_data->recv_message;
-  GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready,
+      &retry_state->recv_message;
+  GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
                     batch_data, grpc_schedule_on_exec_ctx);
   batch_data->batch.payload->recv_message.recv_message_ready =
-      &batch_data->recv_message_ready;
+      &retry_state->recv_message_ready;
 }
 
 // Adds retriable recv_trailing_metadata op to batch_data.
@@ -2237,16 +2253,17 @@
     subchannel_batch_data* batch_data) {
   retry_state->started_recv_trailing_metadata = true;
   batch_data->batch.recv_trailing_metadata = true;
-  grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
+  grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
-      &batch_data->recv_trailing_metadata;
+      &retry_state->recv_trailing_metadata;
   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
-      &batch_data->collect_stats;
-  GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+      &retry_state->collect_stats;
+  GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
                     recv_trailing_metadata_ready, batch_data,
                     grpc_schedule_on_exec_ctx);
   batch_data->batch.payload->recv_trailing_metadata
-      .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
+      .recv_trailing_metadata_ready =
+      &retry_state->recv_trailing_metadata_ready;
 }
 
 // Helper function used to start a recv_trailing_metadata batch.  This
@@ -2400,11 +2417,9 @@
         // started subchannel batch, since we'll propagate the
         // completion when it completes.
         if (retry_state->completed_recv_trailing_metadata) {
-          subchannel_batch_data* batch_data =
-              retry_state->recv_trailing_metadata_internal_batch;
           // Batches containing recv_trailing_metadata always succeed.
           closures->Add(
-              &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
               "re-executing recv_trailing_metadata_ready to propagate "
               "internally triggered result");
         } else {