Clean up refcounting.
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 24a0c83..0883a4e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -90,9 +90,6 @@
                             const grpc_channel_args& args)
         : SubchannelList(policy, tracer, addresses, combiner,
                          client_channel_factory, args) {}
-
-    void RefForConnectivityWatch(const char* reason);
-    void UnrefForConnectivityWatch(const char* reason);
   };
 
   void ShutdownLocked() override;
@@ -220,9 +217,7 @@
   if (subchannel_list_ != nullptr) {
     for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
       if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
-        subchannel_list_->RefForConnectivityWatch(
-            "connectivity_watch+start_picking");
-        subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
+        subchannel_list_->subchannel(i)->StartOrRenewConnectivityWatchLocked();
         break;
       }
     }
@@ -332,8 +327,7 @@
     // If we've started picking, start trying to connect to the first
     // subchannel in the new list.
     if (started_picking_) {
-      subchannel_list_->RefForConnectivityWatch("connectivity_watch+update");
-      subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
+      subchannel_list_->subchannel(0)->StartOrRenewConnectivityWatchLocked();
     }
   } else {
     // We do have a selected subchannel.
@@ -359,9 +353,7 @@
         }
         subchannel_list_ = std::move(subchannel_list);
         DestroyUnselectedSubchannelsLocked();
-        subchannel_list_->RefForConnectivityWatch(
-            "connectivity_watch+replace_selected");
-        sd->StartConnectivityWatchLocked();
+        sd->StartOrRenewConnectivityWatchLocked();
         // If there was a previously pending update (which may or may
         // not have contained the currently selected subchannel), drop
         // it, so that it doesn't override what we've done here.
@@ -391,35 +383,12 @@
     // If we've started picking, start trying to connect to the first
     // subchannel in the new list.
     if (started_picking_) {
-      latest_pending_subchannel_list_->RefForConnectivityWatch(
-          "connectivity_watch+update");
       latest_pending_subchannel_list_->subchannel(0)
-          ->StartConnectivityWatchLocked();
+          ->StartOrRenewConnectivityWatchLocked();
     }
   }
 }
 
-void PickFirst::PickFirstSubchannelList::RefForConnectivityWatch(
-    const char* reason) {
-  // TODO(roth): We currently track these refs manually.  Once the new
-  // ClosureRef API is ready, find a way to pass the RefCountedPtr<>
-  // along with the closures instead of doing this manually.
-  // Ref subchannel list.
-  Ref(DEBUG_LOCATION, reason).release();
-  // Ref LB policy.
-  PickFirst* p = static_cast<PickFirst*>(policy());
-  p->Ref(DEBUG_LOCATION, reason).release();
-}
-
-void PickFirst::PickFirstSubchannelList::UnrefForConnectivityWatch(
-    const char* reason) {
-  // Unref LB policy.
-  PickFirst* p = static_cast<PickFirst*>(policy());
-  p->Unref(DEBUG_LOCATION, reason);
-  // Unref subchannel list.
-  Unref(DEBUG_LOCATION, reason);
-}
-
 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
     grpc_error* error) {
   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
@@ -434,17 +403,8 @@
             grpc_connectivity_state_name(connectivity_state()), p->shutdown_,
             subchannel_list()->shutting_down(), grpc_error_string(error));
   }
-// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
-  // If the subchannel list is shutting down, stop watching.
-  if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
-    StopConnectivityWatchLocked();
-    UnrefSubchannelLocked("pf_sl_shutdown");
-    subchannel_list()->UnrefForConnectivityWatch("pf_sl_shutdown");
-    GRPC_ERROR_UNREF(error);
-    return;
-  }
-  // If we're still here, the notification must be for a subchannel in
-  // either the current or latest pending subchannel lists.
+  // The notification must be for a subchannel in either the current or
+  // latest pending subchannel lists.
   GPR_ASSERT(p->subchannel_list_ == subchannel_list() ||
              p->latest_pending_subchannel_list_ == subchannel_list());
   // Handle updates for the currently selected subchannel.
