Merge pull request #9664 from ctiller/c3+r+l

Make load balancers use combiner locks
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 6cbc333..21ba430 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -183,7 +183,7 @@
   grpc_pollset_set *interested_parties;
 
   /* the following properties are guarded by a mutex since API's require them
-     to be instantaniously available */
+     to be instantaneously available */
   gpr_mu info_mu;
   char *info_lb_policy_name;
   /** service config in JSON form */
@@ -200,9 +200,9 @@
   grpc_lb_policy *lb_policy;
 } lb_policy_connectivity_watcher;
 
-static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
-                            grpc_lb_policy *lb_policy,
-                            grpc_connectivity_state current_state);
+static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
+                                   grpc_lb_policy *lb_policy,
+                                   grpc_connectivity_state current_state);
 
 static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
                                                   channel_data *chand,
@@ -213,7 +213,7 @@
        state == GRPC_CHANNEL_SHUTDOWN) &&
       chand->lb_policy != NULL) {
     /* cancel picks with wait_for_ready=false */
-    grpc_lb_policy_cancel_picks(
+    grpc_lb_policy_cancel_picks_locked(
         exec_ctx, chand->lb_policy,
         /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
         /* check= */ 0, GRPC_ERROR_REF(error));
@@ -237,7 +237,7 @@
     set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
                                           GRPC_ERROR_REF(error), "lb_changed");
     if (w->state != GRPC_CHANNEL_SHUTDOWN) {
-      watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
+      watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
     }
   }
 
@@ -245,9 +245,9 @@
   gpr_free(w);
 }
 
-static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
-                            grpc_lb_policy *lb_policy,
-                            grpc_connectivity_state current_state) {
+static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
+                                   grpc_lb_policy *lb_policy,
+                                   grpc_connectivity_state current_state) {
   lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
   GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
 
@@ -256,8 +256,8 @@
                     grpc_combiner_scheduler(chand->combiner, false));
   w->state = current_state;
   w->lb_policy = lb_policy;
-  grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
-                                        &w->on_changed);
+  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
+                                               &w->on_changed);
 }
 
 static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
@@ -313,13 +313,14 @@
     grpc_lb_policy_args lb_policy_args;
     lb_policy_args.args = chand->resolver_result;
     lb_policy_args.client_channel_factory = chand->client_channel_factory;
+    lb_policy_args.combiner = chand->combiner;
     lb_policy =
         grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
     if (lb_policy != NULL) {
       GRPC_LB_POLICY_REF(lb_policy, "config_change");
       GRPC_ERROR_UNREF(state_error);
-      state =
-          grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
+      state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
+                                                       &state_error);
     }
     // Find service config.
     channel_arg =
@@ -383,7 +384,7 @@
     set_channel_connectivity_state_locked(
         exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
     if (lb_policy != NULL) {
-      watch_lb_policy(exec_ctx, chand, lb_policy, state);
+      watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
     }
     GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
     grpc_resolver_next_locked(exec_ctx, chand->resolver,
@@ -404,7 +405,7 @@
   }
 
   if (exit_idle) {
-    grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
+    grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
   }
 
@@ -441,7 +442,7 @@
       grpc_closure_sched(exec_ctx, op->send_ping,
                          GRPC_ERROR_CREATE("Ping with no load balancing"));
     } else {
-      grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
+      grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
       op->bind_pollset = NULL;
     }
     op->send_ping = NULL;
@@ -808,8 +809,9 @@
 
   if (initial_metadata == NULL) {
     if (chand->lb_policy != NULL) {
-      grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
-                                 connected_subchannel, GRPC_ERROR_REF(error));
+      grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
+                                        connected_subchannel,
+                                        GRPC_ERROR_REF(error));
     }
     for (closure = chand->waiting_for_config_closures.head; closure != NULL;
          closure = closure->next_data.next) {
@@ -848,7 +850,7 @@
     const grpc_lb_policy_pick_args inputs = {
         initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
         gpr_inf_future(GPR_CLOCK_MONOTONIC)};
-    const bool result = grpc_lb_policy_pick(
+    const bool result = grpc_lb_policy_pick_locked(
         exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
     GPR_TIMER_END("pick_subchannel", 0);
@@ -1216,7 +1218,7 @@
                                   grpc_error *error_ignored) {
   channel_data *chand = arg;
   if (chand->lb_policy != NULL) {
-    grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+    grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
   } else {
     chand->exit_idle_when_lb_policy_arrives = true;
     if (!chand->started_resolving && chand->resolver != NULL) {
diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c
index 90401b5..aba51ad 100644
--- a/src/core/ext/client_channel/lb_policy.c
+++ b/src/core/ext/client_channel/lb_policy.c
@@ -32,14 +32,17 @@
  */
 
 #include "src/core/ext/client_channel/lb_policy.h"
+#include "src/core/lib/iomgr/combiner.h"
 
 #define WEAK_REF_BITS 16
 
 void grpc_lb_policy_init(grpc_lb_policy *policy,
-                         const grpc_lb_policy_vtable *vtable) {
+                         const grpc_lb_policy_vtable *vtable,
+                         grpc_combiner *combiner) {
   policy->vtable = vtable;
   gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
   policy->interested_parties = grpc_pollset_set_create();
+  policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
 }
 
 #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -71,6 +74,13 @@
   ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
 }
 
+static void shutdown_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                            grpc_error *error) {
+  grpc_lb_policy *policy = arg;
+  policy->vtable->shutdown_locked(exec_ctx, policy);
+  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, policy, "strong-unref");
+}
+
 void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
                           grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
   gpr_atm old_val =
@@ -79,10 +89,15 @@
   gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
   gpr_atm check = 1 << WEAK_REF_BITS;
   if ((old_val & mask) == check) {
-    policy->vtable->shutdown(exec_ctx, policy);
+    grpc_closure_sched(
+        exec_ctx,
+        grpc_closure_create(shutdown_locked, policy,
+                            grpc_combiner_scheduler(policy->combiner, false)),
+        GRPC_ERROR_NONE);
+  } else {
+    grpc_lb_policy_weak_unref(exec_ctx,
+                              policy REF_FUNC_PASS_ARGS("strong-unref"));
   }
-  grpc_lb_policy_weak_unref(exec_ctx,
-                            policy REF_FUNC_PASS_ARGS("strong-unref"));
 }
 
 void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
@@ -95,52 +110,58 @@
       ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
   if (old_val == 1) {
     grpc_pollset_set_destroy(exec_ctx, policy->interested_parties);
+    grpc_combiner *combiner = policy->combiner;
     policy->vtable->destroy(exec_ctx, policy);
+    GRPC_COMBINER_UNREF(exec_ctx, combiner, "lb_policy");
   }
 }
 
