Make pick_first fast path lock free, take channel lock for less time
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index b91f060..beacffc 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -57,10 +57,8 @@
 
   /** mutex protecting remaining members */
   gpr_mu mu;
-  /** the selected channel
-      TODO(ctiller): this should be atomically set so we don't
-                     need to take a mutex in the common case */
-  grpc_connected_subchannel *selected;
+  /** the selected channel (a grpc_connected_subchannel) */
+  gpr_atm selected;
   /** have we started picking? */
   int started_picking;
   /** are we shut down? */
@@ -76,15 +74,18 @@
   grpc_connectivity_state_tracker state_tracker;
 } pick_first_lb_policy;
 
+#define GET_SELECTED(p) ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+
 void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+  grpc_connected_subchannel *selected = GET_SELECTED(p);
   size_t i;
   GPR_ASSERT(p->pending_picks == NULL);
   for (i = 0; i < p->num_subchannels; i++) {
     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
   }
-  if (p->selected) {
-    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+  if (selected != NULL) {
+    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first");
   }
   grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
   gpr_free(p->subchannels);
@@ -95,16 +96,18 @@
 void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
+  grpc_connected_subchannel *selected;
   gpr_mu_lock(&p->mu);
+  selected = GET_SELECTED(p);
   p->shutdown = 1;
   pp = p->pending_picks;
   p->pending_picks = NULL;
   grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                               GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
   /* cancel subscription */
-  if (p->selected != NULL) {
+  if (selected != NULL) {
     grpc_connected_subchannel_notify_on_state_change(
-        exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+        exec_ctx, selected, NULL, NULL, &p->connectivity_changed);
   } else {
     grpc_subchannel_notify_on_state_change(
         exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
@@ -171,10 +174,16 @@
             grpc_connected_subchannel **target, grpc_closure *on_complete) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
+  grpc_connected_subchannel *selected = GET_SELECTED(p);
+  if (selected != NULL) {
+    *target = selected;
+    return 1;
+  }
   gpr_mu_lock(&p->mu);
-  if (p->selected) {
+  selected = GET_SELECTED(p);
+  if (selected) {
     gpr_mu_unlock(&p->mu);
-    *target = p->selected;
+    *target = selected;
     return 1;
   } else {
     if (!p->started_picking) {
@@ -219,14 +228,17 @@
   pick_first_lb_policy *p = arg;
   grpc_subchannel *selected_subchannel;
   pending_pick *pp;
+  grpc_connected_subchannel *selected;
 
   gpr_mu_lock(&p->mu);
 
+  selected = GET_SELECTED(p);
+
   if (p->shutdown) {
     gpr_mu_unlock(&p->mu);
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     return;
-  } else if (p->selected != NULL) {
+  } else if (selected != NULL) {
     if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
       /* if the selected channel goes bad, we're done */
       p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE;
@@ -235,7 +247,7 @@
                                 p->checking_connectivity, "selected_changed");
     if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
       grpc_connected_subchannel_notify_on_state_change(
-          exec_ctx, p->selected, &p->base.interested_parties,
+          exec_ctx, selected, &p->base.interested_parties,
           &p->checking_connectivity, &p->connectivity_changed);
     } else {
       GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -247,10 +259,10 @@
         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                     GRPC_CHANNEL_READY, "connecting_ready");
         selected_subchannel = p->subchannels[p->checking_subchannel];
-        p->selected =
-            grpc_subchannel_get_connected_subchannel(selected_subchannel);
-        GPR_ASSERT(p->selected);
-        GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
+        selected = grpc_subchannel_get_connected_subchannel(selected_subchannel);
+        GPR_ASSERT(selected != NULL);
+        gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
+        GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
         /* drop the pick list: we are connected now */
         GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
         grpc_exec_ctx_enqueue(exec_ctx,
@@ -258,14 +270,14 @@
         /* update any calls that were waiting for a pick */
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
-          *pp->target = p->selected;
+          *pp->target = selected;
           grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
                                        pp->pollset);
           grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
           gpr_free(pp);
         }
         grpc_connected_subchannel_notify_on_state_change(
-            exec_ctx, p->selected, &p->base.interested_parties,
+            exec_ctx, selected, &p->base.interested_parties,
             &p->checking_connectivity, &p->connectivity_changed);
         break;
       case GRPC_CHANNEL_TRANSIENT_FAILURE: