Merge pull request #13971 from apolcyn/upmerge_v18x

Upmerge v1.8.x into master
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 3f3334d..e99022a 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -553,7 +553,6 @@
       }
       grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
                                        chand->interested_parties);
-      grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy);
       GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
     }
     chand->lb_policy = new_lb_policy;
@@ -659,7 +658,6 @@
       if (chand->lb_policy != nullptr) {
         grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
                                          chand->interested_parties);
-        grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
         GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
         chand->lb_policy = nullptr;
       }
@@ -794,7 +792,6 @@
   if (chand->lb_policy != nullptr) {
     grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
                                      chand->interested_parties);
-    grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
     GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
   }
   gpr_free(chand->info_lb_policy_name);
@@ -855,10 +852,12 @@
   grpc_subchannel_call* subchannel_call;
   grpc_error* error;
 
-  grpc_lb_policy_pick_state pick;
+  grpc_lb_policy* lb_policy;  // Holds ref while LB pick is pending.
   grpc_closure lb_pick_closure;
   grpc_closure lb_pick_cancel_closure;
 
+  grpc_connected_subchannel* connected_subchannel;
+  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
   grpc_polling_entity* pollent;
 
   grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES];
@@ -867,6 +866,8 @@
 
   grpc_transport_stream_op_batch* initial_metadata_batch;
 
+  grpc_linked_mdelem lb_token_mdelem;
+
   grpc_closure on_complete;
   grpc_closure* original_on_complete;
 } call_data;
@@ -1004,16 +1005,16 @@
   channel_data* chand = (channel_data*)elem->channel_data;
   call_data* calld = (call_data*)elem->call_data;
   const grpc_connected_subchannel_call_args call_args = {
-      calld->pollent,                       // pollent
-      calld->path,                          // path
-      calld->call_start_time,               // start_time
-      calld->deadline,                      // deadline
-      calld->arena,                         // arena
-      calld->pick.subchannel_call_context,  // context
-      calld->call_combiner                  // call_combiner
+      calld->pollent,                  // pollent
+      calld->path,                     // path
+      calld->call_start_time,          // start_time
+      calld->deadline,                 // deadline
+      calld->arena,                    // arena
+      calld->subchannel_call_context,  // context
+      calld->call_combiner             // call_combiner
   };
   grpc_error* new_error = grpc_connected_subchannel_create_call(
-      calld->pick.connected_subchannel, &call_args, &calld->subchannel_call);
+      calld->connected_subchannel, &call_args, &calld->subchannel_call);
   if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
             chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@@ -1031,7 +1032,7 @@
 static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
   call_data* calld = (call_data*)elem->call_data;
   channel_data* chand = (channel_data*)elem->channel_data;
-  if (calld->pick.connected_subchannel == nullptr) {
+  if (calld->connected_subchannel == nullptr) {
     // Failed to create subchannel.
     GRPC_ERROR_UNREF(calld->error);
     calld->error = error == GRPC_ERROR_NONE
@@ -1070,16 +1071,13 @@
   grpc_call_element* elem = (grpc_call_element*)arg;
   channel_data* chand = (channel_data*)elem->channel_data;
   call_data* calld = (call_data*)elem->call_data;
-  // Note: chand->lb_policy may have changed since we started our pick,
-  // in which case we will be cancelling the pick on a policy other than
-  // the one we started it on.  However, this will just be a no-op.
-  if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
+  if (calld->lb_policy != nullptr) {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
-              chand, calld, chand->lb_policy);
+              chand, calld, calld->lb_policy);
     }
-    grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick,
-                                      GRPC_ERROR_REF(error));
+    grpc_lb_policy_cancel_pick_locked(
+        calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error));
   }
   GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
 }
@@ -1094,6 +1092,9 @@
     gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
             chand, calld);
   }
+  GPR_ASSERT(calld->lb_policy != nullptr);
+  GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
+  calld->lb_policy = nullptr;
   async_pick_done_locked(elem, GRPC_ERROR_REF(error));
 }
 
@@ -1127,21 +1128,26 @@
       initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
     }
   }
-  calld->pick.initial_metadata =
+  const grpc_lb_policy_pick_args inputs = {
       calld->initial_metadata_batch->payload->send_initial_metadata
-          .send_initial_metadata;
-  calld->pick.initial_metadata_flags = initial_metadata_flags;
+          .send_initial_metadata,
+      initial_metadata_flags, &calld->lb_token_mdelem};
+  // Keep a ref to the LB policy in calld while the pick is pending.
+  GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
+  calld->lb_policy = chand->lb_policy;
   GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
                     grpc_combiner_scheduler(chand->combiner));
-  calld->pick.on_complete = &calld->lb_pick_closure;
-  const bool pick_done =
-      grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick);
+  const bool pick_done = grpc_lb_policy_pick_locked(
+      chand->lb_policy, &inputs, &calld->connected_subchannel,
+      calld->subchannel_call_context, nullptr, &calld->lb_pick_closure);
   if (pick_done) {
     /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
               chand, calld);
     }
+    GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
+    calld->lb_policy = nullptr;
   } else {
     GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
     grpc_call_combiner_set_notify_on_cancel(
@@ -1283,7 +1289,7 @@
   grpc_call_element* elem = (grpc_call_element*)arg;
   call_data* calld = (call_data*)elem->call_data;
   channel_data* chand = (channel_data*)elem->channel_data;
-  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
+  GPR_ASSERT(calld->connected_subchannel == nullptr);
   if (chand->lb_policy != nullptr) {
     // We already have an LB policy, so ask it for a pick.
     if (pick_callback_start_locked(elem)) {
@@ -1461,14 +1467,15 @@
     GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                                "client_channel_destroy_call");
   }
+  GPR_ASSERT(calld->lb_policy == nullptr);
   GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
-  if (calld->pick.connected_subchannel != nullptr) {
-    GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked");
+  if (calld->connected_subchannel != nullptr) {
+    GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked");
   }
   for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
-    if (calld->pick.subchannel_call_context[i].value != nullptr) {
-      calld->pick.subchannel_call_context[i].destroy(
-          calld->pick.subchannel_call_context[i].value);
+    if (calld->subchannel_call_context[i].value != nullptr) {
+      calld->subchannel_call_context[i].destroy(
+          calld->subchannel_call_context[i].value);
     }
   }
   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index cc4fe7e..7a5a8de 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -19,6 +19,8 @@
 #include "src/core/ext/filters/client_channel/lb_policy.h"
 #include "src/core/lib/iomgr/combiner.h"
 
+#define WEAK_REF_BITS 16
+
 grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
     false, "lb_policy_refcount");
 
@@ -26,60 +28,91 @@
                          const grpc_lb_policy_vtable* vtable,
                          grpc_combiner* combiner) {
   policy->vtable = vtable;
-  gpr_ref_init(&policy->refs, 1);
+  gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
   policy->interested_parties = grpc_pollset_set_create();
   policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
 }
 
 #ifndef NDEBUG
-void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line,
-                        const char* reason) {
-  if (grpc_trace_lb_policy_refcount.enabled()) {
-    gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
-    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "LB_POLICY:%p   ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
-            old_refs, old_refs + 1, reason);
-  }
+#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason
+#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char* purpose
+#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason
+#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose
 #else
-void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) {
+#define REF_FUNC_EXTRA_ARGS
+#define REF_MUTATE_EXTRA_ARGS
+#define REF_FUNC_PASS_ARGS(new_reason)
+#define REF_MUTATE_PASS_ARGS(x)
 #endif
-  gpr_ref(&lb_policy->refs);
+
+static gpr_atm ref_mutate(grpc_lb_policy* c, gpr_atm delta,
+                          int barrier REF_MUTATE_EXTRA_ARGS) {
+  gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
+                            : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
+#ifndef NDEBUG
+  if (grpc_trace_lb_policy_refcount.enabled()) {
+    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+            "LB_POLICY: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
+            purpose, old_val, old_val + delta, reason);
+  }
+#endif
+  return old_val;
 }
 
