Added connectivity tests, fixed bugs
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index b68954b..1234758 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -40,7 +40,6 @@
 #include "src/core/channel/connected_channel.h"
 #include "src/core/surface/channel.h"
 #include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset_set.h"
 #include "src/core/support/string.h"
 #include "src/core/transport/connectivity_state.h"
 #include <grpc/support/alloc.h>
@@ -79,8 +78,20 @@
   grpc_connectivity_state_tracker state_tracker;
   /** when an lb_policy arrives, should we try to exit idle */
   int exit_idle_when_lb_policy_arrives;
+  /** pollset_set of interested parties in a new connection */
+  grpc_pollset_set pollset_set;
 } channel_data;
 
+/** We create one watcher for each new lb_policy that is returned from a resolver,
+    to watch for state changes from the lb_policy. When a state change is seen on the
+    channel's current policy, we update the channel and create a new watcher. */
+typedef struct {
+  channel_data *chand;
+  grpc_iomgr_closure on_changed;
+  grpc_connectivity_state state;
+  grpc_lb_policy *lb_policy;
+} lb_policy_connectivity_watcher;
+
 typedef enum {
   CALL_CREATED,
   CALL_WAITING_FOR_SEND,
@@ -394,17 +405,61 @@
   perform_transport_stream_op(elem, op, 0);
 }
 
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+
+static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
+  lb_policy_connectivity_watcher *w = arg;
+  int start_new = 0;
+
+  gpr_log(GPR_DEBUG, "on_lb_policy_state_changed: %p %d", w->lb_policy, w->state);
+
+  gpr_mu_lock(&w->chand->mu_config);
+  /* only act if the notification is for the channel's current (non-stale) policy */
+  if (w->lb_policy == w->chand->lb_policy) {
+    grpc_connectivity_state_set(&w->chand->state_tracker, w->state);
+    start_new = 1;
+  } else {
+    gpr_log(GPR_DEBUG, "stale state change: %p vs %p", w->lb_policy, w->chand->lb_policy);
+  }
+  gpr_mu_unlock(&w->chand->mu_config);
+
+  if (start_new) {
+    watch_lb_policy(w->chand, w->lb_policy, w->state);
+  }
+
+  GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+  gpr_free(w);
+}
+
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+  lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+  GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+
+  gpr_log(GPR_DEBUG, "watch_lb_policy: %p %d", lb_policy, current_state);
+
+  w->chand = chand;
+  grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+  w->state = current_state;
+  w->lb_policy = lb_policy;
+  grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed);
+}
+
 static void cc_on_config_changed(void *arg, int iomgr_success) {
   channel_data *chand = arg;
   grpc_lb_policy *lb_policy = NULL;
   grpc_lb_policy *old_lb_policy;
   grpc_resolver *old_resolver;
   grpc_iomgr_closure *wakeup_closures = NULL;
+  grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
   int exit_idle = 0;
 
   if (chand->incoming_configuration != NULL) {
     lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
-    GRPC_LB_POLICY_REF(lb_policy, "channel");
+    if (lb_policy != NULL) {
+      GRPC_LB_POLICY_REF(lb_policy, "channel");
+      GRPC_LB_POLICY_REF(lb_policy, "config_change");
+      state = grpc_lb_policy_check_connectivity(lb_policy);
+    }
 
     grpc_client_config_unref(chand->incoming_configuration);
   }
@@ -423,18 +478,7 @@
     exit_idle = 1;
     chand->exit_idle_when_lb_policy_arrives = 0;
   }
-  gpr_mu_unlock(&chand->mu_config);
 
