Further client config work: implement LB target picking and subchannel call startup in client_channel
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9630f68..965d4e5 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -34,6 +34,7 @@
 #include "src/core/channel/client_channel.h"
 
 #include <stdio.h>
+#include <string.h>
 
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/connected_channel.h"
@@ -75,6 +76,7 @@
   CALL_CREATED,
   CALL_WAITING_FOR_CONFIG,
   CALL_WAITING_FOR_PICK,
+  CALL_WAITING_FOR_CALL,
   CALL_ACTIVE,
   CALL_CANCELLED
 } call_state;
@@ -87,17 +89,13 @@
 
   call_state state;
   gpr_timespec deadline;
-  union {
-    struct {
-      /* our child call stack */
-      grpc_subchannel_call *subchannel_call;
-    } active;
-    grpc_transport_stream_op waiting_op;
-    struct {
-      grpc_linked_mdelem status;
-      grpc_linked_mdelem details;
-    } cancelled;
-  } s;
+  grpc_subchannel *picked_channel;
+  grpc_iomgr_closure async_setup_task;
+  grpc_transport_stream_op waiting_op;
+  /* our child call stack */
+  grpc_subchannel_call *subchannel_call;
+  grpc_linked_mdelem status;
+  grpc_linked_mdelem details;
 };
 
 #if 0
@@ -110,9 +108,9 @@
   /* no more access to calld->s.waiting allowed */
   GPR_ASSERT(calld->state == CALL_WAITING);
 
-  if (calld->s.waiting_op.bind_pollset) {
+  if (calld->waiting_op.bind_pollset) {
     grpc_transport_setup_del_interested_party(chand->transport_setup,
-                                              calld->s.waiting_op.bind_pollset);
+                                              calld->waiting_op.bind_pollset);
   }
 
   calld->state = CALL_ACTIVE;
@@ -143,7 +141,7 @@
   for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
     if (chand->waiting_children[i] == calld) {
       grpc_transport_setup_del_interested_party(
-          chand->transport_setup, calld->s.waiting_op.bind_pollset);
+          chand->transport_setup, calld->waiting_op.bind_pollset);
       continue;
     }
     chand->waiting_children[new_count++] = chand->waiting_children[i];
@@ -166,15 +164,15 @@
     char status[GPR_LTOA_MIN_BUFSIZE];
     grpc_metadata_batch mdb;
     gpr_ltoa(GRPC_STATUS_CANCELLED, status);
-    calld->s.cancelled.status.md =
+    calld->status.md =
         grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
-    calld->s.cancelled.details.md =
+    calld->details.md =
         grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
-    calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
-    calld->s.cancelled.status.next = &calld->s.cancelled.details;
-    calld->s.cancelled.details.prev = &calld->s.cancelled.status;
-    mdb.list.head = &calld->s.cancelled.status;
-    mdb.list.tail = &calld->s.cancelled.details;
+    calld->status.prev = calld->details.next = NULL;
+    calld->status.next = &calld->details;
+    calld->details.prev = &calld->status;
+    mdb.list.head = &calld->status;
+    mdb.list.tail = &calld->details;
     mdb.garbage.head = mdb.garbage.tail = NULL;
     mdb.deadline = gpr_inf_future;
     grpc_sopb_add_metadata(op->recv_ops, mdb);
@@ -186,16 +184,111 @@
   }
 }
 