-int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                        const grpc_lb_policy_pick_args *pick_args,
-                        grpc_connected_subchannel **target, void **user_data,
-                        grpc_closure *on_complete) {
-  return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
-                              on_complete);
+int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+                               const grpc_lb_policy_pick_args *pick_args,
+                               grpc_connected_subchannel **target,
+                               void **user_data, grpc_closure *on_complete) {
+  return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target,
+                                     user_data, on_complete);
 }
 
-void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                                grpc_connected_subchannel **target,
-                                grpc_error *error) {
-  policy->vtable->cancel_pick(exec_ctx, policy, target, error);
+void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,
+                                       grpc_lb_policy *policy,
+                                       grpc_connected_subchannel **target,
+                                       grpc_error *error) {
+  policy->vtable->cancel_pick_locked(exec_ctx, policy, target, error);
 }
 
-void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
-                                 grpc_lb_policy *policy,
-                                 uint32_t initial_metadata_flags_mask,
-                                 uint32_t initial_metadata_flags_eq,
-                                 grpc_error *error) {
-  policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
-                               initial_metadata_flags_eq, error);
+void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
+                                        grpc_lb_policy *policy,
+                                        uint32_t initial_metadata_flags_mask,
+                                        uint32_t initial_metadata_flags_eq,
+                                        grpc_error *error) {
+  policy->vtable->cancel_picks_locked(exec_ctx, policy,
+                                      initial_metadata_flags_mask,
+                                      initial_metadata_flags_eq, error);
 }
 
-void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
-  policy->vtable->exit_idle(exec_ctx, policy);
+void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx,
+                                     grpc_lb_policy *policy) {
+  policy->vtable->exit_idle_locked(exec_ctx, policy);
 }
 
-void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                             grpc_closure *closure) {
-  policy->vtable->ping_one(exec_ctx, policy, closure);
+void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx,
+                                    grpc_lb_policy *policy,
+                                    grpc_closure *closure) {
+  policy->vtable->ping_one_locked(exec_ctx, policy, closure);
 }
 
-void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
-                                           grpc_lb_policy *policy,
-                                           grpc_connectivity_state *state,
-                                           grpc_closure *closure) {
-  policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure);
+void grpc_lb_policy_notify_on_state_change_locked(
+    grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+    grpc_connectivity_state *state, grpc_closure *closure) {
+  policy->vtable->notify_on_state_change_locked(exec_ctx, policy, state,
+                                                closure);
 }
 
-grpc_connectivity_state grpc_lb_policy_check_connectivity(
+grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
     grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
     grpc_error **connectivity_error) {
-  return policy->vtable->check_connectivity(exec_ctx, policy,
-                                            connectivity_error);
+  return policy->vtable->check_connectivity_locked(exec_ctx, policy,
+                                                   connectivity_error);
 }
diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h
index 120c641..3405709 100644
--- a/src/core/ext/client_channel/lb_policy.h
+++ b/src/core/ext/client_channel/lb_policy.h
@@ -51,6 +51,8 @@
   gpr_atm ref_pair;
   /* owned pointer to interested parties in load balancing decisions */
   grpc_pollset_set *interested_parties;
+  /* combiner under which lb_policy actions take place */
+  grpc_combiner *combiner;
 };
 
 /** Extra arguments for an LB pick */
@@ -69,42 +71,44 @@
 
 struct grpc_lb_policy_vtable {
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+  void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
 
   /** \see grpc_lb_policy_pick */
-  int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-              const grpc_lb_policy_pick_args *pick_args,
-              grpc_connected_subchannel **target, void **user_data,
-              grpc_closure *on_complete);
+  int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+                     const grpc_lb_policy_pick_args *pick_args,
+                     grpc_connected_subchannel **target, void **user_data,
+                     grpc_closure *on_complete);
 
   /** \see grpc_lb_policy_cancel_pick */
-  void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                      grpc_connected_subchannel **target, grpc_error *error);
+  void (*cancel_pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+                             grpc_connected_subchannel **target,
+                             grpc_error *error);
 
   /** \see grpc_lb_policy_cancel_picks */
-  void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                       uint32_t initial_metadata_flags_mask,
-                       uint32_t initial_metadata_flags_eq, grpc_error *error);
+  void (*cancel_picks_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+                              uint32_t initial_metadata_flags_mask,
+                              uint32_t initial_metadata_flags_eq,
+                              grpc_error *error);
 
   /** \see grpc_lb_policy_ping_one */
-  void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                   grpc_closure *closure);
+  void (*ping_one_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+                          grpc_closure *closure);
 
   /** Try to enter a READY connectivity state */