-  if (exit_idle) {
-    grpc_lb_policy_exit_idle(lb_policy);
-    GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
-  }
-
-  if (old_lb_policy) {
-    GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
-  }
-
-  gpr_mu_lock(&chand->mu_config);
   if (iomgr_success && chand->resolver) {
     grpc_resolver *resolver = chand->resolver;
     GRPC_RESOLVER_REF(resolver, "channel-next");
@@ -443,6 +487,10 @@
     grpc_resolver_next(resolver, &chand->incoming_configuration,
                        &chand->on_config_changed);
     GRPC_RESOLVER_UNREF(resolver, "channel-next");
+    grpc_connectivity_state_set(&chand->state_tracker, state);
+    if (lb_policy != NULL) {
+      watch_lb_policy(chand, lb_policy, state);
+    }
   } else {
     old_resolver = chand->resolver;
     chand->resolver = NULL;
@@ -455,12 +503,24 @@
     }
   }
 
+  if (exit_idle) {
+    grpc_lb_policy_exit_idle(lb_policy);
+    GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
+  }
+
+  if (old_lb_policy != NULL) {
+    GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+  }
+
   while (wakeup_closures) {
     grpc_iomgr_closure *next = wakeup_closures->next;
     grpc_iomgr_add_callback(wakeup_closures);
     wakeup_closures = next;
   }
 
+  if (lb_policy != NULL) {
+    GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
+  }
   GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
 }
 
@@ -491,6 +551,8 @@
     chand->resolver = NULL;
     if (chand->lb_policy != NULL) {
       grpc_lb_policy_shutdown(chand->lb_policy);
+      GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+      chand->lb_policy = NULL;
     }
   }
 
@@ -578,10 +640,11 @@
   gpr_mu_init(&chand->mu_config);
   chand->mdctx = metadata_context;
   chand->master = master;
+  grpc_pollset_set_init(&chand->pollset_set);
   grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
                           chand);
 
-  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
+  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
 }
 
 /* Destructor for channel_data */
@@ -595,6 +658,8 @@
   if (chand->lb_policy != NULL) {
     GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
   }
+  grpc_connectivity_state_destroy(&chand->state_tracker);
+  grpc_pollset_set_destroy(&chand->pollset_set);
   gpr_mu_destroy(&chand->mu_config);
 }
 
@@ -649,3 +714,20 @@
                                                  on_complete);
   gpr_mu_unlock(&chand->mu_config);
 }
+
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+  channel_data *chand = elem->channel_data;
+  return &chand->pollset_set;
+}
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
+                                          grpc_pollset *pollset) {
+  channel_data *chand = elem->channel_data;
+  grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
+}
+
+void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
+                                          grpc_pollset *pollset) {
+  channel_data *chand = elem->channel_data;
+  grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
+}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 16f1133..cd81294 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -59,4 +59,11 @@
     grpc_channel_element *elem, grpc_connectivity_state *state,
     grpc_iomgr_closure *on_complete);
 
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
+                                          grpc_pollset *pollset);
+void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
+                                          grpc_pollset *pollset);
+
 #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 4f4c7eb..8409aab 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -73,12 +73,30 @@
   grpc_connectivity_state_tracker state_tracker;
 } pick_first_lb_policy;
 
+static void del_interested_parties_locked(pick_first_lb_policy *p) {
+  pending_pick *pp;
+  for (pp = p->pending_picks; pp; pp = pp->next) {
+    grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
+                                         pp->pollset);
+  }
+}
+
+static void add_interested_parties_locked(pick_first_lb_policy *p) {
+  pending_pick *pp;
+  for (pp = p->pending_picks; pp; pp = pp->next) {
+    grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
+                                         pp->pollset);
+  }
+}
+
 void pf_destroy(grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   size_t i;
+  del_interested_parties_locked(p);
   for (i = 0; i < p->num_subchannels; i++) {
     GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
   }
+  grpc_connectivity_state_destroy(&p->state_tracker);
   gpr_free(p->subchannels);
   gpr_mu_destroy(&p->mu);
   gpr_free(p);
@@ -88,6 +106,7 @@
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
   gpr_mu_lock(&p->mu);
+  del_interested_parties_locked(p);
   while ((pp = p->pending_picks)) {
     p->pending_picks = pp->next;
     *pp->target = NULL;
@@ -142,77 +161,82 @@
   }
 }
 