-#ifndef NDEBUG
-void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line,
-                          const char* reason) {
-  if (grpc_trace_lb_policy_refcount.enabled()) {
-    gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
-    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
-            old_refs, old_refs - 1, reason);
+void grpc_lb_policy_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
+  ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
+}
+
+static void shutdown_locked(void* arg, grpc_error* error) {
+  grpc_lb_policy* policy = (grpc_lb_policy*)arg;
+  policy->vtable->shutdown_locked(policy);
+  GRPC_LB_POLICY_WEAK_UNREF(policy, "strong-unref");
+}
+
+void grpc_lb_policy_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
+  gpr_atm old_val =
+      ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS),
+                 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
+  gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
+  gpr_atm check = 1 << WEAK_REF_BITS;
+  if ((old_val & mask) == check) {
+    GRPC_CLOSURE_SCHED(
+        GRPC_CLOSURE_CREATE(shutdown_locked, policy,
+                            grpc_combiner_scheduler(policy->combiner)),
+        GRPC_ERROR_NONE);
+  } else {
+    grpc_lb_policy_weak_unref(policy REF_FUNC_PASS_ARGS("strong-unref"));
   }
-#else
-void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) {
-#endif
-  if (gpr_unref(&lb_policy->refs)) {
-    grpc_pollset_set_destroy(lb_policy->interested_parties);
-    grpc_combiner* combiner = lb_policy->combiner;
-    lb_policy->vtable->destroy(lb_policy);
+}
+
+void grpc_lb_policy_weak_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
+  ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF"));
+}
+
+void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
+  gpr_atm old_val =
+      ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
+  if (old_val == 1) {
+    grpc_pollset_set_destroy(policy->interested_parties);
+    grpc_combiner* combiner = policy->combiner;
+    policy->vtable->destroy(policy);
     GRPC_COMBINER_UNREF(combiner, "lb_policy");
   }
 }
 
-void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
-                                    grpc_lb_policy* new_policy) {
-  policy->vtable->shutdown_locked(policy, new_policy);
-}
-
 int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
-                               grpc_lb_policy_pick_state* pick) {
-  return policy->vtable->pick_locked(policy, pick);
+                               const grpc_lb_policy_pick_args* pick_args,
+                               grpc_connected_subchannel** target,
+                               grpc_call_context_element* context,
+                               void** user_data, grpc_closure* on_complete) {
+  return policy->vtable->pick_locked(policy, pick_args, target, context,
+                                     user_data, on_complete);
 }
 
 void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
-                                       grpc_lb_policy_pick_state* pick,
+                                       grpc_connected_subchannel** target,
                                        grpc_error* error) {
-  policy->vtable->cancel_pick_locked(policy, pick, error);
+  policy->vtable->cancel_pick_locked(policy, target, error);
 }
 
 void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 1176a05..3572c97 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -33,7 +33,7 @@
 
 struct grpc_lb_policy {
   const grpc_lb_policy_vtable* vtable;
-  gpr_refcount refs;
+  gpr_atm ref_pair;
   /* owned pointer to interested parties in load balancing decisions */
   grpc_pollset_set* interested_parties;
   /* combiner under which lb_policy actions take place */
@@ -42,42 +42,32 @@
   grpc_closure* request_reresolution;
 };
 
-/// State used for an LB pick.
-typedef struct grpc_lb_policy_pick_state {
-  /// Initial metadata associated with the picking call.
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+  /** Initial metadata associated with the picking call. */
   grpc_metadata_batch* initial_metadata;
-  /// Bitmask used for selective cancelling. See \a
-  /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
-  /// grpc_types.h.
+  /** Bitmask used for selective cancelling. See \a
+   * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
+   * grpc_types.h */
   uint32_t initial_metadata_flags;
-  /// Storage for LB token in \a initial_metadata, or NULL if not used.
-  grpc_linked_mdelem lb_token_mdelem_storage;
-  /// Closure to run when pick is complete, if not completed synchronously.
-  grpc_closure* on_complete;
-  /// Will be set to the selected subchannel, or NULL on failure or when
-  /// the LB policy decides to drop the call.
-  grpc_connected_subchannel* connected_subchannel;
-  /// Will be populated with context to pass to the subchannel call, if needed.
-  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
-  /// Upon success, \a *user_data will be set to whatever opaque information
-  /// may need to be propagated from the LB policy, or NULL if not needed.
-  void** user_data;
-  /// Next pointer.  For internal use by LB policy.
-  struct grpc_lb_policy_pick_state* next;
-} grpc_lb_policy_pick_state;
+  /** Storage for LB token in \a initial_metadata, or NULL if not used */
+  grpc_linked_mdelem* lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
 
 struct grpc_lb_policy_vtable {
   void (*destroy)(grpc_lb_policy* policy);
-
-  /// \see grpc_lb_policy_shutdown_locked().
-  void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy);
+  void (*shutdown_locked)(grpc_lb_policy* policy);
 
   /** \see grpc_lb_policy_pick */
-  int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick);
+  int (*pick_locked)(grpc_lb_policy* policy,
+                     const grpc_lb_policy_pick_args* pick_args,
+                     grpc_connected_subchannel** target,
+                     grpc_call_context_element* context, void** user_data,
+                     grpc_closure* on_complete);
 
   /** \see grpc_lb_policy_cancel_pick */
   void (*cancel_pick_locked)(grpc_lb_policy* policy,
-                             grpc_lb_policy_pick_state* pick,
+                             grpc_connected_subchannel** target,
                              grpc_error* error);
 
   /** \see grpc_lb_policy_cancel_picks */
@@ -113,19 +103,37 @@
 };
 
 #ifndef NDEBUG
+
+/* Strong references: the policy will shutdown when they reach zero */
 #define GRPC_LB_POLICY_REF(p, r) \
   grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
 #define GRPC_LB_POLICY_UNREF(p, r) \
   grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
+
+/* Weak references: they don't prevent the shutdown of the LB policy. When no
+ * strong references are left but there are still weak ones, shutdown is called.
+ * Once the weak reference also reaches zero, the LB policy is destroyed. */
+#define GRPC_LB_POLICY_WEAK_REF(p, r) \
+  grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_WEAK_UNREF(p, r) \
+  grpc_lb_policy_weak_unref((p), __FILE__, __LINE__, (r))
 void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line,
                         const char* reason);
 void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line,
                           const char* reason);
-#else  // !NDEBUG
+void grpc_lb_policy_weak_ref(grpc_lb_policy* policy, const char* file, int line,
+                             const char* reason);
+void grpc_lb_policy_weak_unref(grpc_lb_policy* policy, const char* file,
+                               int line, const char* reason);
+#else
 #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
 #define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
+#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p))
+#define GRPC_LB_POLICY_WEAK_UNREF(p, r) grpc_lb_policy_weak_unref((p))
 void grpc_lb_policy_ref(grpc_lb_policy* policy);
 void grpc_lb_policy_unref(grpc_lb_policy* policy);
+void grpc_lb_policy_weak_ref(grpc_lb_policy* policy);
+void grpc_lb_policy_weak_unref(grpc_lb_policy* policy);
 #endif
 
 /** called by concrete implementations to initialize the base struct */
@@ -133,24 +141,28 @@
                          const grpc_lb_policy_vtable* vtable,
                          grpc_combiner* combiner);
 
-/// Shuts down \a policy.
-/// If \a new_policy is non-null, any pending picks will be restarted
-/// on that policy; otherwise, they will be failed.
-void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
-                                    grpc_lb_policy* new_policy);
+/** Finds an appropriate subchannel for a call, based on \a pick_args.
 
-/** Finds an appropriate subchannel for a call, based on data in \a pick.
-    \a pick must remain alive until the pick is complete.
+    \a target will be set to the selected subchannel, or NULL on failure
+    or when the LB policy decides to drop the call.
+
+    Upon success, \a user_data will be set to whatever opaque information
+    may need to be propagated from the LB policy, or NULL if not needed.
+    \a context will be populated with context to pass to the subchannel
+    call, if needed.
 
     If the pick succeeds and a result is known immediately, a non-zero
-    value will be returned.  Otherwise, \a pick->on_complete will be invoked
+    value will be returned.  Otherwise, \a on_complete will be invoked
     once the pick is complete with its error argument set to indicate
     success or failure.
 
     Any IO should be done under the \a interested_parties \a grpc_pollset_set
     in the \a grpc_lb_policy struct. */
 int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
-                               grpc_lb_policy_pick_state* pick);
+                               const grpc_lb_policy_pick_args* pick_args,
+                               grpc_connected_subchannel** target,
+                               grpc_call_context_element* context,
+                               void** user_data, grpc_closure* on_complete);
 
 /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
     against one of the connected subchannels managed by \a policy. */
@@ -158,11 +170,11 @@
                                     grpc_closure* on_initiate,
                                     grpc_closure* on_ack);
 
-/** Cancel picks for \a pick.
+/** Cancel picks for \a target.
     The \a on_complete callback of the pending picks will be invoked with \a
     *target set to NULL. */
 void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
-                                       grpc_lb_policy_pick_state* pick,
+                                       grpc_connected_subchannel** target,
                                        grpc_error* error);
 
 /** Cancel all pending picks for which their \a initial_metadata_flags (as given
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 5849ac9..1317cdc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -54,7 +54,7 @@
  *    operations in progress over the old RR instance. This is done by
  *    decreasing the reference count on the old policy. The moment no more
  *    references are held on the old RR policy, it'll be destroyed and \a
- *    on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ *    glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
  *    state. At this point we can transition to a new RR instance safely, which
  *    is done once again via \a rr_handover_locked().
  *
@@ -128,48 +128,187 @@
 
 grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
 
-struct glb_lb_policy;
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static grpc_error* initial_metadata_add_lb_token(
+    grpc_metadata_batch* initial_metadata,
+    grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
+  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+                                      lb_token);
+}
+
+static void destroy_client_stats(void* arg) {
+  grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
+}
+
+typedef struct wrapped_rr_closure_arg {
+  /* the closure instance using this struct as argument */
+  grpc_closure wrapper_closure;
+
+  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
+   * calls against the internal RR instance, respectively. */
+  grpc_closure* wrapped_closure;
+
+  /* the pick's initial metadata, kept in order to append the LB token for the
+   * pick */
+  grpc_metadata_batch* initial_metadata;
+
+  /* the picked target, used to determine which LB token to add to the pick's
+   * initial metadata */
+  grpc_connected_subchannel** target;
+
+  /* the context to be populated for the subchannel call */
+  grpc_call_context_element* context;
+
+  /* Stats for client-side load reporting. Note that this holds a
+   * reference, which must be either passed on via context or unreffed. */
+  grpc_grpclb_client_stats* client_stats;
+
+  /* the LB token associated with the pick */
+  grpc_mdelem lb_token;
+
+  /* storage for the lb token initial metadata mdelem */
+  grpc_linked_mdelem* lb_token_mdelem_storage;
+
+  /* The RR instance related to the closure */
+  grpc_lb_policy* rr_policy;
+
+  /* The grpclb instance that created the wrapping. This instance is not owned,
+   * reference counts are untouched. It's used only for logging purposes. */
+  grpc_lb_policy* glb_policy;
+
+  /* heap memory to be freed upon closure execution. */
+  void* free_when_done;
+} wrapped_rr_closure_arg;
+
+/* The \a on_complete closure passed as part of the pick requires keeping a
+ * reference to its associated round robin instance. We wrap this closure in
+ * order to unref the round robin instance upon its invocation */
+static void wrapped_rr_closure(void* arg, grpc_error* error) {
+  wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg;
+
+  GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
+  GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+
+  if (wc_arg->rr_policy != nullptr) {
+    /* if *target is nullptr, no pick has been made by the RR policy (eg, all
+     * addresses failed to connect). There won't be any user_data/token
+     * available */
+    if (*wc_arg->target != nullptr) {
+      if (!GRPC_MDISNULL(wc_arg->lb_token)) {
+        initial_metadata_add_lb_token(wc_arg->initial_metadata,
+                                      wc_arg->lb_token_mdelem_storage,
+                                      GRPC_MDELEM_REF(wc_arg->lb_token));
+      } else {
+        gpr_log(
+            GPR_ERROR,
+            "[grpclb %p] No LB token for connected subchannel pick %p (from RR "
+            "instance %p).",
+            wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy);
+        abort();
+      }
+      // Pass on client stats via context. Passes ownership of the reference.
+      GPR_ASSERT(wc_arg->client_stats != nullptr);
+      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
+      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
+    } else {
+      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+    }
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
+              wc_arg->rr_policy);
+    }
+    GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
+  }
+  GPR_ASSERT(wc_arg->free_when_done != nullptr);
+  gpr_free(wc_arg->free_when_done);
+}
 
 namespace {
-
-/// Linked list of pending pick requests. It stores all information needed to
-/// eventually call (Round Robin's) pick() on them. They mainly stay pending
-/// waiting for the RR policy to be created.
-///
-/// Note that when a pick is sent to the RR policy, we inject our own
-/// on_complete callback, so that we can intercept the result before
-/// invoking the original on_complete callback.  This allows us to set the
-/// LB token metadata and add client_stats to the call context.
-/// See \a pending_pick_complete() for details.
+/* Linked list of pending pick requests. It stores all information needed to
+ * eventually call (Round Robin's) pick() on them. They mainly stay pending
+ * waiting for the RR policy to be created/updated.
+ *
+ * One particularity is the wrapping of the user-provided \a on_complete closure
+ * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
+ * order to correctly unref the RR policy instance upon completion of the pick.
+ * See \a wrapped_rr_closure for details. */
 struct pending_pick {
-  // Our on_complete closure and the original one.
-  grpc_closure on_complete;
-  grpc_closure* original_on_complete;
-  // The original pick.
-  grpc_lb_policy_pick_state* pick;
-  // Stats for client-side load reporting. Note that this holds a
-  // reference, which must be either passed on via context or unreffed.
-  grpc_grpclb_client_stats* client_stats;
-  // The LB token associated with the pick.  This is set via user_data in
-  // the pick.
-  grpc_mdelem lb_token;
-  // The grpclb instance that created the wrapping. This instance is not owned,
-  // reference counts are untouched. It's used only for logging purposes.
-  glb_lb_policy* glb_policy;
-  // Next pending pick.
   struct pending_pick* next;
-};
 
-/// A linked list of pending pings waiting for the RR policy to be created.
-struct pending_ping {
-  grpc_closure* on_initiate;
-  grpc_closure* on_ack;
-  struct pending_ping* next;
-};
+  /* original pick()'s arguments */
+  grpc_lb_policy_pick_args pick_args;
 
+  /* output argument where to store the pick()ed connected subchannel, or
+   * nullptr upon error. */
+  grpc_connected_subchannel** target;
+
+  /* args for wrapped_on_complete */
+  wrapped_rr_closure_arg wrapped_on_complete_arg;
+};
 }  // namespace
 