-  void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+  void (*exit_idle_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
 
   /** check the current connectivity of the lb_policy */
-  grpc_connectivity_state (*check_connectivity)(
+  grpc_connectivity_state (*check_connectivity_locked)(
       grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
       grpc_error **connectivity_error);
 
   /** call notify when the connectivity state of a channel changes from *state.
       Updates *state with the new state of the policy. Calling with a NULL \a
       state cancels the subscription.  */
-  void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
-                                 grpc_lb_policy *policy,
-                                 grpc_connectivity_state *state,
-                                 grpc_closure *closure);
+  void (*notify_on_state_change_locked)(grpc_exec_ctx *exec_ctx,
+                                        grpc_lb_policy *policy,
+                                        grpc_connectivity_state *state,
+                                        grpc_closure *closure);
 };
 
 /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
@@ -144,7 +148,8 @@
 
 /** called by concrete implementations to initialize the base struct */
 void grpc_lb_policy_init(grpc_lb_policy *policy,
-                         const grpc_lb_policy_vtable *vtable);
+                         const grpc_lb_policy_vtable *vtable,
+                         grpc_combiner *combiner);
 
 /** Finds an appropriate subchannel for a call, based on \a pick_args.
 
@@ -159,43 +164,45 @@
 
     Any IO should be done under the \a interested_parties \a grpc_pollset_set
     in the \a grpc_lb_policy struct. */
-int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                        const grpc_lb_policy_pick_args *pick_args,
-                        grpc_connected_subchannel **target, void **user_data,
-                        grpc_closure *on_complete);
+int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+                               const grpc_lb_policy_pick_args *pick_args,
+                               grpc_connected_subchannel **target,
+                               void **user_data, grpc_closure *on_complete);
 
 /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
     against one of the connected subchannels managed by \a policy. */
-void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                             grpc_closure *closure);
+void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx,
+                                    grpc_lb_policy *policy,
+                                    grpc_closure *closure);
 
 /** Cancel picks for \a target.
     The \a on_complete callback of the pending picks will be invoked with \a
     *target set to NULL. */
-void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                                grpc_connected_subchannel **target,
-                                grpc_error *error);
+void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,
+                                       grpc_lb_policy *policy,
+                                       grpc_connected_subchannel **target,
+                                       grpc_error *error);
 
 /** Cancel all pending picks for which their \a initial_metadata_flags (as given
     in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
     when AND'd with \a initial_metadata_flags_mask */
-void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
-                                 grpc_lb_policy *policy,
-                                 uint32_t initial_metadata_flags_mask,
-                                 uint32_t initial_metadata_flags_eq,
-                                 grpc_error *error);
+void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
+                                        grpc_lb_policy *policy,
+                                        uint32_t initial_metadata_flags_mask,
+                                        uint32_t initial_metadata_flags_eq,
+                                        grpc_error *error);
 
 /** Try to enter a READY connectivity state */
-void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx,
+                                     grpc_lb_policy *policy);
 
 /* Call notify when the connectivity state of a channel changes from \a *state.
  * Updates \a *state with the new state of the policy */
-void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
-                                           grpc_lb_policy *policy,
-                                           grpc_connectivity_state *state,
-                                           grpc_closure *closure);
+void grpc_lb_policy_notify_on_state_change_locked(
+    grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+    grpc_connectivity_state *state, grpc_closure *closure);
 
-grpc_connectivity_state grpc_lb_policy_check_connectivity(
+grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
     grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
     grpc_error **connectivity_error);
 
diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h
index 9b8b03f..27c12c0 100644
--- a/src/core/ext/client_channel/lb_policy_factory.h
+++ b/src/core/ext/client_channel/lb_policy_factory.h
@@ -107,6 +107,7 @@
 typedef struct grpc_lb_policy_args {
   grpc_client_channel_factory *client_channel_factory;
   grpc_channel_args *args;
+  grpc_combiner *combiner;
 } grpc_lb_policy_args;
 
 struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 09c68a9..f2da148 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -438,7 +438,7 @@
   gpr_mu_unlock(&w->subchannel->mu);
   GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
   gpr_free(w);
-  follow_up->cb(exec_ctx, follow_up->cb_arg, error);
+  grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
 }
 
 static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 8a2af48..b5210cb 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -115,6 +115,7 @@
 #include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
 #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
 #include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/iomgr/timer.h"
@@ -285,9 +286,6 @@
   /** base policy: must be first */
   grpc_lb_policy base;
 
-  /** mutex protecting remaining members */
-  gpr_mu mu;
-
   /** who the client is trying to communicate with */
   const char *server_name;
   grpc_client_channel_factory *cc_factory;
@@ -557,9 +555,9 @@
     const grpc_lb_policy_pick_args *pick_args,
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
   GPR_ASSERT(rr_policy != NULL);
-  const bool pick_done =
-      grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
-                          (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+  const bool pick_done = grpc_lb_policy_pick_locked(
+      exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token,
+      &wc_arg->wrapper_closure);
   if (pick_done) {
     /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
     if (grpc_lb_glb_trace) {
@@ -590,6 +588,7 @@
   grpc_lb_policy_args args;
   memset(&args, 0, sizeof(args));
   args.client_channel_factory = glb_policy->cc_factory;
+  args.combiner = glb_policy->base.combiner;
   grpc_lb_addresses *addresses =
       process_serverlist_locked(exec_ctx, serverlist);
 
@@ -608,8 +607,8 @@
   return rr;
 }
 
-static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
-                                        grpc_error *error);
+static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
+                                               void *arg, grpc_error *error);
 /* glb_policy->rr_policy may be NULL (initial handover) */
 static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
@@ -633,8 +632,8 @@
 
   grpc_error *new_rr_state_error = NULL;
   const grpc_connectivity_state new_rr_state =