-static void del_interested_parties_locked(pick_first_lb_policy *p) {
-  pending_pick *pp;
-  for (pp = p->pending_picks; pp; pp = pp->next) {
-    grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
-                                         pp->pollset);
-  }
-}
-
-static void add_interested_parties_locked(pick_first_lb_policy *p) {
-  pending_pick *pp;
-  for (pp = p->pending_picks; pp; pp = pp->next) {
-    grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
-                                         pp->pollset);
-  }
-}
-
 static void pf_connectivity_changed(void *arg, int iomgr_success) {
   pick_first_lb_policy *p = arg;
   pending_pick *pp;
   int unref = 0;
 
   gpr_mu_lock(&p->mu);
-loop:
-  switch (p->checking_connectivity) {
-    case GRPC_CHANNEL_READY:
-      p->selected = p->subchannels[p->checking_subchannel];
-      while ((pp = p->pending_picks)) {
-        p->pending_picks = pp->next;
-        *pp->target = p->selected;
-        grpc_subchannel_del_interested_party(p->selected, pp->pollset);
-        grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
-        gpr_free(pp);
-      }
+
+  if (p->selected != NULL) {
+    grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity);
+    if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
+      grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed);
+    } else {
       unref = 1;
-      break;
-    case GRPC_CHANNEL_TRANSIENT_FAILURE:
-      del_interested_parties_locked(p);
-      p->checking_subchannel =
-          (p->checking_subchannel + 1) % p->num_subchannels;
-      p->checking_connectivity = grpc_subchannel_check_connectivity(
-          p->subchannels[p->checking_subchannel]);
-      add_interested_parties_locked(p);
-      goto loop;
-    case GRPC_CHANNEL_CONNECTING:
-    case GRPC_CHANNEL_IDLE:
-      grpc_subchannel_notify_on_state_change(
-          p->subchannels[p->checking_subchannel], &p->checking_connectivity,
-          &p->connectivity_changed);
-      break;
-    case GRPC_CHANNEL_FATAL_FAILURE:
-      del_interested_parties_locked(p);
-      GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
-               p->subchannels[p->num_subchannels - 1]);
-      p->num_subchannels--;
-      GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
-      if (p->num_subchannels == 0) {
+    }
+  } else {
+loop:
+    switch (p->checking_connectivity) {
+      case GRPC_CHANNEL_READY:
+        grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY);
+        p->selected = p->subchannels[p->checking_subchannel];
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
-          *pp->target = NULL;
+          *pp->target = p->selected;
+          grpc_subchannel_del_interested_party(p->selected, pp->pollset);
           grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
           gpr_free(pp);
         }
-        unref = 1;
-      } else {
-        p->checking_subchannel %= p->num_subchannels;
+        grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed);
+        break;
+      case GRPC_CHANNEL_TRANSIENT_FAILURE:
+        grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE);
+        del_interested_parties_locked(p);
+        p->checking_subchannel =
+            (p->checking_subchannel + 1) % p->num_subchannels;
         p->checking_connectivity = grpc_subchannel_check_connectivity(
             p->subchannels[p->checking_subchannel]);
         add_interested_parties_locked(p);
-        goto loop;
-      }
+        if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+          grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
+        } else {
+          goto loop;
+        }
+        break;
+      case GRPC_CHANNEL_CONNECTING:
+      case GRPC_CHANNEL_IDLE:
+        grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity);
+        grpc_subchannel_notify_on_state_change(
+            p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+            &p->connectivity_changed);
+        break;
+      case GRPC_CHANNEL_FATAL_FAILURE:
+        del_interested_parties_locked(p);
+        GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+                 p->subchannels[p->num_subchannels - 1]);
+        p->num_subchannels--;
+        GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
+        if (p->num_subchannels == 0) {
+          grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
+          while ((pp = p->pending_picks)) {
+            p->pending_picks = pp->next;
+            *pp->target = NULL;
+            grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
+            gpr_free(pp);
+          }
+          unref = 1;
+        } else {
+          grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE);
+          p->checking_subchannel %= p->num_subchannels;
+          p->checking_connectivity = grpc_subchannel_check_connectivity(
+              p->subchannels[p->checking_subchannel]);
+          add_interested_parties_locked(p);
+          goto loop;
+        }
+    }
   }