-static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) {
-  abort();
+typedef struct {
+  grpc_iomgr_closure closure; /* node linked into chand->waiting_for_config_closures */
+  grpc_call_element *elem; /* call whose waiting_op continue_with_pick replays */
+} waiting_call;
+
+static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation);
+
+static void continue_with_pick(void *arg, int iomgr_success) {
+  waiting_call *wc = arg; /* heap node created when the call was queued for config */
+  call_data *calld = wc->elem->call_data;
+  perform_transport_stream_op(wc->elem, &calld->waiting_op, 1); /* 1: replay of the queued op */
+  gpr_free(wc); /* iomgr_success is not consulted */
+}
+
+static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) {
+  channel_data *chand = elem->channel_data;
+  waiting_call *wc = gpr_malloc(sizeof(*wc)); /* released by continue_with_pick */
+  grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+  wc->elem = elem;
+  wc->closure.next = chand->waiting_for_config_closures; /* push onto the head of the list */
+  chand->waiting_for_config_closures = &wc->closure;
+}
+
+/* Returns 1 when the first len bytes at p are all zero (or len <= 0), else 0. */
+static int is_empty(void *p, int len) {
+  char *byte = p;
+  for (; len > 0; len--, byte++) {
+    if (*byte != 0) return 0;
+  }
+  return 1;
+}
+
+/* Closure handed to grpc_subchannel_create_call; drops mu_state on every path */
+static void started_call(void *arg, int iomgr_success) {
+  call_data *calld = arg;
+  grpc_transport_stream_op op;
+  gpr_mu_lock(&calld->mu_state);
+  if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+    memset(&op, 0, sizeof(op));
+    op.cancel_with_status = GRPC_STATUS_CANCELLED;
+    gpr_mu_unlock(&calld->mu_state);
+    grpc_subchannel_call_process_op(calld->subchannel_call, &op);
+  } else if (calld->state == CALL_WAITING_FOR_CALL) {
+    int have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+    if (calld->subchannel_call != NULL) {
+      calld->state = CALL_ACTIVE;
+      gpr_mu_unlock(&calld->mu_state);
+      if (have_waiting) {
+        grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op);
+      }
+    } else {
+      calld->state = CALL_CANCELLED;
+      gpr_mu_unlock(&calld->mu_state);
+      if (have_waiting) {
+        handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+      }
+    }
+  } else {
+    GPR_ASSERT(calld->state == CALL_CANCELLED);
+    gpr_mu_unlock(&calld->mu_state); /* nothing to forward, but the lock must still be dropped */
+  }
+}
+
+static void picked_target(void *arg, int iomgr_success) {
+  call_data *calld = arg;
+  channel_data *chand = calld->elem->channel_data;
+  grpc_transport_stream_op op;
+  /* closure registered by pick_target; reads the calld->picked_channel slot it passed to the pick */
+  if (calld->picked_channel == NULL) {
+    /* no subchannel was picked: treat this like a cancellation (UNAVAILABLE) */
+    calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+    perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
+  } else {
+    gpr_mu_lock(&calld->mu_state);
+    if (calld->state == CALL_CANCELLED) {
+      gpr_mu_unlock(&calld->mu_state);
+      handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+    } else {
+      GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
+      calld->state = CALL_WAITING_FOR_CALL;
+      op = calld->waiting_op; /* NOTE(review): stack copy; confirm create_call does not retain &op */
+      memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); /* started_call checks this for newly queued ops */
+      gpr_mu_unlock(&calld->mu_state);
+      grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
+      grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task);
+    }
+  }
 }
 
 static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
-  abort();
+  grpc_metadata_batch *initial_metadata;
+  grpc_transport_stream_op *op = &calld->waiting_op;
+  /* the queued op must carry a pollset and have a metadata op as its first send op */
+  GPR_ASSERT(op->bind_pollset);
+  GPR_ASSERT(op->send_ops);
+  GPR_ASSERT(op->send_ops->nops >= 1);
+  GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+  initial_metadata = &op->send_ops->ops[0].data.metadata;
+  /* picked_target is registered as the pick's closure; it reads calld->picked_channel */
+  grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+  grpc_lb_policy_pick(lb_policy, op->bind_pollset, 
+    initial_metadata, &calld->picked_channel, &calld->async_setup_task);
 }
 
-static void cc_start_transport_stream_op(grpc_call_element *elem,
-                                  grpc_transport_stream_op *op) {
+static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   grpc_subchannel_call *subchannel_call;
@@ -206,7 +299,8 @@
   gpr_mu_lock(&calld->mu_state);
   switch (calld->state) {
     case CALL_ACTIVE:
-      subchannel_call = calld->s.active.subchannel_call;
+      GPR_ASSERT(!continuation);
+      subchannel_call = calld->subchannel_call;
       gpr_mu_unlock(&calld->mu_state);
       grpc_subchannel_call_process_op(subchannel_call, op);
       break;
@@ -214,13 +308,44 @@
       gpr_mu_unlock(&calld->mu_state);
       handle_op_after_cancellation(elem, op);
       break;
+    case CALL_WAITING_FOR_CONFIG:
+    case CALL_WAITING_FOR_PICK:
+    case CALL_WAITING_FOR_CALL:
+      if (!continuation) {
+        if (op->cancel_with_status != GRPC_STATUS_OK) {
+          calld->state = CALL_CANCELLED;
+          gpr_mu_unlock(&calld->mu_state);
+          handle_op_after_cancellation(elem, op);
+        } else {
+          GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
+                     (op->send_ops == NULL));
+          GPR_ASSERT((calld->waiting_op.recv_ops == NULL) !=
+                     (op->recv_ops == NULL));
+          if (op->send_ops != NULL) {
+            calld->waiting_op.send_ops = op->send_ops;
+            calld->waiting_op.is_last_send = op->is_last_send;
+            calld->waiting_op.on_done_send = op->on_done_send;
+          }
+          if (op->recv_ops != NULL) {
+            calld->waiting_op.recv_ops = op->recv_ops;
+            calld->waiting_op.recv_state = op->recv_state;
+            calld->waiting_op.on_done_recv = op->on_done_recv;
+          }
+          gpr_mu_unlock(&calld->mu_state);
+          if (op->on_consumed != NULL) {
+            op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+          }
+        }
+        break;
+      }
+      /* fall through */
     case CALL_CREATED:
       if (op->cancel_with_status != GRPC_STATUS_OK) {
         calld->state = CALL_CANCELLED;
         gpr_mu_unlock(&calld->mu_state);
         handle_op_after_cancellation(elem, op);
       } else {
-        calld->s.waiting_op = *op;
+        calld->waiting_op = *op;
 
         gpr_mu_lock(&chand->mu_config);
         lb_policy = chand->lb_policy;
@@ -235,141 +360,22 @@
           grpc_lb_policy_unref(lb_policy);
         } else {
           calld->state = CALL_WAITING_FOR_CONFIG;
-          add_to_lb_policy_wait_queue_locked_state_config(chand, calld);
+          add_to_lb_policy_wait_queue_locked_state_config(elem);
           gpr_mu_unlock(&chand->mu_config);
           gpr_mu_unlock(&calld->mu_state);
         }
       }
       break;