-      grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy,
-                                        &new_rr_state_error);
+      grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy,
+                                               &new_rr_state_error);
   /* Connectivity state is a function of the new RR policy just created */
   const bool replace_old_rr = update_lb_connectivity_status_locked(
       exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
@@ -677,17 +676,18 @@
   rr_connectivity_data *rr_connectivity =
       gpr_malloc(sizeof(rr_connectivity_data));
   memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
-  grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
-                    rr_connectivity, grpc_schedule_on_exec_ctx);
+  grpc_closure_init(&rr_connectivity->on_change,
+                    glb_rr_connectivity_changed_locked, rr_connectivity,
+                    grpc_combiner_scheduler(glb_policy->base.combiner, false));
   rr_connectivity->glb_policy = glb_policy;
   rr_connectivity->state = new_rr_state;
 
   /* Subscribe to changes to the connectivity of the new RR */
   GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
-  grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
-                                        &rr_connectivity->state,
-                                        &rr_connectivity->on_change);
-  grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
+  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+                                               &rr_connectivity->state,
+                                               &rr_connectivity->on_change);
+  grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
 
   /* Update picks and pings in wait */
   pending_pick *pp;
@@ -713,17 +713,16 @@
       gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
     }
-    grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
-                            &pping->wrapped_notify_arg.wrapper_closure);
+    grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
+                                   &pping->wrapped_notify_arg.wrapper_closure);
   }
 }
 
-static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
-                                        grpc_error *error) {
+static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
+                                               void *arg, grpc_error *error) {
   rr_connectivity_data *rr_connectivity = arg;
   glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
 
-  gpr_mu_lock(&glb_policy->mu);
   const bool shutting_down = glb_policy->shutting_down;
   bool unref_needed = false;
   GRPC_ERROR_REF(error);
@@ -740,11 +739,10 @@
     update_lb_connectivity_status_locked(exec_ctx, glb_policy,
                                          rr_connectivity->state, error);
     /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
-    grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
-                                          &rr_connectivity->state,
-                                          &rr_connectivity->on_change);
+    grpc_lb_policy_notify_on_state_change_locked(
+        exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
+        &rr_connectivity->on_change);
   }
-  gpr_mu_unlock(&glb_policy->mu);
   if (unref_needed) {
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                               "rr_connectivity_cb");
@@ -899,8 +897,7 @@
     gpr_free(glb_policy);
     return NULL;
   }
-  grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
-  gpr_mu_init(&glb_policy->mu);
+  grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
   grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
                                "grpclb");
   return &glb_policy->base;
@@ -918,13 +915,11 @@
   if (glb_policy->serverlist != NULL) {
     grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
   }
-  gpr_mu_destroy(&glb_policy->mu);
   gpr_free(glb_policy);
 }
 
-static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   glb_policy->shutting_down = true;
 
   pending_pick *pp = glb_policy->pending_picks;
@@ -941,7 +936,6 @@
    * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
    * the cancel, needs to acquire that same lock */
   grpc_call *lb_call = glb_policy->lb_call;
-  gpr_mu_unlock(&glb_policy->mu);
 
   /* glb_policy->lb_call and this local lb_call must be consistent at this point
    * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
@@ -967,11 +961,10 @@
   }
 }
 
-static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                            grpc_connected_subchannel **target,
-                            grpc_error *error) {
+static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                                   grpc_connected_subchannel **target,
+                                   grpc_error *error) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   pending_pick *pp = glb_policy->pending_picks;
   glb_policy->pending_picks = NULL;
   while (pp != NULL) {
@@ -987,16 +980,15 @@
     }
     pp = next;
   }
-  gpr_mu_unlock(&glb_policy->mu);
   GRPC_ERROR_UNREF(error);
 }
 
-static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                             uint32_t initial_metadata_flags_mask,
-                             uint32_t initial_metadata_flags_eq,
-                             grpc_error *error) {
+static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
+                                    grpc_lb_policy *pol,
+                                    uint32_t initial_metadata_flags_mask,
+                                    uint32_t initial_metadata_flags_eq,
+                                    grpc_error *error) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   pending_pick *pp = glb_policy->pending_picks;
   glb_policy->pending_picks = NULL;
   while (pp != NULL) {
@@ -1012,7 +1004,6 @@
     }
     pp = next;
   }
-  gpr_mu_unlock(&glb_policy->mu);
   GRPC_ERROR_UNREF(error);
 }
 
@@ -1025,19 +1016,17 @@
   query_for_backends_locked(exec_ctx, glb_policy);
 }
 
-static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   if (!glb_policy->started_picking) {
     start_picking_locked(exec_ctx, glb_policy);
   }
-  gpr_mu_unlock(&glb_policy->mu);
 }
 
-static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                    const grpc_lb_policy_pick_args *pick_args,
-                    grpc_connected_subchannel **target, void **user_data,
-                    grpc_closure *on_complete) {
+static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                           const grpc_lb_policy_pick_args *pick_args,
+                           grpc_connected_subchannel **target, void **user_data,
+                           grpc_closure *on_complete) {
   if (pick_args->lb_token_mdelem_storage == NULL) {
     *target = NULL;
     grpc_closure_sched(
@@ -1048,7 +1037,6 @@
   }
 
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   glb_policy->deadline = pick_args->deadline;
   bool pick_done;
 
@@ -1087,53 +1075,43 @@
     }
     pick_done = false;
   }
-  gpr_mu_unlock(&glb_policy->mu);
   return pick_done;
 }
 
-static grpc_connectivity_state glb_check_connectivity(
+static grpc_connectivity_state glb_check_connectivity_locked(
     grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     grpc_error **connectivity_error) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  grpc_connectivity_state st;
-  gpr_mu_lock(&glb_policy->mu);
-  st = grpc_connectivity_state_get(&glb_policy->state_tracker,
-                                   connectivity_error);
-  gpr_mu_unlock(&glb_policy->mu);
-  return st;
+  return grpc_connectivity_state_get(&glb_policy->state_tracker,
+                                     connectivity_error);
 }
 
-static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                         grpc_closure *closure) {
+static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                                grpc_closure *closure) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   if (glb_policy->rr_policy) {
-    grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
+    grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
   } else {
     add_pending_ping(&glb_policy->pending_pings, closure);
     if (!glb_policy->started_picking) {
       start_picking_locked(exec_ctx, glb_policy);
     }
   }
-  gpr_mu_unlock(&glb_policy->mu);
 }
 
-static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
-                                       grpc_lb_policy *pol,
-                                       grpc_connectivity_state *current,
-                                       grpc_closure *notify) {
+static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+                                              grpc_lb_policy *pol,
+                                              grpc_connectivity_state *current,
+                                              grpc_closure *notify) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-  gpr_mu_lock(&glb_policy->mu);
   grpc_connectivity_state_notify_on_state_change(
       exec_ctx, &glb_policy->state_tracker, current, notify);
-
-  gpr_mu_unlock(&glb_policy->mu);
 }
 
-static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
-                                         grpc_error *error);
-static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
-                                    grpc_error *error);
+static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
+                                                void *arg, grpc_error *error);
+static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error);
 static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
                                 glb_lb_policy *glb_policy) {
   GPR_ASSERT(glb_policy->server_name != NULL);
@@ -1162,11 +1140,11 @@
   grpc_grpclb_request_destroy(request);
 
   grpc_closure_init(&glb_policy->lb_on_server_status_received,
-                    lb_on_server_status_received, glb_policy,
-                    grpc_schedule_on_exec_ctx);
+                    lb_on_server_status_received_locked, glb_policy,
+                    grpc_combiner_scheduler(glb_policy->base.combiner, false));
   grpc_closure_init(&glb_policy->lb_on_response_received,
-                    lb_on_response_received, glb_policy,
-                    grpc_schedule_on_exec_ctx);
+                    lb_on_response_received_locked, glb_policy,
+                    grpc_combiner_scheduler(glb_policy->base.combiner, false));
 
   gpr_backoff_init(&glb_policy->lb_call_backoff_state,
                    GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@@ -1261,14 +1239,13 @@
   GPR_ASSERT(GRPC_CALL_OK == call_error);
 }
 
-static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
-                                    grpc_error *error) {
+static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error) {
   glb_lb_policy *glb_policy = arg;
 
   grpc_op ops[2];
   memset(ops, 0, sizeof(ops));
   grpc_op *op = ops;
-  gpr_mu_lock(&glb_policy->mu);
   if (glb_policy->lb_response_payload != NULL) {
     gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
     /* Received data from the LB server. Look inside
@@ -1342,20 +1319,17 @@
           &glb_policy->lb_on_response_received); /* loop */
       GPR_ASSERT(GRPC_CALL_OK == call_error);
     }