+
   gpr_mu_unlock(&p->mu);
 
   if (unref) {
@@ -278,6 +302,7 @@
   grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
   p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
   p->num_subchannels = num_subchannels;
+  grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first");
   memcpy(p->subchannels, subchannels,
          sizeof(grpc_subchannel *) * num_subchannels);
   grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 9d5baf8..7a425f9 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -81,3 +81,12 @@
 void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
   policy->vtable->exit_idle(policy);
 }
+
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state,
+                                 grpc_iomgr_closure *closure) {
+  policy->vtable->notify_on_state_change(policy, state, closure);
+}
+
+grpc_connectivity_state grpc_lb_policy_check_connectivity(grpc_lb_policy *policy) {
+  return policy->vtable->check_connectivity(policy);
+}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 363faf3..98c741b 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -75,6 +75,8 @@
                                  grpc_iomgr_closure *closure);
 };
 
+#define GRPC_LB_POLICY_REFCOUNT_DEBUG
+
 #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
 #define GRPC_LB_POLICY_REF(p, r) \
   grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
@@ -111,4 +113,9 @@
 
 void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
 
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state,
+                                 grpc_iomgr_closure *closure);
+
+grpc_connectivity_state grpc_lb_policy_check_connectivity(grpc_lb_policy *policy);
+
 #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 8cdad10..be26e0b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -38,9 +38,11 @@
 #include <grpc/support/alloc.h>
 
 #include "src/core/channel/channel_args.h"
+#include "src/core/channel/client_channel.h"
 #include "src/core/channel/connected_channel.h"
 #include "src/core/iomgr/alarm.h"
 #include "src/core/transport/connectivity_state.h"
+#include "src/core/surface/channel.h"
 
 typedef struct {
   /* all fields protected by subchannel->mu */
@@ -94,8 +96,9 @@
   grpc_iomgr_closure connected;
 
   /** pollset_set tracking who's interested in a connection
-      being setup */
-  grpc_pollset_set pollset_set;
+      being set up - owned by the master channel (in particular the
+      client_channel filter therein) */
+  grpc_pollset_set *pollset_set;
 
   /** mutex protecting remaining elements */
   gpr_mu mu;
@@ -244,7 +247,6 @@
   grpc_channel_args_destroy(c->args);
   gpr_free(c->addr);
   grpc_mdctx_unref(c->mdctx);
-  grpc_pollset_set_destroy(&c->pollset_set);
   grpc_connectivity_state_destroy(&c->state_tracker);
   grpc_connector_unref(c->connector);
   gpr_free(c);
@@ -252,17 +254,18 @@
 
 void grpc_subchannel_add_interested_party(grpc_subchannel *c,
                                           grpc_pollset *pollset) {
-  grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
+  grpc_pollset_set_add_pollset(c->pollset_set, pollset);
 }
 
 void grpc_subchannel_del_interested_party(grpc_subchannel *c,
                                           grpc_pollset *pollset) {
-  grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
+  grpc_pollset_set_del_pollset(c->pollset_set, pollset);
 }
 
 grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
                                         grpc_subchannel_args *args) {
   grpc_subchannel *c = gpr_malloc(sizeof(*c));
+  grpc_channel_element *parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(args->master));
   memset(c, 0, sizeof(*c));
   c->refs = 1;
   c->connector = connector;
@@ -277,10 +280,10 @@
   c->args = grpc_channel_args_copy(args->args);
   c->mdctx = args->mdctx;
   c->master = args->master;
+  c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
   grpc_mdctx_ref(c->mdctx);
-  grpc_pollset_set_init(&c->pollset_set);
   grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