-struct glb_lb_policy {
+static void add_pending_pick(pending_pick** root,
+                             const grpc_lb_policy_pick_args* pick_args,
+                             grpc_connected_subchannel** target,
+                             grpc_call_context_element* context,
+                             grpc_closure* on_complete) {
+  pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
+  pp->next = *root;
+  pp->pick_args = *pick_args;
+  pp->target = target;
+  pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
+  pp->wrapped_on_complete_arg.target = target;
+  pp->wrapped_on_complete_arg.context = context;
+  pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
+  pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
+      pick_args->lb_token_mdelem_storage;
+  pp->wrapped_on_complete_arg.free_when_done = pp;
+  GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
+                    wrapped_rr_closure, &pp->wrapped_on_complete_arg,
+                    grpc_schedule_on_exec_ctx);
+  *root = pp;
+}
+
+/* Same as the \a pending_pick struct but for ping operations */
+typedef struct pending_ping {
+  struct pending_ping* next;
+
+  /* args for sending the ping */
+  wrapped_rr_closure_arg* on_initiate;
+  wrapped_rr_closure_arg* on_ack;
+} pending_ping;
+
+static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
+                             grpc_closure* on_ack) {
+  pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
+  if (on_initiate != nullptr) {
+    pping->on_initiate =
+        (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
+    pping->on_initiate->wrapped_closure = on_initiate;
+    pping->on_initiate->free_when_done = pping->on_initiate;
+    GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
+                      &pping->on_initiate, grpc_schedule_on_exec_ctx);
+  }
+  if (on_ack != nullptr) {
+    pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
+    pping->on_ack->wrapped_closure = on_ack;
+    pping->on_ack->free_when_done = pping->on_ack;
+    GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
+                      &pping->on_ack, grpc_schedule_on_exec_ctx);
+  }
+  pping->next = *root;
+  *root = pping;
+}
+
+/*
+ * glb_lb_policy
+ */
+typedef struct rr_connectivity_data rr_connectivity_data;
+
+typedef struct glb_lb_policy {
   /** base policy: must be first */
   grpc_lb_policy base;
 
@@ -194,9 +333,6 @@
   /** the RR policy to use of the backend servers returned by the LB server */
   grpc_lb_policy* rr_policy;
 
-  grpc_closure on_rr_connectivity_changed;
-  grpc_connectivity_state rr_connectivity_state;
-
   bool started_picking;
 
   /** our connectivity state tracker */
@@ -301,85 +437,15 @@
   grpc_closure client_load_report_closure;
   /* Client load report message payload. */
   grpc_byte_buffer* client_load_report_payload;
+} glb_lb_policy;
+
+/* Keeps track and reacts to changes in connectivity of the RR instance */
+struct rr_connectivity_data {
+  grpc_closure on_change;
+  grpc_connectivity_state state;
+  glb_lb_policy* glb_policy;
 };
 
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static grpc_error* initial_metadata_add_lb_token(
-    grpc_metadata_batch* initial_metadata,
-    grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
-  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
-  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
-  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
-                                      lb_token);
-}
-
-static void destroy_client_stats(void* arg) {
-  grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
-}
-
-static void pending_pick_set_metadata_and_context(pending_pick* pp) {
-  /* if connected_subchannel is nullptr, no pick has been made by the RR
-   * policy (e.g., all addresses failed to connect). There won't be any
-   * user_data/token available */
-  if (pp->pick->connected_subchannel != nullptr) {
-    if (!GRPC_MDISNULL(pp->lb_token)) {
-      initial_metadata_add_lb_token(pp->pick->initial_metadata,
-                                    &pp->pick->lb_token_mdelem_storage,
-                                    GRPC_MDELEM_REF(pp->lb_token));
-    } else {
-      gpr_log(GPR_ERROR,
-              "[grpclb %p] No LB token for connected subchannel pick %p",
-              pp->glb_policy, pp->pick);
-      abort();
-    }
-    // Pass on client stats via context. Passes ownership of the reference.
-    GPR_ASSERT(pp->client_stats != nullptr);
-    pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
-        pp->client_stats;
-    pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
-        destroy_client_stats;
-  } else {
-    grpc_grpclb_client_stats_unref(pp->client_stats);
-  }
-}
-
-/* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
-static void pending_pick_complete(void* arg, grpc_error* error) {
-  pending_pick* pp = (pending_pick*)arg;
-  pending_pick_set_metadata_and_context(pp);
-  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
-  gpr_free(pp);
-}
-
-static pending_pick* pending_pick_create(glb_lb_policy* glb_policy,
-                                         grpc_lb_policy_pick_state* pick) {
-  pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
-  pp->pick = pick;
-  pp->glb_policy = glb_policy;
-  GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp,
-                    grpc_schedule_on_exec_ctx);
-  pp->original_on_complete = pick->on_complete;
-  pp->pick->on_complete = &pp->on_complete;
-  return pp;
-}
-
-static void pending_pick_add(pending_pick** root, pending_pick* new_pp) {
-  new_pp->next = *root;
-  *root = new_pp;
-}
-
-static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate,
-                             grpc_closure* on_ack) {
-  pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
-  pping->on_initiate = on_initiate;
-  pping->on_ack = on_ack;
-  pping->next = *root;
-  *root = pping;
-}
-
 static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
                             bool log) {
   if (server->drop) return false;
@@ -491,6 +557,7 @@
       gpr_free(uri);
       user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
     }
+
     grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                   false /* is_balancer */,
                                   nullptr /* balancer_name */, user_data);
@@ -531,6 +598,7 @@
     grpc_error* rr_state_error) {
   const grpc_connectivity_state curr_glb_state =
       grpc_connectivity_state_check(&glb_policy->state_tracker);
+
   /* The new connectivity status is a function of the previous one and the new
    * input coming from the status of the RR policy.
    *
@@ -560,6 +628,7 @@
    *
    *  (*) This function mustn't be called during shutting down. */
   GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
+
   switch (rr_state) {
     case GRPC_CHANNEL_TRANSIENT_FAILURE:
     case GRPC_CHANNEL_SHUTDOWN:
@@ -570,6 +639,7 @@
     case GRPC_CHANNEL_READY:
       GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
   }
+
   if (grpc_lb_glb_trace.enabled()) {
     gpr_log(
         GPR_INFO,
@@ -587,8 +657,10 @@
  * cleanups this callback would otherwise be responsible for.
  * If \a force_async is true, then we will manually schedule the
  * completion callback even if the pick is available immediately. */
-static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
-                                         bool force_async, pending_pick* pp) {
+static bool pick_from_internal_rr_locked(
+    glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args,
+    bool force_async, grpc_connected_subchannel** target,
+    wrapped_rr_closure_arg* wc_arg) {
   // Check for drops if we are not using fallback backend addresses.
   if (glb_policy->serverlist != nullptr) {
     // Look at the index into the serverlist to see if we should drop this call.
@@ -598,36 +670,57 @@
       glb_policy->serverlist_index = 0;  // Wrap-around.
     }
     if (server->drop) {
+      // Not using the RR policy, so unref it.
+      if (grpc_lb_glb_trace.enabled()) {
+        gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
+                wc_arg->rr_policy);
+      }
+      GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
       // Update client load reporting stats to indicate the number of
       // dropped calls.  Note that we have to do this here instead of in
       // the client_load_reporting filter, because we do not create a
       // subchannel call (and therefore no client_load_reporting filter)
       // for dropped calls.
-      GPR_ASSERT(glb_policy->client_stats != nullptr);
+      GPR_ASSERT(wc_arg->client_stats != nullptr);
       grpc_grpclb_client_stats_add_call_dropped_locked(
-          server->load_balance_token, glb_policy->client_stats);
+          server->load_balance_token, wc_arg->client_stats);
+      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
       if (force_async) {
-        GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
-        gpr_free(pp);
+        GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
+        GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+        gpr_free(wc_arg->free_when_done);
         return false;
       }
-      gpr_free(pp);
+      gpr_free(wc_arg->free_when_done);
       return true;
     }
   }
-  // Set client_stats and user_data.
-  pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
-  GPR_ASSERT(pp->pick->user_data == nullptr);
-  pp->pick->user_data = (void**)&pp->lb_token;
   // Pick via the RR policy.
-  bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick);
+  const bool pick_done = grpc_lb_policy_pick_locked(
+      wc_arg->rr_policy, pick_args, target, wc_arg->context,
+      (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
   if (pick_done) {
-    pending_pick_set_metadata_and_context(pp);
-    if (force_async) {
-      GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
-      pick_done = false;
+    /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
+              wc_arg->rr_policy);
     }
-    gpr_free(pp);
+    GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
+    /* add the load reporting initial metadata */
+    initial_metadata_add_lb_token(pick_args->initial_metadata,
+                                  pick_args->lb_token_mdelem_storage,
+                                  GRPC_MDELEM_REF(wc_arg->lb_token));
+    // Pass on client stats via context. Passes ownership of the reference.
+    GPR_ASSERT(wc_arg->client_stats != nullptr);
+    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
+    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
+    if (force_async) {
+      GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
+      GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+      gpr_free(wc_arg->free_when_done);
+      return false;
+    }
+    gpr_free(wc_arg->free_when_done);
   }
   /* else, the pending pick will be registered and taken care of by the
    * pending pick list inside the RR policy (glb_policy->rr_policy).
@@ -669,7 +762,7 @@
   gpr_free(args);
 }
 
-static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
+static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error);
 static void create_rr_locked(glb_lb_policy* glb_policy,
                              grpc_lb_policy_args* args) {
   GPR_ASSERT(glb_policy->rr_policy == nullptr);
@@ -691,46 +784,72 @@
   glb_policy->base.request_reresolution = nullptr;
   glb_policy->rr_policy = new_rr_policy;
   grpc_error* rr_state_error = nullptr;
-  glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
-      glb_policy->rr_policy, &rr_state_error);
+  const grpc_connectivity_state rr_state =
+      grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
+                                               &rr_state_error);
   /* Connectivity state is a function of the RR policy updated/created */
-  update_lb_connectivity_status_locked(
-      glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
+  update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error);
   /* Add the gRPC LB's interested_parties pollset_set to that of the newly
    * created RR policy. This will make the RR policy progress upon activity on
    * gRPC LB, which in turn is tied to the application's call */
   grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
                                    glb_policy->base.interested_parties);