-    gpr_mu_unlock(&glb_policy->mu);
   } else { /* empty payload: call cancelled. */
            /* dispose of the "lb_on_response_received" weak ref taken in
             * query_for_backends_locked() and reused in every reception loop */
-    gpr_mu_unlock(&glb_policy->mu);
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                               "lb_on_response_received_empty_payload");
   }
 }
 
-static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
-                                   grpc_error *error) {
+static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                          grpc_error *error) {
   glb_lb_policy *glb_policy = arg;
-  gpr_mu_lock(&glb_policy->mu);
 
   if (!glb_policy->shutting_down) {
     if (grpc_lb_glb_trace) {
@@ -1365,15 +1339,13 @@
     GPR_ASSERT(glb_policy->lb_call == NULL);
     query_for_backends_locked(exec_ctx, glb_policy);
   }
-  gpr_mu_unlock(&glb_policy->mu);
   GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                             "grpclb_on_retry_timer");
 }
 
-static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
-                                         grpc_error *error) {
+static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
+                                                void *arg, grpc_error *error) {
   glb_lb_policy *glb_policy = arg;
-  gpr_mu_lock(&glb_policy->mu);
 
   GPR_ASSERT(glb_policy->lb_call != NULL);
 
@@ -1408,21 +1380,27 @@
       }
     }
     GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
-    grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer,
-                      glb_policy, grpc_schedule_on_exec_ctx);
+    grpc_closure_init(
+        &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
+        glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
     grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
                     &glb_policy->lb_on_call_retry, now);
   }
-  gpr_mu_unlock(&glb_policy->mu);
   GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                             "lb_on_server_status_received");
 }
 
 /* Code wiring the policy with the rest of the core */
 static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
-    glb_destroy,     glb_shutdown,           glb_pick,
-    glb_cancel_pick, glb_cancel_picks,       glb_ping_one,
-    glb_exit_idle,   glb_check_connectivity, glb_notify_on_state_change};
+    glb_destroy,
+    glb_shutdown_locked,
+    glb_pick_locked,
+    glb_cancel_pick_locked,
+    glb_cancel_picks_locked,
+    glb_ping_one_locked,
+    glb_exit_idle_locked,
+    glb_check_connectivity_locked,
+    glb_notify_on_state_change_locked};
 
 static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
 
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 1b96518..501cb6d 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -38,6 +38,7 @@
 #include "src/core/ext/client_channel/lb_policy_registry.h"
 #include "src/core/ext/client_channel/subchannel.h"
 #include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/transport/connectivity_state.h"
 
@@ -57,11 +58,11 @@
 
   grpc_closure connectivity_changed;
 
-  /** the selected channel (a grpc_connected_subchannel) */
-  gpr_atm selected;
+  /** remaining members are protected by the combiner */
 
-  /** mutex protecting remaining members */
-  gpr_mu mu;
+  /** the selected channel */
+  grpc_connected_subchannel *selected;
+
   /** have we started picking? */
   int started_picking;
   /** are we shut down? */
@@ -77,32 +78,24 @@
   grpc_connectivity_state_tracker state_tracker;
 } pick_first_lb_policy;
 