-  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
+  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel");
   gpr_mu_init(&c->mu);
   return c;
 }
@@ -288,7 +291,7 @@
 static void continue_connect(grpc_subchannel *c) {
   grpc_connect_in_args args;
 
-  args.interested_parties = &c->pollset_set;
+  args.interested_parties = c->pollset_set;
   args.addr = c->addr;
   args.addr_len = c->addr_len;
   args.deadline = compute_connect_deadline(c);
@@ -309,6 +312,7 @@
 
 static void continue_creating_call(void *arg, int iomgr_success) {
   waiting_for_connect *w4c = arg;
+  grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
   grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
                               w4c->notify);
   GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
@@ -344,6 +348,7 @@
       connectivity_state_changed_locked(c);
       /* released by connection */
       SUBCHANNEL_REF_LOCKED(c, "connecting");
+      GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
       gpr_mu_unlock(&c->mu);
 
       start_connect(c);
@@ -372,6 +377,7 @@
     c->connecting = 1;
     /* released by connection */
     SUBCHANNEL_REF_LOCKED(c, "connecting");
+    GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
     connectivity_state_changed_locked(c);
   }
   gpr_mu_unlock(&c->mu);
@@ -536,7 +542,9 @@
   memset(&op, 0, sizeof(op));
   op.connectivity_state = &sw->connectivity_state;
   op.on_connectivity_state_change = &sw->closure;
+  op.bind_pollset_set = c->pollset_set;
   SUBCHANNEL_REF_LOCKED(c, "state_watcher");
+  GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
   GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
   elem =
       grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
@@ -570,6 +578,7 @@
   if (iomgr_success) {
     continue_connect(c);
   } else {
+    GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
     GRPC_SUBCHANNEL_UNREF(c, "connecting");
   }
 }
@@ -580,9 +589,9 @@
     publish_transport(c);
   } else {
     gpr_mu_lock(&c->mu);
-    connectivity_state_changed_locked(c);
     GPR_ASSERT(!c->have_alarm);
     c->have_alarm = 1;
+    connectivity_state_changed_locked(c);
     c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta);
     c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta);
     grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_REALTIME));
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 9648795..24c6270 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -50,6 +50,10 @@
   ep->vtable->add_to_pollset(ep, pollset);
 }
 
+void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
+  ep->vtable->add_to_pollset_set(ep, pollset_set);
+}
+
 void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
 
 void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index 881e851..572a070 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -35,6 +35,7 @@
 #define GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H
 
 #include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/pollset_set.h"
 #include <grpc/support/slice.h>
 #include <grpc/support/time.h>
 
@@ -70,6 +71,7 @@
                                       size_t nslices, grpc_endpoint_write_cb cb,
                                       void *user_data);
   void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
+  void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
   void (*shutdown)(grpc_endpoint *ep);
   void (*destroy)(grpc_endpoint *ep);
 };
@@ -98,6 +100,7 @@
 /* Add an endpoint to a pollset, so that when the pollset is polled, events from
    this endpoint are considered */
 void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
+void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set);
 
 struct grpc_endpoint {
   const grpc_endpoint_vtable *vtable;
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index 5ff7df1..2076ac7 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -60,7 +60,7 @@
 
 void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
                                   grpc_pollset *pollset) {
-  size_t i;
+  size_t i, j;
   gpr_mu_lock(&pollset_set->mu);
   if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
     pollset_set->pollset_capacity =
@@ -70,9 +70,15 @@
                                                sizeof(*pollset_set->pollsets));
   }
   pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
-  for (i = 0; i < pollset_set->fd_count; i++) {
-    grpc_pollset_add_fd(pollset, pollset_set->fds[i]);
+  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+    if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
+      GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+    } else {
+      grpc_pollset_add_fd(pollset, pollset_set->fds[i]);
+      pollset_set->fds[j++] = pollset_set->fds[i];
+    }
   }
