Fix refcounting
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 3d06576..eadeb0e 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -80,6 +80,8 @@
   grpc_mdctx *mdctx;
   /** master channel */
   grpc_channel *master;
+  /** have we seen a disconnection? */
+  int disconnected;
 
   /** set during connection */
   grpc_connect_out_args connecting_result;
@@ -152,6 +154,7 @@
 
 static void connection_destroy(connection *c) {
   GPR_ASSERT(c->refs == 0);
+  gpr_log(GPR_DEBUG, "CONNECTION_DESTROY %p", c);
   grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
   gpr_free(c);
 }
@@ -342,8 +345,32 @@
 
 void grpc_subchannel_process_transport_op(grpc_subchannel *c,
                                           grpc_transport_op *op) {
-  gpr_log(GPR_ERROR, "grpc_subchannel_process_transport_op not implemented");
-  abort();
+  connection *con = NULL;
+  grpc_subchannel *destroy;
+  gpr_mu_lock(&c->mu);
+  if (op->disconnect) {
+    c->disconnected = 1;
+    grpc_connectivity_state_set(&c->state_tracker,
+                                compute_connectivity_locked(c));
+  }
+  if (c->active != NULL) {
+    con = c->active;
+    CONNECTION_REF_LOCKED(con, "transport-op");
+  }
+  gpr_mu_unlock(&c->mu);
+
+  if (con != NULL) {
+    grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+    grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
+    top_elem->filter->start_transport_op(top_elem, op);
+
+    gpr_mu_lock(&c->mu);
+    destroy = CONNECTION_UNREF_LOCKED(con, "transport-op");
+    gpr_mu_unlock(&c->mu);
+    if (destroy) {
+      subchannel_destroy(destroy);
+    }
+  }
 }
 
 static void on_state_changed(void *p, int iomgr_success) {
@@ -388,7 +415,7 @@
     case GRPC_CHANNEL_TRANSIENT_FAILURE:
       /* things are starting to go wrong, reconnect but don't deactivate */
       /* released by connection */
-      SUBCHANNEL_REF_LOCKED(c, "connection");
+      SUBCHANNEL_REF_LOCKED(c, "connecting");
       do_connect = 1;
       c->connecting = 1;
       break;
@@ -397,7 +424,7 @@
 done:
   grpc_connectivity_state_set(&c->state_tracker,
                               compute_connectivity_locked(c));
-  destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection");
+  destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
   gpr_free(sw);
   gpr_mu_unlock(mu);
   if (do_connect) {
@@ -450,6 +477,14 @@
 
   gpr_mu_lock(&c->mu);
 
+  if (c->disconnected) {
+    gpr_mu_unlock(&c->mu);
+    gpr_free(sw);
+    gpr_free(filters);
+    grpc_channel_stack_destroy(stk);
+    return;
+  }
+
   /* publish */
   if (c->active != NULL && c->active->refs == 0) {
     destroy_connection = c->active;
@@ -464,6 +499,8 @@
   memset(&op, 0, sizeof(op));
   op.connectivity_state = &sw->connectivity_state;
   op.on_connectivity_state_change = &sw->closure;
+  SUBCHANNEL_REF_LOCKED(c, "state_watcher");
+  GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
   elem =
       grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
   elem->filter->start_transport_op(elem, &op);
@@ -491,7 +528,7 @@
   } else {
     int destroy;
     gpr_mu_lock(&c->mu);
-    destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection");
+    destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting");
     gpr_mu_unlock(&c->mu);
     if (destroy) subchannel_destroy(c);
     /* TODO(ctiller): retry after sleeping */
@@ -504,6 +541,9 @@
 }
 
 static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
+  if (c->disconnected) {
+    return GRPC_CHANNEL_FATAL_FAILURE;
+  }
   if (c->connecting) {
     return GRPC_CHANNEL_CONNECTING;
   }