-#define GET_SELECTED(p) \
-  ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
-
 static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  grpc_connected_subchannel *selected = GET_SELECTED(p);
   size_t i;
   GPR_ASSERT(p->pending_picks == NULL);
   for (i = 0; i < p->num_subchannels; i++) {
     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
   }
-  if (selected != NULL) {
-    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first");
+  if (p->selected != NULL) {
+    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
   }
   grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
   gpr_free(p->subchannels);
-  gpr_mu_destroy(&p->mu);
   gpr_free(p);
 }
 
-static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
-  grpc_connected_subchannel *selected;
-  gpr_mu_lock(&p->mu);
-  selected = GET_SELECTED(p);
   p->shutdown = 1;
   pp = p->pending_picks;
   p->pending_picks = NULL;
@@ -110,15 +103,14 @@
       exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
       GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
   /* cancel subscription */
-  if (selected != NULL) {
+  if (p->selected != NULL) {
     grpc_connected_subchannel_notify_on_state_change(
-        exec_ctx, selected, NULL, NULL, &p->connectivity_changed);
+        exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
   } else if (p->num_subchannels > 0) {
     grpc_subchannel_notify_on_state_change(
         exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
         &p->connectivity_changed);
   }
-  gpr_mu_unlock(&p->mu);
   while (pp != NULL) {
     pending_pick *next = pp->next;
     *pp->target = NULL;
@@ -128,12 +120,11 @@
   }
 }
 
-static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                           grpc_connected_subchannel **target,
-                           grpc_error *error) {
+static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                                  grpc_connected_subchannel **target,
+                                  grpc_error *error) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
-  gpr_mu_lock(&p->mu);
   pp = p->pending_picks;
   p->pending_picks = NULL;
   while (pp != NULL) {
@@ -150,17 +141,15 @@
     }
     pp = next;
   }
-  gpr_mu_unlock(&p->mu);
   GRPC_ERROR_UNREF(error);
 }
 
-static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                            uint32_t initial_metadata_flags_mask,
-                            uint32_t initial_metadata_flags_eq,
-                            grpc_error *error) {
+static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                                   uint32_t initial_metadata_flags_mask,
+                                   uint32_t initial_metadata_flags_eq,
+                                   grpc_error *error) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
-  gpr_mu_lock(&p->mu);
   pp = p->pending_picks;
   p->pending_picks = NULL;
   while (pp != NULL) {
@@ -177,7 +166,6 @@
     }
     pp = next;
   }
-  gpr_mu_unlock(&p->mu);
   GRPC_ERROR_UNREF(error);
 }
 
@@ -192,63 +180,48 @@
       &p->connectivity_changed);
 }
 
-static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  gpr_mu_lock(&p->mu);
   if (!p->started_picking) {
     start_picking(exec_ctx, p);
   }
-  gpr_mu_unlock(&p->mu);
 }
 
-static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                   const grpc_lb_policy_pick_args *pick_args,
-                   grpc_connected_subchannel **target, void **user_data,
-                   grpc_closure *on_complete) {
+static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                          const grpc_lb_policy_pick_args *pick_args,
+                          grpc_connected_subchannel **target, void **user_data,
+                          grpc_closure *on_complete) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
 
   /* Check atomically for a selected channel */
-  grpc_connected_subchannel *selected = GET_SELECTED(p);
-  if (selected != NULL) {
-    *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
+  if (p->selected != NULL) {
+    *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
     return 1;
   }
 
-  /* No subchannel selected yet, so acquire lock and then attempt again */
-  gpr_mu_lock(&p->mu);
-  selected = GET_SELECTED(p);
-  if (selected) {
-    gpr_mu_unlock(&p->mu);
-    *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
-    return 1;
-  } else {
-    if (!p->started_picking) {
-      start_picking(exec_ctx, p);
-    }
-    pp = gpr_malloc(sizeof(*pp));
-    pp->next = p->pending_picks;
-    pp->target = target;
-    pp->initial_metadata_flags = pick_args->initial_metadata_flags;
-    pp->on_complete = on_complete;
-    p->pending_picks = pp;
-    gpr_mu_unlock(&p->mu);
-    return 0;
+  /* No subchannel selected yet: start picking if needed and queue the pick */
+  if (!p->started_picking) {
+    start_picking(exec_ctx, p);
   }
+  pp = gpr_malloc(sizeof(*pp));
+  pp->next = p->pending_picks;
+  pp->target = target;
+  pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+  pp->on_complete = on_complete;
+  p->pending_picks = pp;
+  return 0;
 }
 
-static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
-                                grpc_error *error) {
-  pick_first_lb_policy *p = arg;
+static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
+                                       pick_first_lb_policy *p) {
   size_t i;
   size_t num_subchannels = p->num_subchannels;
   grpc_subchannel **subchannels;
 
-  gpr_mu_lock(&p->mu);
   subchannels = p->subchannels;
   p->num_subchannels = 0;
   p->subchannels = NULL;
-  gpr_mu_unlock(&p->mu);
   GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
 
   for (i = 0; i < num_subchannels; i++) {
@@ -258,25 +231,19 @@
   gpr_free(subchannels);
 }
 
-static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
-                                    grpc_error *error) {
+static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error) {
   pick_first_lb_policy *p = arg;
   grpc_subchannel *selected_subchannel;
   pending_pick *pp;
-  grpc_connected_subchannel *selected;
 
   GRPC_ERROR_REF(error);
 
-  gpr_mu_lock(&p->mu);
-
-  selected = GET_SELECTED(p);
-
   if (p->shutdown) {
-    gpr_mu_unlock(&p->mu);
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     GRPC_ERROR_UNREF(error);
     return;
-  } else if (selected != NULL) {
+  } else if (p->selected != NULL) {
     if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
       /* if the selected channel goes bad, we're done */
       p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
@@ -286,7 +253,7 @@
                                 "selected_changed");
     if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
       grpc_connected_subchannel_notify_on_state_change(
-          exec_ctx, selected, p->base.interested_parties,
+          exec_ctx, p->selected, p->base.interested_parties,
           &p->checking_connectivity, &p->connectivity_changed);
     } else {
       GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -301,26 +268,21 @@
                                     GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
                                     "connecting_ready");
         selected_subchannel = p->subchannels[p->checking_subchannel];