+  pollset_set->fd_count = j;
   gpr_mu_unlock(&pollset_set->mu);
 }
 
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index b6d6efc..6996255 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -567,9 +567,14 @@
   grpc_pollset_add_fd(pollset, tcp->em_fd);
 }
 
+static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
+}
+
 static const grpc_endpoint_vtable vtable = {
     grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
-    grpc_tcp_shutdown, grpc_tcp_destroy};
+    grpc_tcp_add_to_pollset_set, grpc_tcp_shutdown, grpc_tcp_destroy};
 
 grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 3548198..e94b6cf 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -331,9 +331,15 @@
   grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
 }
 
+static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep,
+                                    grpc_pollset_set *pollset_set) {
+  secure_endpoint *ep = (secure_endpoint *)secure_ep;
+  grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
+}
+
 static const grpc_endpoint_vtable vtable = {
     endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset,
-    endpoint_shutdown, endpoint_unref};
+    endpoint_add_to_pollset_set, endpoint_shutdown, endpoint_unref};
 
 grpc_endpoint *grpc_secure_endpoint_create(
     struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 71f8a55..8db22c0 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -58,6 +58,8 @@
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
 gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
 
+#define GRPC_CHANNEL_REF_COUNT_DEBUG
+
 #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
 void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
 void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index bdd9678..dd12ff5 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -73,10 +73,15 @@
   grpc_connectivity_state *optional_new_state;
   grpc_completion_queue *cq;
   grpc_cq_completion completion_storage;
+  grpc_channel *channel;
   void *tag;
 } state_watcher;
 
 static void delete_state_watcher(state_watcher *w) {
+  grpc_channel_element *client_channel_elem =
+      grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel));
+  grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq));
+  GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
   gpr_mu_destroy(&w->mu);
   gpr_free(w);
 }
@@ -143,9 +148,9 @@
   }
 }
 
-static void watch_complete(void *pw, int success) { partly_done(pw, 0); }
+static void watch_complete(void *pw, int success) { partly_done(pw, 1); }
 
-static void timeout_complete(void *pw, int success) { partly_done(pw, 1); }
+static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
 
 void grpc_channel_watch_connectivity_state(
     grpc_channel *channel, grpc_connectivity_state last_observed_state,
@@ -165,6 +170,7 @@
   w->optional_new_state = optional_new_state;
   w->cq = cq;
   w->tag = tag;
+  w->channel = channel;
 
   grpc_alarm_init(&w->alarm, deadline, timeout_complete, w,
                   gpr_now(GPR_CLOCK_REALTIME));
@@ -176,6 +182,8 @@
             client_channel_elem->filter->name);
     grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
   } else {
+    GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
+    grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq));
     grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
                                                  &w->on_complete);
   }
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index e205f0a..cf2cf94 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -108,6 +108,7 @@
   gpr_refcount refs;
   grpc_mdctx *mdctx;
   grpc_channel_args *merge_args;
+  grpc_channel *master;
 } subchannel_factory;
 
 static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
@@ -118,6 +119,7 @@
 static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
   subchannel_factory *f = (subchannel_factory *)scf;
   if (gpr_unref(&f->refs)) {
+    GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
     grpc_channel_args_destroy(f->merge_args);
     grpc_mdctx_unref(f->mdctx);
     gpr_free(f);
@@ -136,6 +138,7 @@
   gpr_ref_init(&c->refs, 1);
   args->mdctx = f->mdctx;
   args->args = final_args;
+  args->master = f->master;
   s = grpc_subchannel_create(&c->base, args);
   grpc_connector_unref(&c->base);
   grpc_channel_args_destroy(final_args);
@@ -166,18 +169,21 @@
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
 
+  channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
+
   f = gpr_malloc(sizeof(*f));
   f->base.vtable = &subchannel_factory_vtable;
   gpr_ref_init(&f->refs, 1);
   grpc_mdctx_ref(mdctx);
   f->mdctx = mdctx;
   f->merge_args = grpc_channel_args_copy(args);
+  f->master = channel;
+  GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
   resolver = grpc_resolver_create(target, &f->base);
   if (!resolver) {
     return NULL;
   }
 
-  channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
   grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
                                    resolver);
   GRPC_RESOLVER_UNREF(resolver, "create");
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 04e27d3..6e2a15b 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -46,6 +46,7 @@
 #include "src/core/surface/init.h"
 #include "src/core/surface/surface_trace.h"
 #include "src/core/transport/chttp2_transport.h"