@@ -455,8 +415,6 @@
         p->latest_pending_subchannel_list_ != nullptr) {
       p->selected_ = nullptr;
       StopConnectivityWatchLocked();
-      subchannel_list()->UnrefForConnectivityWatch(
-          "selected_not_ready+switch_to_update");
       subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update");
       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
       grpc_connectivity_state_set(
@@ -478,14 +436,13 @@
         p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
         // In transient failure. Rely on re-resolution to recover.
         p->selected_ = nullptr;
-        StopConnectivityWatchLocked();
-        subchannel_list()->UnrefForConnectivityWatch("pf_selected_shutdown");
         UnrefSubchannelLocked("pf_selected_shutdown");
+        StopConnectivityWatchLocked();
       } else {
         grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(),
                                     GRPC_ERROR_REF(error), "selected_changed");
         // Renew notification.
-        StartConnectivityWatchLocked();
+        StartOrRenewConnectivityWatchLocked();
       }
     }
     GRPC_ERROR_UNREF(error);
@@ -533,7 +490,7 @@
         GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
       }
       // Renew notification.
-      StartConnectivityWatchLocked();
+      StartOrRenewConnectivityWatchLocked();
       break;
     }
     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
@@ -551,8 +508,7 @@
             &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
             GRPC_ERROR_REF(error), "connecting_transient_failure");
       }
-      // Reuses the connectivity refs from the previous watch.
-      sd->StartConnectivityWatchLocked();
+      sd->StartOrRenewConnectivityWatchLocked();
       break;
     }
     case GRPC_CHANNEL_CONNECTING:
@@ -564,7 +520,7 @@
                                     "connecting_changed");
       }
       // Renew notification.
-      StartConnectivityWatchLocked();
+      StartOrRenewConnectivityWatchLocked();
       break;
     }
     case GRPC_CHANNEL_SHUTDOWN:
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 889616e..2b2c0e5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -134,10 +134,6 @@
       GRPC_ERROR_UNREF(last_transient_failure_error_);
     }
 
-    // Manages references for connectivity watches.
-    void RefForConnectivityWatch(const char* reason);
-    void UnrefForConnectivityWatch(const char* reason);
-
     // Starts watching the subchannels in this list.
     void StartWatchingLocked();
 
@@ -377,6 +373,7 @@
     /* readily available, report right away */
     RoundRobinSubchannelData* sd =
         subchannel_list_->subchannel(next_ready_index);
+    GPR_ASSERT(sd->connected_subchannel() != nullptr);
     pick->connected_subchannel = sd->connected_subchannel()->Ref();
     if (pick->user_data != nullptr) {
       *pick->user_data = sd->user_data();
@@ -422,27 +419,6 @@
   return false;
 }
 
-void RoundRobin::RoundRobinSubchannelList::RefForConnectivityWatch(
-    const char* reason) {
-  // TODO(roth): We currently track these refs manually.  Once the new
-  // ClosureRef API is ready, find a way to pass the RefCountedPtr<>
-  // along with the closures instead of doing this manually.
-  // Ref subchannel list.
-  Ref(DEBUG_LOCATION, reason).release();
-  // Ref LB policy.
-  RoundRobin* p = static_cast<RoundRobin*>(policy());
-  p->Ref(DEBUG_LOCATION, reason).release();
-}
-
-void RoundRobin::RoundRobinSubchannelList::UnrefForConnectivityWatch(
-    const char* reason) {
-  // Unref LB policy.
-  RoundRobin* p = static_cast<RoundRobin*>(policy());
-  p->Unref(DEBUG_LOCATION, reason);
-  // Unref subchannel list.
-  Unref(DEBUG_LOCATION, reason);
-}
-
 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
   if (num_subchannels() == 0) return;
   // Check current state of each subchannel synchronously, since any
@@ -474,8 +450,7 @@
   // Start connectivity watch for each subchannel.
   for (size_t i = 0; i < num_subchannels(); i++) {
     if (subchannel(i)->subchannel() != nullptr) {
-      RefForConnectivityWatch("connectivity_watch");
-      subchannel(i)->StartConnectivityWatchLocked();
+      subchannel(i)->StartOrRenewConnectivityWatchLocked();
     }
   }
 }
@@ -597,15 +572,6 @@
         subchannel_list()->shutting_down(), grpc_error_string(error));
   }
   GPR_ASSERT(subchannel() != nullptr);
-// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
-  // If the subchannel list is shutting down, stop watching.
-  if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
-    StopConnectivityWatchLocked();
-    UnrefSubchannelLocked("rr_sl_shutdown");
-    subchannel_list()->UnrefForConnectivityWatch("rr_sl_shutdown");
-    GRPC_ERROR_UNREF(error);
-    return;
-  }
   // If the new state is TRANSIENT_FAILURE, re-resolve.
   // Only do this if we've started watching, not at startup time.
   // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
@@ -628,7 +594,7 @@
   // If we've started watching, update overall state and renew notification.
   if (subchannel_list()->started_watching()) {
     subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
-    StartConnectivityWatchLocked();
+    StartOrRenewConnectivityWatchLocked();
   }
   GRPC_ERROR_UNREF(error);
 }
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index b88719b..8b843b1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -109,8 +109,8 @@
   // Synchronously checks the subchannel's connectivity state.  Calls
   // ProcessConnectivityChangeLocked() if the state has changed.
   // Must not be called while there is a connectivity notification
-  // pending (i.e., between calling StartConnectivityWatchLocked() and
-  // the resulting invocation of ProcessConnectivityChangeLocked()).
+  // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
+  // and the resulting invocation of ProcessConnectivityChangeLocked()).
   void CheckConnectivityStateLocked() {
     GPR_ASSERT(!connectivity_notification_pending_);
     grpc_error* error = GRPC_ERROR_NONE;
@@ -133,15 +133,15 @@
   // Starts or renewes watching the connectivity state of the subchannel.
   // ProcessConnectivityChangeLocked() will be called when the
   // connectivity state changes.
-  void StartConnectivityWatchLocked();
+  void StartOrRenewConnectivityWatchLocked();
 
   // Stops watching the connectivity state of the subchannel.
   void StopConnectivityWatchLocked();
 
   // Cancels watching the connectivity state of the subchannel.
   // Must be called only while there is a connectivity notification
-  // pending (i.e., between calling StartConnectivityWatchLocked() and
-  // the resulting invocation of ProcessConnectivityChangeLocked()).
+  // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
+  // and the resulting invocation of ProcessConnectivityChangeLocked()).
   // From within ProcessConnectivityChangeLocked(), use
   // StopConnectivityWatchLocked() instead.
   void CancelConnectivityWatchLocked(const char* reason);
@@ -159,8 +159,8 @@
 
   virtual ~SubchannelData();
 
-  // After StartConnectivityWatchLocked() is called, this method will be
-  // invoked when the subchannel's connectivity state changes.
+  // After StartOrRenewConnectivityWatchLocked() is called, this method
+  // will be invoked when the subchannel's connectivity state changes.
   // Implementations can use connectivity_state() to get the new
   // connectivity state.
   // Implementations must invoke either StopConnectivityWatch() or again
@@ -302,7 +302,7 @@
 
 template <typename SubchannelListType, typename SubchannelDataType>
 void SubchannelData<SubchannelListType,
-                    SubchannelDataType>::StartConnectivityWatchLocked() {
+                    SubchannelDataType>::StartOrRenewConnectivityWatchLocked() {
   if (subchannel_list_->tracer()->enabled()) {
     gpr_log(GPR_DEBUG,
             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
@@ -313,7 +313,10 @@
             subchannel_,
             grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
   }
-  connectivity_notification_pending_ = true;
+  if (!connectivity_notification_pending_) {
+    subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
+    connectivity_notification_pending_ = true;
+  }
   grpc_subchannel_notify_on_state_change(
       subchannel_, subchannel_list_->policy()->interested_parties(),
       &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
@@ -332,6 +335,7 @@
   }
   GPR_ASSERT(connectivity_notification_pending_);
   connectivity_notification_pending_ = false;
+  subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
 }
 
 template <typename SubchannelListType, typename SubchannelDataType>
@@ -387,9 +391,14 @@
     OnConnectivityChangedLocked(void* arg, grpc_error* error) {
   SubchannelData* sd = static_cast<SubchannelData*>(arg);
 // FIXME: add trace logging
+  if (sd->subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
+    sd->UnrefSubchannelLocked("connectivity_shutdown");
+    sd->StopConnectivityWatchLocked();
+    return;
+  }
   if (!sd->UpdateConnectedSubchannelLocked()) {
     // We don't want to report this connectivity state, so renew the watch.
-    sd->StartConnectivityWatchLocked();
+    sd->StartOrRenewConnectivityWatchLocked();
     return;
   }
   // Now that we're inside the combiner, copy the pending connectivity