-  GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
-                    on_rr_connectivity_changed_locked, glb_policy,
+
+  /* Allocate the data for the tracking of the new RR policy's connectivity.
+   * It'll be deallocated in glb_rr_connectivity_changed() */
+  rr_connectivity_data* rr_connectivity =
+      (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data));
+  GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
+                    glb_rr_connectivity_changed_locked, rr_connectivity,
                     grpc_combiner_scheduler(glb_policy->base.combiner));
+  rr_connectivity->glb_policy = glb_policy;
+  rr_connectivity->state = rr_state;
+
   /* Subscribe to changes to the connectivity of the new RR */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
-  grpc_lb_policy_notify_on_state_change_locked(
-      glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
-      &glb_policy->on_rr_connectivity_changed);
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
+  grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
+                                               &rr_connectivity->state,
+                                               &rr_connectivity->on_change);
   grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
-  // Send pending picks to RR policy.
+
+  /* Update picks and pings in wait */
   pending_pick* pp;
   while ((pp = glb_policy->pending_picks)) {
     glb_policy->pending_picks = pp->next;
+    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
+    pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
+    pp->wrapped_on_complete_arg.client_stats =
+        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
     if (grpc_lb_glb_trace.enabled()) {
       gpr_log(GPR_INFO,
               "[grpclb %p] Pending pick about to (async) PICK from RR %p",
               glb_policy, glb_policy->rr_policy);
     }
-    pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp);
+    pick_from_internal_rr_locked(glb_policy, &pp->pick_args,
+                                 true /* force_async */, pp->target,
+                                 &pp->wrapped_on_complete_arg);
   }
-  // Send pending pings to RR policy.
+
   pending_ping* pping;
   while ((pping = glb_policy->pending_pings)) {
     glb_policy->pending_pings = pping->next;
+    grpc_closure* on_initiate = nullptr;
+    grpc_closure* on_ack = nullptr;
+    if (pping->on_initiate != nullptr) {
+      GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+      pping->on_initiate->rr_policy = glb_policy->rr_policy;
+      on_initiate = &pping->on_initiate->wrapper_closure;
+    }
+    if (pping->on_ack != nullptr) {
+      GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+      pping->on_ack->rr_policy = glb_policy->rr_policy;
+      on_ack = &pping->on_ack->wrapper_closure;
+    }
     if (grpc_lb_glb_trace.enabled()) {
       gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
               glb_policy, glb_policy->rr_policy);
     }
-    grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate,
-                                   pping->on_ack);
+    grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
     gpr_free(pping);
   }
 }
@@ -756,28 +875,31 @@
   lb_policy_args_destroy(args);
 }
 
-static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
+  rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg;
+  glb_lb_policy* glb_policy = rr_connectivity->glb_policy;
   if (glb_policy->shutting_down) {
-    GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
+    GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
+    gpr_free(rr_connectivity);
     return;
   }
-  if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
     /* An RR policy that has transitioned into the SHUTDOWN connectivity state
      * should not be considered for picks or updates: the SHUTDOWN state is a
      * sink, policies can't transition back from it. .*/
     GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
     glb_policy->rr_policy = nullptr;
-    GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
+    GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
+    gpr_free(rr_connectivity);
     return;
   }
   /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
-  update_lb_connectivity_status_locked(
-      glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
-  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
-  grpc_lb_policy_notify_on_state_change_locked(
-      glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
-      &glb_policy->on_rr_connectivity_changed);
+  update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state,
+                                       GRPC_ERROR_REF(error));
+  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+  grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
+                                               &rr_connectivity->state,
+                                               &rr_connectivity->on_change);
 }
 
 static void destroy_balancer_name(void* balancer_name) {
@@ -885,17 +1007,22 @@
   gpr_free(glb_policy);
 }
 
-static void glb_shutdown_locked(grpc_lb_policy* pol,
-                                grpc_lb_policy* new_policy) {
+static void glb_shutdown_locked(grpc_lb_policy* pol) {
   glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   glb_policy->shutting_down = true;
+
+  /* We need a copy of the lb_call pointer because we can't cancell the call
+   * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
+   * the cancel, needs to acquire that same lock */
+  grpc_call* lb_call = glb_policy->lb_call;
+
   /* glb_policy->lb_call and this local lb_call must be consistent at this point
    * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
    * of query_for_backends_locked, which can only be invoked while
    * glb_policy->shutting_down is false. */
-  if (glb_policy->lb_call != nullptr) {
-    grpc_call_cancel(glb_policy->lb_call, nullptr);
+  if (lb_call != nullptr) {
+    grpc_call_cancel(lb_call, nullptr);
     /* lb_on_server_status_received will pick up the cancel and clean up */
   }
   if (glb_policy->retry_timer_callback_pending) {
@@ -904,8 +1031,12 @@
   if (glb_policy->fallback_timer_callback_pending) {
     grpc_timer_cancel(&glb_policy->lb_fallback_timer);
   }
+
+  pending_pick* pp = glb_policy->pending_picks;
+  glb_policy->pending_picks = nullptr;
+  pending_ping* pping = glb_policy->pending_pings;
+  glb_policy->pending_pings = nullptr;
   if (glb_policy->rr_policy != nullptr) {
-    grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
     GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
   } else {
     grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
@@ -920,33 +1051,28 @@
   }
   grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
                               GRPC_ERROR_REF(error), "glb_shutdown");
-  // Clear pending picks.
-  pending_pick* pp = glb_policy->pending_picks;
-  glb_policy->pending_picks = nullptr;
+
   while (pp != nullptr) {
     pending_pick* next = pp->next;
-    if (new_policy != nullptr) {
-      // Hand pick over to new policy.
-      grpc_grpclb_client_stats_unref(pp->client_stats);
-      pp->pick->on_complete = pp->original_on_complete;
-      if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) {
-        // Synchronous return; schedule callback.
-        GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
-      }
-      gpr_free(pp);
-    } else {
-      pp->pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
-    }
+    *pp->target = nullptr;
+    GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
+                       GRPC_ERROR_REF(error));
+    gpr_free(pp);
     pp = next;
   }
-  // Clear pending pings.
-  pending_ping* pping = glb_policy->pending_pings;
-  glb_policy->pending_pings = nullptr;
+
   while (pping != nullptr) {
     pending_ping* next = pping->next;
-    GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
-    GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
+    if (pping->on_initiate != nullptr) {
+      GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure,
+                         GRPC_ERROR_REF(error));
+      gpr_free(pping->on_initiate);
+    }
+    if (pping->on_ack != nullptr) {
+      GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure,
+                         GRPC_ERROR_REF(error));
+      gpr_free(pping->on_ack);
+    }
     gpr_free(pping);
     pping = next;
   }