-        selected =
-            grpc_subchannel_get_connected_subchannel(selected_subchannel);
-        GPR_ASSERT(selected != NULL);
-        GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
+        p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
+            grpc_subchannel_get_connected_subchannel(selected_subchannel),
+            "picked_first");
         /* drop the pick list: we are connected now */
         GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
-        gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
-        grpc_closure_sched(exec_ctx,
-                           grpc_closure_create(destroy_subchannels, p,
-                                               grpc_schedule_on_exec_ctx),
-                           GRPC_ERROR_NONE);
+        destroy_subchannels_locked(exec_ctx, p);
         /* update any calls that were waiting for a pick */
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
-          *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
+          *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
           grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
           gpr_free(pp);
         }
         grpc_connected_subchannel_notify_on_state_change(
-            exec_ctx, selected, p->base.interested_parties,
+            exec_ctx, p->selected, p->base.interested_parties,
             &p->checking_connectivity, &p->connectivity_changed);
         break;
       case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -387,48 +349,44 @@
     }
   }
 
-  gpr_mu_unlock(&p->mu);
-
   GRPC_ERROR_UNREF(error);
 }
 
-static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
-                                                     grpc_lb_policy *pol,
-                                                     grpc_error **error) {
+static grpc_connectivity_state pf_check_connectivity_locked(
+    grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  grpc_connectivity_state st;
-  gpr_mu_lock(&p->mu);
-  st = grpc_connectivity_state_get(&p->state_tracker, error);
-  gpr_mu_unlock(&p->mu);
-  return st;
+  return grpc_connectivity_state_get(&p->state_tracker, error);
 }
 
-static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx,
-                                      grpc_lb_policy *pol,
-                                      grpc_connectivity_state *current,
-                                      grpc_closure *notify) {
+static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+                                             grpc_lb_policy *pol,
+                                             grpc_connectivity_state *current,
+                                             grpc_closure *notify) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  gpr_mu_lock(&p->mu);
   grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
                                                  current, notify);
-  gpr_mu_unlock(&p->mu);
 }
 
-static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                        grpc_closure *closure) {
+static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                               grpc_closure *closure) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  grpc_connected_subchannel *selected = GET_SELECTED(p);
-  if (selected) {
-    grpc_connected_subchannel_ping(exec_ctx, selected, closure);
+  if (p->selected) {
+    grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
   } else {
     grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
   }
 }
 
 static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
-    pf_destroy,     pf_shutdown,           pf_pick,
-    pf_cancel_pick, pf_cancel_picks,       pf_ping_one,
-    pf_exit_idle,   pf_check_connectivity, pf_notify_on_state_change};
+    pf_destroy,
+    pf_shutdown_locked,
+    pf_pick_locked,
+    pf_cancel_pick_locked,
+    pf_cancel_picks_locked,
+    pf_ping_one_locked,
+    pf_exit_idle_locked,
+    pf_check_connectivity_locked,
+    pf_notify_on_state_change_locked};
 
 static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
 
@@ -489,10 +447,9 @@
   }
   p->num_subchannels = subchannel_idx;
 
-  grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
-  grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p,
-                    grpc_schedule_on_exec_ctx);
-  gpr_mu_init(&p->mu);
+  grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
+  grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
+                    grpc_combiner_scheduler(args->combiner, false));
   return &p->base;
 }
 
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 63e3d03..687df17 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -67,6 +67,7 @@
 #include "src/core/ext/client_channel/subchannel.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/transport/connectivity_state.h"
 #include "src/core/lib/transport/static_metadata.h"
@@ -134,7 +135,6 @@
 struct round_robin_lb_policy {
   /** base policy: must be first */
   grpc_lb_policy base;
-  gpr_mu mu;
 
   /** total number of addresses received at creation time */
   size_t num_addresses;
@@ -293,7 +293,6 @@
 
   grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
   gpr_free(p->subchannels);
-  gpr_mu_destroy(&p->mu);
 
   elem = p->ready_list.next;
   while (elem != NULL && elem != &p->ready_list) {
@@ -309,12 +308,11 @@
   gpr_free(p);
 }
 
-static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
   pending_pick *pp;
   size_t i;
 
-  gpr_mu_lock(&p->mu);
   if (grpc_lb_round_robin_trace) {
     gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
   }
@@ -335,15 +333,13 @@
     grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
                                            &sd->connectivity_changed_closure);
   }
-  gpr_mu_unlock(&p->mu);
 }
 
-static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                           grpc_connected_subchannel **target,
-                           grpc_error *error) {
+static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                                  grpc_connected_subchannel **target,
+                                  grpc_error *error) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
   pending_pick *pp;
-  gpr_mu_lock(&p->mu);
   pp = p->pending_picks;
   p->pending_picks = NULL;
   while (pp != NULL) {
@@ -360,17 +356,15 @@
     }
     pp = next;
   }
-  gpr_mu_unlock(&p->mu);
   GRPC_ERROR_UNREF(error);
 }
 
-static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                            uint32_t initial_metadata_flags_mask,
-                            uint32_t initial_metadata_flags_eq,
-                            grpc_error *error) {
+static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                                   uint32_t initial_metadata_flags_mask,
+                                   uint32_t initial_metadata_flags_eq,
+                                   grpc_error *error) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
   pending_pick *pp;