-    case CALL_WAITING_FOR_CONFIG:
-    case CALL_WAITING_FOR_PICK:
-      if (op->cancel_with_status != GRPC_STATUS_OK) {
-        calld->state = CALL_CANCELLED;
-        gpr_mu_unlock(&calld->mu_state);
-        handle_op_after_cancellation(elem, op);
-      } else {
-        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
-                   (op->send_ops == NULL));
-        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
-                   (op->recv_ops == NULL));
-        if (op->send_ops != NULL) {
-          calld->s.waiting_op.send_ops = op->send_ops;
-          calld->s.waiting_op.is_last_send = op->is_last_send;
-          calld->s.waiting_op.on_done_send = op->on_done_send;
-        }
-        if (op->recv_ops != NULL) {
-          calld->s.waiting_op.recv_ops = op->recv_ops;
-          calld->s.waiting_op.recv_state = op->recv_state;
-          calld->s.waiting_op.on_done_recv = op->on_done_recv;
-        }
-        gpr_mu_unlock(&calld->mu_state);
-        if (op->on_consumed != NULL) {
-          op->on_consumed->cb(op->on_consumed->cb_arg, 0);
-        }
-      }
-      break;
   }
+}
 
-
-
-
-#if 0
-  gpr_mu_lock(&chand->mu);
-  switch (calld->state) {
-    case CALL_ACTIVE:
-      child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
-      gpr_mu_unlock(&chand->mu);
-      child_elem->filter->start_transport_op(child_elem, op);
-      break;
-    case CALL_CREATED:
-      if (op->cancel_with_status != GRPC_STATUS_OK) {
-        calld->state = CALL_CANCELLED;
-        gpr_mu_unlock(&chand->mu);
-        handle_op_after_cancellation(elem, op);
-      } else {
-        calld->state = CALL_WAITING;
-        calld->s.waiting_op.bind_pollset = NULL;
-        if (chand->active_child) {
-          /* channel is connected - use the connected stack */
-          if (prepare_activate(elem, chand->active_child)) {
-            gpr_mu_unlock(&chand->mu);
-            /* activate the request (pass it down) outside the lock */
-            complete_activate(elem, op);
-          } else {
-            gpr_mu_unlock(&chand->mu);
-          }
-        } else {
-          /* check to see if we should initiate a connection (if we're not
-             already),
-             but don't do so until outside the lock to avoid re-entrancy
-             problems if
-             the callback is immediate */
-          int initiate_transport_setup = 0;
-          if (!chand->transport_setup_initiated) {
-            chand->transport_setup_initiated = 1;
-            initiate_transport_setup = 1;
-          }
-          /* add this call to the waiting set to be resumed once we have a child
-             channel stack, growing the waiting set if needed */
-          if (chand->waiting_child_count == chand->waiting_child_capacity) {
-            chand->waiting_child_capacity =
-                GPR_MAX(chand->waiting_child_capacity * 2, 8);
-            chand->waiting_children = gpr_realloc(
-                chand->waiting_children,
-                chand->waiting_child_capacity * sizeof(call_data *));
-          }
-          calld->s.waiting_op = *op;
-          chand->waiting_children[chand->waiting_child_count++] = calld;
-          grpc_transport_setup_add_interested_party(chand->transport_setup,
-                                                    op->bind_pollset);
-          gpr_mu_unlock(&chand->mu);
-
-          /* finally initiate transport setup if needed */
-          if (initiate_transport_setup) {
-            grpc_transport_setup_initiate(chand->transport_setup);
-          }
-        }
-      }
-      break;
-    case CALL_WAITING:
-      if (op->cancel_with_status != GRPC_STATUS_OK) {
-        waiting_op = calld->s.waiting_op;
-        remove_waiting_child(chand, calld);
-        calld->state = CALL_CANCELLED;
-        gpr_mu_unlock(&chand->mu);
-        handle_op_after_cancellation(elem, &waiting_op);
-        handle_op_after_cancellation(elem, op);
-      } else {
-        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
-                   (op->send_ops == NULL));
-        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
-                   (op->recv_ops == NULL));
-        if (op->send_ops) {
-          calld->s.waiting_op.send_ops = op->send_ops;
-          calld->s.waiting_op.is_last_send = op->is_last_send;
-          calld->s.waiting_op.on_done_send = op->on_done_send;
-        }
-        if (op->recv_ops) {
-          calld->s.waiting_op.recv_ops = op->recv_ops;
-          calld->s.waiting_op.recv_state = op->recv_state;
-          calld->s.waiting_op.on_done_recv = op->on_done_recv;
-        }
-        gpr_mu_unlock(&chand->mu);
-        if (op->on_consumed) {
-          op->on_consumed->cb(op->on_consumed->cb_arg, 0);
-        }
-      }
-      break;
-    case CALL_CANCELLED:
-      gpr_mu_unlock(&chand->mu);
-      handle_op_after_cancellation(elem, op);
-      break;
-  }
-#endif  
+static void cc_start_transport_stream_op(grpc_call_element *elem,
+                                  grpc_transport_stream_op *op) {
+  perform_transport_stream_op(elem, op, 0); /* 0: a new op, not a replay of calld->waiting_op */
 }
 
 static void update_state_locked(channel_data *chand) {
-
+  gpr_log(GPR_ERROR, "update_state_locked not implemented"); /* stub: only logs for now */
 }
 
 static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -382,9 +388,10 @@
   if (chand->incoming_configuration) {
     lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
     grpc_lb_policy_ref(lb_policy);
+
+    grpc_client_config_unref(chand->incoming_configuration);
   }
 