@@ -964,16 +1090,16 @@
 //   level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
 //   we invoke the completion closure and set *target to nullptr right here.
 static void glb_cancel_pick_locked(grpc_lb_policy* pol,
-                                   grpc_lb_policy_pick_state* pick,
+                                   grpc_connected_subchannel** target,
                                    grpc_error* error) {
   glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
   pending_pick* pp = glb_policy->pending_picks;
   glb_policy->pending_picks = nullptr;
   while (pp != nullptr) {
     pending_pick* next = pp->next;
-    if (pp->pick == pick) {
-      pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(&pp->on_complete,
+    if (pp->target == target) {
+      *target = nullptr;
+      GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Pick Cancelled", &error, 1));
     } else {
@@ -983,7 +1109,7 @@
     pp = next;
   }
   if (glb_policy->rr_policy != nullptr) {
-    grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick,
+    grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target,
                                       GRPC_ERROR_REF(error));
   }
   GRPC_ERROR_UNREF(error);
@@ -1008,9 +1134,9 @@
   glb_policy->pending_picks = nullptr;
   while (pp != nullptr) {
     pending_pick* next = pp->next;
-    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+    if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
         initial_metadata_flags_eq) {
-      GRPC_CLOSURE_SCHED(&pp->on_complete,
+      GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Pick Cancelled", &error, 1));
     } else {
@@ -1036,7 +1162,7 @@
       !glb_policy->fallback_timer_callback_pending) {
     grpc_millis deadline =
         grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
-    GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer");
+    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
     GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
                       glb_policy,
                       grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1058,9 +1184,19 @@
 }
 
 static int glb_pick_locked(grpc_lb_policy* pol,
-                           grpc_lb_policy_pick_state* pick) {
+                           const grpc_lb_policy_pick_args* pick_args,
+                           grpc_connected_subchannel** target,
+                           grpc_call_context_element* context, void** user_data,
+                           grpc_closure* on_complete) {
+  if (pick_args->lb_token_mdelem_storage == nullptr) {
+    *target = nullptr;
+    GRPC_CLOSURE_SCHED(on_complete,
+                       GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                           "No mdelem storage for the LB token. Load reporting "
+                           "won't work without it. Failing"));
+    return 0;
+  }
   glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
-  pending_pick* pp = pending_pick_create(glb_policy, pick);
   bool pick_done = false;
   if (glb_policy->rr_policy != nullptr) {
     const grpc_connectivity_state rr_connectivity_state =
@@ -1068,7 +1204,7 @@
                                                  nullptr);
     // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
     // callback registered to capture this event
-    // (on_rr_connectivity_changed_locked) may not have been invoked yet. We
+    // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We
     // need to make sure we aren't trying to pick from a RR policy instance
     // that's in shutdown.
     if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
@@ -1078,16 +1214,32 @@
                 glb_policy, glb_policy->rr_policy,
                 grpc_connectivity_state_name(rr_connectivity_state));
       }
-      pending_pick_add(&glb_policy->pending_picks, pp);
+      add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
+                       on_complete);
       pick_done = false;
     } else {  // RR not in shutdown
       if (grpc_lb_glb_trace.enabled()) {
         gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
                 glb_policy->rr_policy);
       }
+      GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
+      wrapped_rr_closure_arg* wc_arg =
+          (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
+      GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+                        grpc_schedule_on_exec_ctx);
+      wc_arg->rr_policy = glb_policy->rr_policy;
+      wc_arg->target = target;
+      wc_arg->context = context;
       GPR_ASSERT(glb_policy->client_stats != nullptr);
-      pick_done =
-          pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
+      wc_arg->client_stats =
+          grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+      wc_arg->wrapped_closure = on_complete;
+      wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+      wc_arg->initial_metadata = pick_args->initial_metadata;
+      wc_arg->free_when_done = wc_arg;
+      wc_arg->glb_policy = pol;
+      pick_done = pick_from_internal_rr_locked(
+          glb_policy, pick_args, false /* force_async */, target, wc_arg);
     }
   } else {  // glb_policy->rr_policy == NULL
     if (grpc_lb_glb_trace.enabled()) {
@@ -1095,7 +1247,8 @@
               "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
               glb_policy);
     }
-    pending_pick_add(&glb_policy->pending_picks, pp);
+    add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
+                     on_complete);
     if (!glb_policy->started_picking) {
       start_picking_locked(glb_policy);
     }
@@ -1117,7 +1270,7 @@
   if (glb_policy->rr_policy) {
     grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
   } else {
-    pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack);
+    add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack);
     if (!glb_policy->started_picking) {
       start_picking_locked(glb_policy);
     }
@@ -1142,7 +1295,7 @@
     }
     query_for_backends_locked(glb_policy);
   }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
+  GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer");
 }
 
 static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
@@ -1168,7 +1321,7 @@
                 glb_policy);
       }
     }
-    GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
+    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
     GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
                       lb_call_on_retry_timer_locked, glb_policy,
                       grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1176,8 +1329,8 @@
     grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
                     &glb_policy->lb_on_call_retry);
   }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                       "lb_on_server_status_received_locked");
+  GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+                            "lb_on_server_status_received_locked");
 }
 
 static void send_client_load_report_locked(void* arg, grpc_error* error);
@@ -1200,7 +1353,7 @@
   glb_policy->client_load_report_payload = nullptr;
   if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
     glb_policy->client_load_report_timer_callback_pending = false;
-    GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
+    GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
     if (glb_policy->lb_call == nullptr) {
       maybe_restart_lb_call(glb_policy);
     }
@@ -1241,7 +1394,7 @@
   glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
   if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
     glb_policy->client_load_report_timer_callback_pending = false;
-    GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
+    GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
     if (glb_policy->lb_call == nullptr) {
       maybe_restart_lb_call(glb_policy);
     }
@@ -1394,8 +1547,10 @@
   op->flags = 0;
   op->reserved = nullptr;
   op++;
-  /* take a ref to be released in lb_on_sent_initial_request_locked() */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+  /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
+   * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
+                          "lb_on_sent_initial_request_locked");
   call_error = grpc_call_start_batch_and_execute(
       glb_policy->lb_call, ops, (size_t)(op - ops),
       &glb_policy->lb_on_sent_initial_request);
@@ -1411,8 +1566,10 @@
   op->flags = 0;
   op->reserved = nullptr;
   op++;
-  /* take a ref to be released in lb_on_server_status_received_locked() */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
+  /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
+   * count goes to zero) to be unref'd in lb_on_server_status_received_locked */
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
+                          "lb_on_server_status_received_locked");
   call_error = grpc_call_start_batch_and_execute(
       glb_policy->lb_call, ops, (size_t)(op - ops),
       &glb_policy->lb_on_server_status_received);
@@ -1424,8 +1581,9 @@
   op->flags = 0;
   op->reserved = nullptr;
   op++;
-  /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
+  /* take another weak ref to be unref'd/reused in
+   * lb_on_response_received_locked */
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
   call_error = grpc_call_start_batch_and_execute(
       glb_policy->lb_call, ops, (size_t)(op - ops),
       &glb_policy->lb_on_response_received);
@@ -1440,7 +1598,8 @@
   if (glb_policy->client_load_report_payload != nullptr) {
     do_send_client_load_report_locked(glb_policy);
   }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+  GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+                            "lb_on_sent_initial_request_locked");
 }
 
 static void lb_on_response_received_locked(void* arg, grpc_error* error) {
@@ -1472,9 +1631,11 @@
                   "client load reporting interval = %" PRIdPTR " milliseconds",
                   glb_policy, glb_policy->client_stats_report_interval);
         }
-        /* take a ref to be unref'd in send_client_load_report_locked() */
+        /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
+         * strong ref count goes to zero) to be unref'd in
+         * send_client_load_report_locked() */
         glb_policy->client_load_report_timer_callback_pending = true;
-        GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
+        GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
         schedule_next_client_load_report(glb_policy);
       } else if (grpc_lb_glb_trace.enabled()) {
         gpr_log(GPR_INFO,
@@ -1556,21 +1717,21 @@
       op->flags = 0;
       op->reserved = nullptr;
       op++;
-      /* reuse the "lb_on_response_received_locked" ref taken in
+      /* reuse the "lb_on_response_received_locked" weak ref taken in
        * query_for_backends_locked() */
       const grpc_call_error call_error = grpc_call_start_batch_and_execute(
           glb_policy->lb_call, ops, (size_t)(op - ops),
           &glb_policy->lb_on_response_received); /* loop */
       GPR_ASSERT(GRPC_CALL_OK == call_error);
     } else {
-      GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                           "lb_on_response_received_locked_shutdown");
+      GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+                                "lb_on_response_received_locked_shutdown");
     }
   } else { /* empty payload: call cancelled. */
-           /* dispose of the "lb_on_response_received_locked" ref taken in
+           /* dispose of the "lb_on_response_received_locked" weak ref taken in
             * query_for_backends_locked() and reused in every reception loop */
-    GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                         "lb_on_response_received_locked_empty_payload");
+    GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+                              "lb_on_response_received_locked_empty_payload");
   }
 }
 
@@ -1590,7 +1751,7 @@
       rr_handover_locked(glb_policy);
     }
   }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
+  GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer");
 }
 
 static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
@@ -1674,7 +1835,7 @@
         grpc_channel_get_channel_stack(glb_policy->lb_channel));
     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
     glb_policy->watching_lb_channel = true;
-    GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity");
+    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
     grpc_client_channel_watch_connectivity_state(
         client_channel_elem,
         grpc_polling_entity_create_from_pollset_set(
@@ -1730,8 +1891,8 @@
     case GRPC_CHANNEL_SHUTDOWN:
     done:
       glb_policy->watching_lb_channel = false;
-      GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                           "watch_lb_channel_connectivity_cb_shutdown");
+      GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+                                "watch_lb_channel_connectivity_cb_shutdown");
       break;
   }
 }
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 6038527..9ff40aa 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -31,6 +31,15 @@
 
 grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
 
+namespace {
+struct pending_pick {
+  struct pending_pick* next;
+  uint32_t initial_metadata_flags;
+  grpc_connected_subchannel** target;
+  grpc_closure* on_complete;
+};
+}  // namespace
+
 typedef struct {
   /** base policy: must be first */
   grpc_lb_policy base;
@@ -45,7 +54,7 @@
   /** are we shut down? */
   bool shutdown;
   /** list of picks that are waiting on connectivity */
-  grpc_lb_policy_pick_state* pending_picks;
+  pending_pick* pending_picks;
   /** our connectivity state tracker */
   grpc_connectivity_state_tracker state_tracker;
 } pick_first_lb_policy;
@@ -63,27 +72,19 @@
   }
 }
 