+#include "src/core/transport/connectivity_state.h"
 
 #ifdef GPR_POSIX_SOCKET
 #include "src/core/client_config/resolvers/unix_resolver_posix.h"
@@ -76,6 +77,7 @@
     grpc_register_tracer("http", &grpc_http_trace);
     grpc_register_tracer("flowctl", &grpc_flowctl_trace);
     grpc_register_tracer("batch", &grpc_trace_batch);
+    grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
     grpc_security_pre_init();
     grpc_iomgr_init();
     grpc_tracer_init("GRPC_TRACE");
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 34ee3f8..1836d39 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -132,6 +132,7 @@
   grpc_mdctx *mdctx;
   grpc_channel_args *merge_args;
   grpc_channel_security_connector *security_connector;
+  grpc_channel *master;
 } subchannel_factory;
 
 static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
@@ -144,6 +145,7 @@
   if (gpr_unref(&f->refs)) {
     GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
                                   "subchannel_factory");
+    GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
     grpc_channel_args_destroy(f->merge_args);
     grpc_mdctx_unref(f->mdctx);
     gpr_free(f);
@@ -163,6 +165,7 @@
   gpr_ref_init(&c->refs, 1);
   args->mdctx = f->mdctx;
   args->args = final_args;
+  args->master = f->master;
   s = grpc_subchannel_create(&c->base, args);
   grpc_connector_unref(&c->base);
   grpc_channel_args_destroy(final_args);
@@ -215,6 +218,8 @@
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
 
+  channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
+
   f = gpr_malloc(sizeof(*f));
   f->base.vtable = &subchannel_factory_vtable;
   gpr_ref_init(&f->refs, 1);
@@ -223,12 +228,13 @@
   GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory");
   f->security_connector = connector;
   f->merge_args = grpc_channel_args_copy(args_copy);
+  f->master = channel;
+  GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
   resolver = grpc_resolver_create(target, &f->base);
   if (!resolver) {
     return NULL;
   }
 
-  channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
   grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
                                    resolver);
   GRPC_RESOLVER_UNREF(resolver, "create");
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index ac399e4..1ad5066 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -110,6 +110,8 @@
 /** Add endpoint from this transport to pollset */
 static void add_to_pollset_locked(grpc_chttp2_transport *t,
                                   grpc_pollset *pollset);
+static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
+                                  grpc_pollset_set *pollset_set);
 
 /** Start new streams that have been created if we can */
 static void maybe_start_some_streams(
@@ -233,7 +235,7 @@
       is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
   t->writing.is_client = is_client;
   grpc_connectivity_state_init(&t->channel_callback.state_tracker,
-                               GRPC_CHANNEL_READY);
+                               GRPC_CHANNEL_READY, "transport");
 
   gpr_slice_buffer_init(&t->global.qbuf);
 
@@ -668,6 +670,7 @@
 
 static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  int close_transport = 0;
 
   lock(t);
 
@@ -687,9 +690,7 @@
         t->global.last_incoming_stream_id,
         grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
         gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
-    if (!grpc_chttp2_has_streams(t)) {
-      close_transport_locked(t);
-    }
+    close_transport = !grpc_chttp2_has_streams(t);
   }
 
   if (op->set_accept_stream != NULL) {
@@ -702,6 +703,10 @@
     add_to_pollset_locked(t, op->bind_pollset);
   }
 
+  if (op->bind_pollset_set) {
+    add_to_pollset_set_locked(t, op->bind_pollset_set);
+  }
+
   if (op->send_ping) {
     send_ping_locked(t, op->send_ping);
   }