-  grpc_client_config_unref(chand->incoming_configuration);
   chand->incoming_configuration = NULL;
 
   gpr_mu_lock(&chand->mu_config);
@@ -402,7 +409,9 @@
     wakeup_closures = next;
   }
 
-  grpc_lb_policy_unref(old_lb_policy);
+  if (old_lb_policy) {
+    grpc_lb_policy_unref(old_lb_policy);
+  }
 
   if (iomgr_success) {
     grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
@@ -511,6 +520,7 @@
 
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
   GPR_ASSERT(server_transport_data == NULL);
+  gpr_mu_init(&calld->mu_state);
   calld->elem = elem;
   calld->state = CALL_CREATED;
   calld->deadline = gpr_inf_future;
@@ -527,7 +537,7 @@
   gpr_mu_lock(&calld->mu_state);
   switch (calld->state) {
     case CALL_ACTIVE:
-      subchannel_call = calld->s.active.subchannel_call;
+      subchannel_call = calld->subchannel_call;
       gpr_mu_unlock(&calld->mu_state);
       grpc_subchannel_call_unref(subchannel_call);
       break;
@@ -537,6 +547,7 @@
       break;
     case CALL_WAITING_FOR_PICK:
     case CALL_WAITING_FOR_CONFIG:
+    case CALL_WAITING_FOR_CALL:
       gpr_log(GPR_ERROR, "should never reach here");
       abort();
       break;
@@ -550,12 +561,12 @@
                               int is_last) {
   channel_data *chand = elem->channel_data;
 
-  GPR_ASSERT(!is_first);
+  memset(chand, 0, sizeof(*chand));
+
   GPR_ASSERT(is_last);
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 
   gpr_mu_init(&chand->mu_config);
-  chand->resolver = NULL;
   chand->mdctx = metadata_context;
   grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
 }
@@ -633,7 +644,7 @@
   call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
 
   for (i = 0; i < waiting_child_count; i++) {
-    call_ops[i] = waiting_children[i]->s.waiting_op;
+    call_ops[i] = waiting_children[i]->waiting_op;
     if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
       waiting_children[i] = NULL;
       grpc_transport_stream_op_finish_with_failure(&call_ops[i]);