-static void pf_shutdown_locked(grpc_lb_policy* pol,
-                               grpc_lb_policy* new_policy) {
+static void pf_shutdown_locked(grpc_lb_policy* pol) {
   pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   if (grpc_lb_pick_first_trace.enabled()) {
     gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
   }
   p->shutdown = true;
-  grpc_lb_policy_pick_state* pick;
-  while ((pick = p->pending_picks) != nullptr) {
-    p->pending_picks = pick->next;
-    if (new_policy != nullptr) {
-      // Hand off to new LB policy.
-      if (grpc_lb_policy_pick_locked(new_policy, pick)) {
-        // Synchronous return, schedule closure.
-        GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
-      }
-    } else {
-      pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
-    }
+  pending_pick* pp;
+  while ((pp = p->pending_picks) != nullptr) {
+    p->pending_picks = pp->next;
+    *pp->target = nullptr;
+    GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
+    gpr_free(pp);
   }
   grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
                               GRPC_ERROR_REF(error), "shutdown");
@@ -103,18 +104,19 @@
 }
 
 static void pf_cancel_pick_locked(grpc_lb_policy* pol,
-                                  grpc_lb_policy_pick_state* pick,
+                                  grpc_connected_subchannel** target,
                                   grpc_error* error) {
   pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
-  grpc_lb_policy_pick_state* pp = p->pending_picks;
+  pending_pick* pp = p->pending_picks;
   p->pending_picks = nullptr;
   while (pp != nullptr) {
-    grpc_lb_policy_pick_state* next = pp->next;
-    if (pp == pick) {
-      pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(pick->on_complete,
+    pending_pick* next = pp->next;
+    if (pp->target == target) {
+      *target = nullptr;
+      GRPC_CLOSURE_SCHED(pp->on_complete,
                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Pick Cancelled", &error, 1));
+      gpr_free(pp);
     } else {
       pp->next = p->pending_picks;
       p->pending_picks = pp;
@@ -129,20 +131,21 @@
                                    uint32_t initial_metadata_flags_eq,
                                    grpc_error* error) {
   pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
-  grpc_lb_policy_pick_state* pick = p->pending_picks;
+  pending_pick* pp = p->pending_picks;
   p->pending_picks = nullptr;
-  while (pick != nullptr) {
-    grpc_lb_policy_pick_state* next = pick->next;
-    if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+  while (pp != nullptr) {
+    pending_pick* next = pp->next;
+    if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
         initial_metadata_flags_eq) {
-      GRPC_CLOSURE_SCHED(pick->on_complete,
+      GRPC_CLOSURE_SCHED(pp->on_complete,
                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Pick Cancelled", &error, 1));
+      gpr_free(pp);
     } else {
-      pick->next = p->pending_picks;
-      p->pending_picks = pick;
+      pp->next = p->pending_picks;
+      p->pending_picks = pp;
     }
-    pick = next;
+    pp = next;
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -172,20 +175,27 @@
 }
 
 static int pf_pick_locked(grpc_lb_policy* pol,
-                          grpc_lb_policy_pick_state* pick) {
+                          const grpc_lb_policy_pick_args* pick_args,
+                          grpc_connected_subchannel** target,
+                          grpc_call_context_element* context, void** user_data,
+                          grpc_closure* on_complete) {
   pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
   // If we have a selected subchannel already, return synchronously.
   if (p->selected != nullptr) {
-    pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
-        p->selected->connected_subchannel, "picked");
+    *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel,
+                                            "picked");
     return 1;
   }
   // No subchannel selected yet, so handle asynchronously.
   if (!p->started_picking) {
     start_picking_locked(p);
   }
-  pick->next = p->pending_picks;
-  p->pending_picks = pick;
+  pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
+  pp->next = p->pending_picks;
+  pp->target = target;
+  pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+  pp->on_complete = on_complete;
+  p->pending_picks = pp;
   return 0;
 }
 
@@ -471,17 +481,18 @@
       // Drop all other subchannels, since we are now connected.
       destroy_unselected_subchannels_locked(p);
       // Update any calls that were waiting for a pick.
-      grpc_lb_policy_pick_state* pick;
-      while ((pick = p->pending_picks)) {
-        p->pending_picks = pick->next;
-        pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
+      pending_pick* pp;
+      while ((pp = p->pending_picks)) {
+        p->pending_picks = pp->next;
+        *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
             p->selected->connected_subchannel, "picked");
         if (grpc_lb_pick_first_trace.enabled()) {
           gpr_log(GPR_INFO,
                   "Servicing pending pick with selected subchannel %p",
                   (void*)p->selected);
         }
-        GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+        GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
+        gpr_free(pp);
       }
       // Renew notification.
       grpc_lb_subchannel_data_start_connectivity_watch(sd);
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 92c7d5b..a964af0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -41,6 +41,31 @@
 
 grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
 
+namespace {
+/** List of entities waiting for a pick.
+ *
+ * Once a pick is available, \a target is updated and \a on_complete called. */
+struct pending_pick {
+  pending_pick* next;
+
+  /* output argument where to store the pick()ed user_data. It'll be NULL if no
+   * such data is present or there's an error (the definite test for errors is
+   * \a target being NULL). */
+  void** user_data;
+
+  /* bitmask passed to pick() and used for selective cancelling. See
+   * grpc_lb_policy_cancel_picks() */
+  uint32_t initial_metadata_flags;
+
+  /* output argument where to store the pick()ed connected subchannel, or NULL
+   * upon error. */
+  grpc_connected_subchannel** target;
+
+  /* to be invoked once the pick() has completed (regardless of success) */
+  grpc_closure* on_complete;
+};
+}  // namespace
+
 typedef struct round_robin_lb_policy {
   /** base policy: must be first */
   grpc_lb_policy base;
@@ -52,7 +77,7 @@
   /** are we shutting down? */
   bool shutdown;
   /** List of picks that are waiting on connectivity */
-  grpc_lb_policy_pick_state* pending_picks;
+  pending_pick* pending_picks;
 
   /** our connectivity state tracker */
   grpc_connectivity_state_tracker state_tracker;
@@ -144,27 +169,19 @@
   gpr_free(p);
 }
 
-static void rr_shutdown_locked(grpc_lb_policy* pol,
-                               grpc_lb_policy* new_policy) {
+static void rr_shutdown_locked(grpc_lb_policy* pol) {
   round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   if (grpc_lb_round_robin_trace.enabled()) {
     gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
   }
   p->shutdown = true;
-  grpc_lb_policy_pick_state* pick;
-  while ((pick = p->pending_picks) != nullptr) {
-    p->pending_picks = pick->next;
-    if (new_policy != nullptr) {
-      // Hand off to new LB policy.
-      if (grpc_lb_policy_pick_locked(new_policy, pick)) {
-        // Synchronous return; schedule callback.
-        GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
-      }
-    } else {
-      pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
-    }
+  pending_pick* pp;
+  while ((pp = p->pending_picks) != nullptr) {
+    p->pending_picks = pp->next;
+    *pp->target = nullptr;
+    GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
+    gpr_free(pp);
   }
   grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
                               GRPC_ERROR_REF(error), "rr_shutdown");
@@ -184,18 +201,19 @@
 }
 
 static void rr_cancel_pick_locked(grpc_lb_policy* pol,
-                                  grpc_lb_policy_pick_state* pick,
+                                  grpc_connected_subchannel** target,
                                   grpc_error* error) {
   round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
-  grpc_lb_policy_pick_state* pp = p->pending_picks;
+  pending_pick* pp = p->pending_picks;
   p->pending_picks = nullptr;
   while (pp != nullptr) {
-    grpc_lb_policy_pick_state* next = pp->next;
-    if (pp == pick) {
-      pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(pick->on_complete,
+    pending_pick* next = pp->next;
+    if (pp->target == target) {
+      *target = nullptr;
+      GRPC_CLOSURE_SCHED(pp->on_complete,
                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Pick cancelled", &error, 1));
+      gpr_free(pp);
     } else {
       pp->next = p->pending_picks;
       p->pending_picks = pp;
@@ -210,21 +228,22 @@
                                    uint32_t initial_metadata_flags_eq,
                                    grpc_error* error) {
   round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
-  grpc_lb_policy_pick_state* pick = p->pending_picks;
+  pending_pick* pp = p->pending_picks;
   p->pending_picks = nullptr;
-  while (pick != nullptr) {
-    grpc_lb_policy_pick_state* next = pick->next;
-    if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+  while (pp != nullptr) {
+    pending_pick* next = pp->next;
+    if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
         initial_metadata_flags_eq) {
-      pick->connected_subchannel = nullptr;
-      GRPC_CLOSURE_SCHED(pick->on_complete,
+      *pp->target = nullptr;
+      GRPC_CLOSURE_SCHED(pp->on_complete,
                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                              "Pick cancelled", &error, 1));
+      gpr_free(pp);
     } else {
-      pick->next = p->pending_picks;
-      p->pending_picks = pick;
+      pp->next = p->pending_picks;
+      p->pending_picks = pp;
     }
-    pick = next;
+    pp = next;
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -249,10 +268,13 @@
 }
 
 static int rr_pick_locked(grpc_lb_policy* pol,
-                          grpc_lb_policy_pick_state* pick) {
+                          const grpc_lb_policy_pick_args* pick_args,
+                          grpc_connected_subchannel** target,
+                          grpc_call_context_element* context, void** user_data,
+                          grpc_closure* on_complete) {
   round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
   if (grpc_lb_round_robin_trace.enabled()) {
-    gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol,
+    gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol,
             p->shutdown);
   }
   GPR_ASSERT(!p->shutdown);
@@ -262,18 +284,18 @@
       /* readily available, report right away */
       grpc_lb_subchannel_data* sd =
           &p->subchannel_list->subchannels[next_ready_index];
-      pick->connected_subchannel =
+      *target =
           GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
-      if (pick->user_data != nullptr) {
-        *pick->user_data = sd->user_data;
+      if (user_data != nullptr) {
+        *user_data = sd->user_data;
       }
       if (grpc_lb_round_robin_trace.enabled()) {
         gpr_log(
             GPR_DEBUG,
             "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
-            "index %" PRIuPTR ")",
-            p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list,
-            next_ready_index);
+            "index %lu)",
+            (void*)p, (void*)sd->subchannel, (void*)*target,
+            (void*)sd->subchannel_list, (unsigned long)next_ready_index);
       }
       /* only advance the last picked pointer if the selection was used */
       update_last_ready_subchannel_index_locked(p, next_ready_index);
@@ -284,8 +306,13 @@
   if (!p->started_picking) {
     start_picking_locked(p);
   }
-  pick->next = p->pending_picks;
-  p->pending_picks = pick;
+  pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
+  pp->next = p->pending_picks;
+  pp->target = target;
+  pp->on_complete = on_complete;
+  pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+  pp->user_data = user_data;
+  p->pending_picks = pp;
   return 0;
 }
 
@@ -468,13 +495,13 @@
         // picks, update the last picked pointer
         update_last_ready_subchannel_index_locked(p, next_ready_index);
       }
-      grpc_lb_policy_pick_state* pick;
-      while ((pick = p->pending_picks)) {
-        p->pending_picks = pick->next;
-        pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
+      pending_pick* pp;
+      while ((pp = p->pending_picks)) {
+        p->pending_picks = pp->next;
+        *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
             selected->connected_subchannel, "rr_picked");
-        if (pick->user_data != nullptr) {
-          *pick->user_data = selected->user_data;
+        if (pp->user_data != nullptr) {
+          *pp->user_data = selected->user_data;
         }
         if (grpc_lb_round_robin_trace.enabled()) {
           gpr_log(GPR_DEBUG,
@@ -483,7 +510,8 @@
                   (void*)p, (void*)selected->subchannel,
                   (void*)p->subchannel_list, (unsigned long)next_ready_index);
         }
-        GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+        GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
+        gpr_free(pp);
       }
     }
     // Renew notification.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index 5ce1298..a3b4c8e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -213,13 +213,13 @@
 
 void grpc_lb_subchannel_list_ref_for_connectivity_watch(
     grpc_lb_subchannel_list* subchannel_list, const char* reason) {
-  GRPC_LB_POLICY_REF(subchannel_list->policy, reason);
+  GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason);
   grpc_lb_subchannel_list_ref(subchannel_list, reason);
 }
 
 void grpc_lb_subchannel_list_unref_for_connectivity_watch(
     grpc_lb_subchannel_list* subchannel_list, const char* reason) {
-  GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason);
+  GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason);
   grpc_lb_subchannel_list_unref(subchannel_list, reason);
 }