@@ -711,6 +716,12 @@
   }
 
   unlock(t);
+
+  if (close_transport) {
+    lock(t);
+    close_transport_locked(t);
+    unlock(t);
+  }
 }
 
 /*
@@ -1016,6 +1027,13 @@
   }
 }
 
+static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
+                                  grpc_pollset_set *pollset_set) { /* Add t's endpoint to pollset_set; invoked from perform_transport_op between lock(t) and unlock(t). */
+  if (t->ep) { /* nothing to register when the transport has no endpoint */
+    grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
+  }
+}
+
 /*
  * TRACING
  */
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 1091cea..85a8618 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -34,11 +34,27 @@
 #include "src/core/transport/connectivity_state.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+int grpc_connectivity_state_trace = 0;
+
+const char *grpc_connectivity_state_name(grpc_connectivity_state state) { /* Return a string literal naming state; used by the connectivity_state trace logging. */
+  switch (state) {
+    case GRPC_CHANNEL_IDLE: return "IDLE";
+    case GRPC_CHANNEL_CONNECTING: return "CONNECTING";
+    case GRPC_CHANNEL_READY: return "READY";
+    case GRPC_CHANNEL_TRANSIENT_FAILURE: return "TRANSIENT_FAILURE";
+    case GRPC_CHANNEL_FATAL_FAILURE: return "FATAL_FAILURE";
+  }
+  abort(); /* reached only when state matched none of the cases above */
+  return "UNKNOWN"; /* follows abort(); presumably kept so every path has a return statement */
+}
 
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
-                                  grpc_connectivity_state init_state) {
+                                  grpc_connectivity_state init_state, const char *name) { /* name: label printed as tracker->name in the trace log lines */
   tracker->current_state = init_state;
   tracker->watchers = NULL;
+  tracker->name = gpr_strdup(name); /* presumably a private copy of name; released with gpr_free(tracker->name) elsewhere in this file */
 }
 
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@@ -54,6 +70,7 @@
     }
     gpr_free(w);
   }
+  gpr_free(tracker->name);
 }
 
 grpc_connectivity_state grpc_connectivity_state_check(
@@ -64,6 +81,9 @@
 int grpc_connectivity_state_notify_on_state_change(
     grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
     grpc_iomgr_closure *notify) {
+  if (grpc_connectivity_state_trace) {
+    gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name, grpc_connectivity_state_name(*current), grpc_connectivity_state_name(tracker->current_state));
+  }
   if (tracker->current_state != *current) {
     *current = tracker->current_state;
     grpc_iomgr_add_callback(notify);
@@ -82,6 +102,9 @@
     void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg) {
   grpc_connectivity_state_watcher *new = NULL;
   grpc_connectivity_state_watcher *w;
+  if (grpc_connectivity_state_trace) {
+    gpr_log(GPR_DEBUG, "SET: %s: %s --> %s", tracker->name, grpc_connectivity_state_name(tracker->current_state), grpc_connectivity_state_name(state));
+  }
   if (tracker->current_state == state) {
     return;
   }
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index bbdcbcb..8e40158 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -51,10 +51,14 @@
   grpc_connectivity_state current_state;
   /** all our watchers */
   grpc_connectivity_state_watcher *watchers;
+  /** a name to help debugging */
+  char *name;
 } grpc_connectivity_state_tracker;
 
+extern int grpc_connectivity_state_trace;
+
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
-                                  grpc_connectivity_state init_state);
+                                  grpc_connectivity_state init_state, const char *name);
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
 
 void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 1429737..5af8123 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -105,6 +105,8 @@
   void *set_accept_stream_user_data;
   /** add this transport to a pollset */
   grpc_pollset *bind_pollset;
+  /** add this transport to a pollset_set */
+  grpc_pollset_set *bind_pollset_set;
   /** send a ping, call this back if not NULL */
   grpc_iomgr_closure *send_ping;
 } grpc_transport_op;