Add ability to continue waiting calls
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index c770cb3..7bd6717 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -49,11 +49,20 @@
   grpc_subchannel *subchannel;
 } connection;
 
+typedef struct {
+  grpc_iomgr_closure closure; /* invokes on_state_changed with this watcher */
+  size_t version; /* subchannel active_version when the watch was started */
+  grpc_subchannel *subchannel; /* subchannel whose connection is watched */
+  grpc_connectivity_state connectivity_state; /* state slot given to the op */
+} state_watcher;
+
 typedef struct waiting_for_connect {
   struct waiting_for_connect *next;
   grpc_iomgr_closure *notify;
-  grpc_transport_stream_op *initial_op;
+  grpc_transport_stream_op initial_op; /* by-value copy of the caller's op */
   grpc_subchannel_call **target;
+  grpc_subchannel *subchannel; /* owner; referenced while this entry exists */
+  grpc_iomgr_closure continuation; /* runs continue_creating_call(this) */
 } waiting_for_connect;
 
 struct grpc_subchannel {
@@ -85,6 +94,8 @@
 
   /** active connection */
   connection *active;
+  /** version number for the active connection */
+  size_t active_version;
   /** refcount */
   int refs;
   /** are we connecting */
@@ -228,6 +239,16 @@
                          &c->connected);
 }
 
+/* Closure callback: re-issues a call that was queued on the subchannel's
+   waiting list, then drops the entry's subchannel ref and frees the entry. */
+static void continue_creating_call(void *arg, int iomgr_success) {
+  waiting_for_connect *pending = arg;
+  grpc_subchannel_create_call(pending->subchannel, &pending->initial_op,
+                              pending->target, pending->notify);
+  grpc_subchannel_unref(pending->subchannel);
+  gpr_free(pending);
+}
+
 void grpc_subchannel_create_call(grpc_subchannel *c,
                                  grpc_transport_stream_op *initial_op,
                                  grpc_subchannel_call **target,
@@ -245,8 +266,11 @@
     waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
     w4c->next = c->waiting;
     w4c->notify = notify;
-    w4c->initial_op = initial_op;
+    w4c->initial_op = *initial_op;
     w4c->target = target;
+    w4c->subchannel = c;
+    subchannel_ref_locked(c);
+    grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
     c->waiting = w4c;
     grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
     if (!c->connecting) {
@@ -291,7 +315,70 @@
 
 void grpc_subchannel_process_transport_op(grpc_subchannel *c,
                                           grpc_transport_op *op) {
-  abort();
+  abort(); /* not implemented: any call terminates the process */
+}
+
+/* Closure for connectivity-watch events; p is the state_watcher being run. */
+static void on_state_changed(void *p, int iomgr_success) {
+  state_watcher *sw = p;
+  grpc_subchannel *c = sw->subchannel;
+  gpr_mu *mu = &c->mu;
+  int destroy; /* result of subchannel_unref_locked; acted on after unlock */
+  grpc_transport_op op;
+  grpc_channel_element *elem;
+  connection *destroy_connection = NULL; /* destroyed after unlock if set */
+  int do_connect = 0; /* start_connect is called after unlock if set */
+
+  gpr_mu_lock(mu);
+
+  /* failed callback or stale watcher (version mismatch): release the watcher */
+  if (!iomgr_success || sw->subchannel->active_version != sw->version) {
+    goto done;
+  }
+
+  switch (sw->connectivity_state) {
+    case GRPC_CHANNEL_CONNECTING:
+    case GRPC_CHANNEL_READY:
+    case GRPC_CHANNEL_IDLE:
+      /* all is still good: re-arm this watcher and keep watching */
+      memset(&op, 0, sizeof(op));
+      op.connectivity_state = &sw->connectivity_state;
+      op.on_connectivity_state_change = &sw->closure;
+      elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
+      elem->filter->start_transport_op(elem, &op);
+      /* early out: sw is neither freed nor unreffed, it is still in use */
+      gpr_mu_unlock(mu);
+      return;
+    case GRPC_CHANNEL_FATAL_FAILURE:
+      /* things have gone wrong, deactivate and enter idle */
+      if (sw->subchannel->active->refs == 0) {
+        destroy_connection = sw->subchannel->active;
+      }
+      sw->subchannel->active = NULL;
+      break;
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+      /* things are starting to go wrong, reconnect but don't deactivate */
+      subchannel_ref_locked(c); /* ref taken for the new connect attempt */
+      do_connect = 1;
+      c->connecting = 1;
+      break;
+  }
+
+done:
+  grpc_connectivity_state_set(&c->state_tracker,
+                              compute_connectivity_locked(c));
+  destroy = subchannel_unref_locked(c); /* drop the ref held by this watcher */
+  gpr_free(sw);
+  gpr_mu_unlock(mu);
+  if (do_connect) {
+    start_connect(c);
+  }
+  if (destroy) {
+    subchannel_destroy(c);
+  }
+  if (destroy_connection != NULL) {
+    connection_destroy(destroy_connection);
+  }
 }
 
 static void publish_transport(grpc_subchannel *c) {
@@ -301,8 +388,12 @@
   size_t num_filters;
   const grpc_channel_filter **filters;
   waiting_for_connect *w4c;
-  int destroy;
+  grpc_transport_op op;
+  state_watcher *sw;
+  connection *destroy_connection = NULL;
+  grpc_channel_element *elem;
 
+  /* build final filter list */
   num_filters = c->num_filters + c->connecting_result.num_filters + 1;
   filters = gpr_malloc(sizeof(*filters) * num_filters);
   memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
@@ -310,31 +401,54 @@
          sizeof(*filters) * c->connecting_result.num_filters);
   filters[num_filters - 1] = &grpc_connected_channel_filter;
 
+  /* construct channel stack */
   channel_stack_size = grpc_channel_stack_size(filters, num_filters);
   con = gpr_malloc(sizeof(connection) + channel_stack_size);
   stk = (grpc_channel_stack *)(con + 1);
-
   con->refs = 0;
   con->subchannel = c;
   grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
   grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
   memset(&c->connecting_result, 0, sizeof(c->connecting_result));
 
+  /* initialize state watcher */
+  sw = gpr_malloc(sizeof(*sw));
+  grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
+  sw->subchannel = c;
+  sw->connectivity_state = GRPC_CHANNEL_READY;
+
   gpr_mu_lock(&c->mu);
-  GPR_ASSERT(c->active == NULL);
+
+  /* publish */
+  if (c->active != NULL && c->active->refs == 0) {
+    destroy_connection = c->active;
+  }
   c->active = con;
+  c->active_version++;
+  sw->version = c->active_version;
   c->connecting = 0;
+
+  /* watch for changes; subchannel ref for connecting is donated
+     to the state watcher */
+  memset(&op, 0, sizeof(op));
+  op.connectivity_state = &sw->connectivity_state;
+  op.on_connectivity_state_change = &sw->closure;
+  elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
+  elem->filter->start_transport_op(elem, &op);
+
+  /* signal completion */
   connectivity_state_changed_locked(c);
   while ((w4c = c->waiting)) {
-    abort(); /* not implemented */
+    c->waiting = w4c->next;
+    grpc_iomgr_add_callback(&w4c->continuation);
   }
-  destroy = subchannel_unref_locked(c);
+
   gpr_mu_unlock(&c->mu);
 
   gpr_free(filters);
 
-  if (destroy) {
-    subchannel_destroy(c);
+  if (destroy_connection != NULL) {
+    connection_destroy(destroy_connection);
   }
 }