-  gpr_mu_lock(&p->mu);
   pp = p->pending_picks;
   p->pending_picks = NULL;
   while (pp != NULL) {
@@ -388,11 +382,11 @@
     }
     pp = next;
   }
-  gpr_mu_unlock(&p->mu);
   GRPC_ERROR_UNREF(error);
 }
 
-static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
+static void start_picking_locked(grpc_exec_ctx *exec_ctx,
+                                 round_robin_lb_policy *p) {
   size_t i;
   p->started_picking = 1;
 
@@ -411,23 +405,20 @@
   }
 }
 
-static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
-  gpr_mu_lock(&p->mu);
   if (!p->started_picking) {
-    start_picking(exec_ctx, p);
+    start_picking_locked(exec_ctx, p);
   }
-  gpr_mu_unlock(&p->mu);
 }
 
-static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                   const grpc_lb_policy_pick_args *pick_args,
-                   grpc_connected_subchannel **target, void **user_data,
-                   grpc_closure *on_complete) {
+static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                          const grpc_lb_policy_pick_args *pick_args,
+                          grpc_connected_subchannel **target, void **user_data,
+                          grpc_closure *on_complete) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
   pending_pick *pp;
   ready_list *selected;
-  gpr_mu_lock(&p->mu);
 
   if (grpc_lb_round_robin_trace) {
     gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
@@ -449,12 +440,11 @@
     }
     /* only advance the last picked pointer if the selection was used */
     advance_last_picked_locked(p);
-    gpr_mu_unlock(&p->mu);
     return 1;
   } else {
     /* no pick currently available. Save for later in list of pending picks */
     if (!p->started_picking) {
-      start_picking(exec_ctx, p);
+      start_picking_locked(exec_ctx, p);
     }
     pp = gpr_malloc(sizeof(*pp));
     pp->next = p->pending_picks;
@@ -463,7 +453,6 @@
     pp->initial_metadata_flags = pick_args->initial_metadata_flags;
     pp->user_data = user_data;
     p->pending_picks = pp;
-    gpr_mu_unlock(&p->mu);
     return 0;
   }
 }
@@ -538,17 +527,15 @@
   return sd->curr_connectivity_state;
 }
 
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
-                                    grpc_error *error) {
+static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error) {
   subchannel_data *sd = arg;
   round_robin_lb_policy *p = sd->policy;
   pending_pick *pp;
 
   GRPC_ERROR_REF(error);
-  gpr_mu_lock(&p->mu);
 
   if (p->shutdown) {
-    gpr_mu_unlock(&p->mu);
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
     GRPC_ERROR_UNREF(error);
     return;
@@ -645,56 +632,51 @@
       GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
       break;
   }
-  gpr_mu_unlock(&p->mu);
   GRPC_ERROR_UNREF(error);
 }
 
-static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
-                                                     grpc_lb_policy *pol,
-                                                     grpc_error **error) {
+static grpc_connectivity_state rr_check_connectivity_locked(
+    grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
-  grpc_connectivity_state st;
-  gpr_mu_lock(&p->mu);
-  st = grpc_connectivity_state_get(&p->state_tracker, error);
-  gpr_mu_unlock(&p->mu);
-  return st;
+  return grpc_connectivity_state_get(&p->state_tracker, error);
 }
 
-static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
-                                      grpc_lb_policy *pol,
-                                      grpc_connectivity_state *current,
-                                      grpc_closure *notify) {
+static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+                                             grpc_lb_policy *pol,
+                                             grpc_connectivity_state *current,
+                                             grpc_closure *notify) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
-  gpr_mu_lock(&p->mu);
   grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
                                                  current, notify);
-  gpr_mu_unlock(&p->mu);
 }
 
-static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                        grpc_closure *closure) {
+static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+                               grpc_closure *closure) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
   ready_list *selected;
   grpc_connected_subchannel *target;
-  gpr_mu_lock(&p->mu);
   if ((selected = peek_next_connected_locked(p))) {
-    gpr_mu_unlock(&p->mu);
     target = GRPC_CONNECTED_SUBCHANNEL_REF(
         grpc_subchannel_get_connected_subchannel(selected->subchannel),
         "rr_picked");
     grpc_connected_subchannel_ping(exec_ctx, target, closure);
     GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
   } else {
-    gpr_mu_unlock(&p->mu);
     grpc_closure_sched(exec_ctx, closure,
                        GRPC_ERROR_CREATE("Round Robin not connected"));
   }
 }
 
 static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
-    rr_destroy,     rr_shutdown,           rr_pick,
-    rr_cancel_pick, rr_cancel_picks,       rr_ping_one,
-    rr_exit_idle,   rr_check_connectivity, rr_notify_on_state_change};
+    rr_destroy,
+    rr_shutdown_locked,
+    rr_pick_locked,
+    rr_cancel_pick_locked,
+    rr_cancel_picks_locked,
+    rr_ping_one_locked,
+    rr_exit_idle_locked,
+    rr_check_connectivity_locked,
+    rr_notify_on_state_change_locked};
 
 static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
 
@@ -762,7 +744,8 @@
       }
       ++subchannel_idx;
       grpc_closure_init(&sd->connectivity_changed_closure,
-                        rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx);
+                        rr_connectivity_changed_locked, sd,
+                        grpc_combiner_scheduler(args->combiner, false));
     }
   }
   if (subchannel_idx == 0) {
@@ -779,7 +762,7 @@
   p->ready_list.next = NULL;
   p->ready_list_last_pick = &p->ready_list;
 
-  grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
+  grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
   grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
                                "round_robin");
 
@@ -787,7 +770,6 @@
     gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
             (void *)p, (unsigned long)p->num_subchannels);
   }
-  gpr_mu_init(&p->mu);
   return &p->base;
 }