Client connectivity API

Initial plumbing work; needs tests and more client_channel
implementation.
diff --git a/BUILD b/BUILD
index 8366fa1..51c76aa 100644
--- a/BUILD
+++ b/BUILD
@@ -340,6 +340,7 @@
     "src/core/surface/call_details.c",
     "src/core/surface/call_log_batch.c",
     "src/core/surface/channel.c",
+    "src/core/surface/channel_connectivity.c",
     "src/core/surface/channel_create.c",
     "src/core/surface/completion_queue.c",
     "src/core/surface/event_string.c",
@@ -571,6 +572,7 @@
     "src/core/surface/call_details.c",
     "src/core/surface/call_log_batch.c",
     "src/core/surface/channel.c",
+    "src/core/surface/channel_connectivity.c",
     "src/core/surface/channel_create.c",
     "src/core/surface/completion_queue.c",
     "src/core/surface/event_string.c",
@@ -1045,6 +1047,7 @@
     "src/core/surface/call_details.c",
     "src/core/surface/call_log_batch.c",
     "src/core/surface/channel.c",
+    "src/core/surface/channel_connectivity.c",
     "src/core/surface/channel_create.c",
     "src/core/surface/completion_queue.c",
     "src/core/surface/event_string.c",
diff --git a/Makefile b/Makefile
index 695f000..712b129 100644
--- a/Makefile
+++ b/Makefile
@@ -3340,6 +3340,7 @@
     src/core/surface/call_details.c \
     src/core/surface/call_log_batch.c \
     src/core/surface/channel.c \
+    src/core/surface/channel_connectivity.c \
     src/core/surface/channel_create.c \
     src/core/surface/completion_queue.c \
     src/core/surface/event_string.c \
@@ -3601,6 +3602,7 @@
     src/core/surface/call_details.c \
     src/core/surface/call_log_batch.c \
     src/core/surface/channel.c \
+    src/core/surface/channel_connectivity.c \
     src/core/surface/channel_create.c \
     src/core/surface/completion_queue.c \
     src/core/surface/event_string.c \
diff --git a/build.json b/build.json
index e21efde..224aef4 100644
--- a/build.json
+++ b/build.json
@@ -277,6 +277,7 @@
         "src/core/surface/call_details.c",
         "src/core/surface/call_log_batch.c",
         "src/core/surface/channel.c",
+        "src/core/surface/channel_connectivity.c",
         "src/core/surface/channel_create.c",
         "src/core/surface/completion_queue.c",
         "src/core/surface/event_string.c",
diff --git a/gRPC.podspec b/gRPC.podspec
index 34fbccf..0dab803 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -349,6 +349,7 @@
                       'src/core/surface/call_details.c',
                       'src/core/surface/call_log_batch.c',
                       'src/core/surface/channel.c',
+                      'src/core/surface/channel_connectivity.c',
                       'src/core/surface/channel_create.c',
                       'src/core/surface/completion_queue.c',
                       'src/core/surface/event_string.c',
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 3c72c1d..33d534d 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -392,6 +392,25 @@
    drained and no threads are executing grpc_completion_queue_next */
 void grpc_completion_queue_destroy(grpc_completion_queue *cq);
 
+/** Check the connectivity state of a channel. */
+grpc_connectivity_state grpc_channel_check_connectivity_state(
+    grpc_channel *channel, int try_to_connect);
+
+/** Watch for a change in connectivity state.
+    Once the channel connectivity state is different from last_observed_state,
+    tag will be enqueued on cq with success=1.
+    If deadline expires BEFORE the state is changed, tag will be enqueued on cq
+    with success=0.
+    If optional_new_state is non-NULL, it will be set to the newly observed
+   connectivity
+    state of the channel at the same point as tag is enqueued onto the
+   completion
+    queue. */
+void grpc_channel_watch_connectivity_state(
+    grpc_channel *channel, grpc_connectivity_state last_observed_state,
+    grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
+    grpc_completion_queue *cq, void *tag);
+
 /* Create a call given a grpc_channel, in order to call 'method'. All
    completions are sent to 'completion_queue'. 'method' and 'host' need only
    live through the invocation of this function. */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index f890f99..b68954b 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -77,6 +77,8 @@
   grpc_iomgr_closure on_config_changed;
   /** connectivity state being tracked */
   grpc_connectivity_state_tracker state_tracker;
+  /** when an lb_policy arrives, should we try to exit idle */
+  int exit_idle_when_lb_policy_arrives;
 } channel_data;
 
 typedef enum {
@@ -398,6 +400,7 @@
   grpc_lb_policy *old_lb_policy;
   grpc_resolver *old_resolver;
   grpc_iomgr_closure *wakeup_closures = NULL;
+  int exit_idle = 0;
 
   if (chand->incoming_configuration != NULL) {
     lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
@@ -415,8 +418,18 @@
     wakeup_closures = chand->waiting_for_config_closures;
     chand->waiting_for_config_closures = NULL;
   }
+  if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+    GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+    exit_idle = 1;
+    chand->exit_idle_when_lb_policy_arrives = 0;
+  }
   gpr_mu_unlock(&chand->mu_config);
 
+  if (exit_idle) {
+    grpc_lb_policy_exit_idle(lb_policy);
+    GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
+  }
+
   if (old_lb_policy) {
     GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
   }
@@ -609,3 +622,30 @@
   grpc_resolver_next(resolver, &chand->incoming_configuration,
                      &chand->on_config_changed);
 }
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+    grpc_channel_element *elem, int try_to_connect) {
+  channel_data *chand = elem->channel_data;
+  grpc_connectivity_state out;
+  gpr_mu_lock(&chand->mu_config);
+  out = grpc_connectivity_state_check(&chand->state_tracker);
+  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+    if (chand->lb_policy != NULL) {
+      grpc_lb_policy_exit_idle(chand->lb_policy);
+    } else {
+      chand->exit_idle_when_lb_policy_arrives = 1;
+    }
+  }
+  gpr_mu_unlock(&chand->mu_config);
+  return out;
+}
+
+void grpc_client_channel_watch_connectivity_state(
+    grpc_channel_element *elem, grpc_connectivity_state *state,
+    grpc_iomgr_closure *on_complete) {
+  channel_data *chand = elem->channel_data;
+  gpr_mu_lock(&chand->mu_config);
+  grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+                                                 on_complete);
+  gpr_mu_unlock(&chand->mu_config);
+}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index fd2be46..16f1133 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -52,4 +52,11 @@
 void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
                                       grpc_resolver *resolver);
 
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+    grpc_channel_element *elem, int try_to_connect);
+
+void grpc_client_channel_watch_connectivity_state(
+    grpc_channel_element *elem, grpc_connectivity_state *state,
+    grpc_iomgr_closure *on_complete);
+
 #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 73da624..4f4c7eb 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -97,6 +97,25 @@
   gpr_mu_unlock(&p->mu);
 }
 
+static void start_picking(pick_first_lb_policy *p) {
+  p->started_picking = 1;
+  p->checking_subchannel = 0;
+  p->checking_connectivity = GRPC_CHANNEL_IDLE;
+  GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
+  grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
+                                         &p->checking_connectivity,
+                                         &p->connectivity_changed);
+}
+
+void pf_exit_idle(grpc_lb_policy *pol) {
+  pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+  gpr_mu_lock(&p->mu);
+  if (!p->started_picking) {
+    start_picking(p);
+  }
+  gpr_mu_unlock(&p->mu);
+}
+
 void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
              grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
              grpc_iomgr_closure *on_complete) {
@@ -109,13 +128,7 @@
     on_complete->cb(on_complete->cb_arg, 1);
   } else {
     if (!p->started_picking) {
-      p->started_picking = 1;
-      p->checking_subchannel = 0;
-      p->checking_connectivity = GRPC_CHANNEL_IDLE;
-      GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
-      grpc_subchannel_notify_on_state_change(
-          p->subchannels[p->checking_subchannel], &p->checking_connectivity,
-          &p->connectivity_changed);
+      start_picking(p);
     }
     grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
                                          pollset);
@@ -249,8 +262,13 @@
 }
 
 static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
-    pf_destroy,   pf_shutdown,           pf_pick,
-    pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+    pf_destroy,
+    pf_shutdown,
+    pf_pick,
+    pf_exit_idle,
+    pf_broadcast,
+    pf_check_connectivity,
+    pf_notify_on_state_change};
 
 grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
                                                  size_t num_subchannels) {
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 6d1c788..9d5baf8 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -77,3 +77,7 @@
 void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
   policy->vtable->broadcast(policy, op);
 }
+
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
+  policy->vtable->exit_idle(policy);
+}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index a468f76..363faf3 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -59,6 +59,9 @@
                grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
                grpc_iomgr_closure *on_complete);
 
+  /** try to enter a READY connectivity state */
+  void (*exit_idle)(grpc_lb_policy *policy);
+
   /** broadcast a transport op to all subchannels */
   void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
 
@@ -106,4 +109,6 @@
 
 void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
 
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
+
 #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
new file mode 100644
index 0000000..bdd9678
--- /dev/null
+++ b/src/core/surface/channel_connectivity.c
@@ -0,0 +1,182 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/channel/client_channel.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/surface/completion_queue.h"
+
+grpc_connectivity_state grpc_channel_check_connectivity_state(
+    grpc_channel *channel, int try_to_connect) {
+  /* forward through to the underlying client channel */
+  grpc_channel_element *client_channel_elem =
+      grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+  if (client_channel_elem->filter != &grpc_client_channel_filter) {
+    gpr_log(GPR_ERROR,
+            "grpc_channel_check_connectivity_state called on something that is "
+            "not a client channel, but '%s'",
+            client_channel_elem->filter->name);
+    return GRPC_CHANNEL_FATAL_FAILURE;
+  }
+  return grpc_client_channel_check_connectivity_state(client_channel_elem,
+                                                      try_to_connect);
+}
+
+typedef enum {
+  WAITING,
+  CALLING_BACK,
+  CALLING_BACK_AND_FINISHED,
+  CALLED_BACK
+} callback_phase;
+
+typedef struct {
+  gpr_mu mu;
+  callback_phase phase;
+  int success;
+  grpc_iomgr_closure on_complete;
+  grpc_alarm alarm;
+  grpc_connectivity_state state;
+  grpc_connectivity_state *optional_new_state;
+  grpc_completion_queue *cq;
+  grpc_cq_completion completion_storage;
+  void *tag;
+} state_watcher;
+
+static void delete_state_watcher(state_watcher *w) {
+  gpr_mu_destroy(&w->mu);
+  gpr_free(w);
+}
+
+static void finished_completion(void *pw, grpc_cq_completion *ignored) {
+  int delete = 0;
+  state_watcher *w = pw;
+  gpr_mu_lock(&w->mu);
+  switch (w->phase) {
+    case WAITING:
+    case CALLED_BACK:
+      gpr_log(GPR_ERROR, "should never reach here");
+      abort();
+      break;
+    case CALLING_BACK:
+      w->phase = CALLED_BACK;
+      break;
+    case CALLING_BACK_AND_FINISHED:
+      delete = 1;
+      break;
+  }
+  gpr_mu_unlock(&w->mu);
+
+  if (delete) {
+    delete_state_watcher(w);
+  }
+}
+
+static void partly_done(state_watcher *w, int due_to_completion) {
+  int delete = 0;
+
+  if (due_to_completion) {
+    gpr_mu_lock(&w->mu);
+    w->success = 1;
+    gpr_mu_unlock(&w->mu);
+    grpc_alarm_cancel(&w->alarm);
+  }
+
+  gpr_mu_lock(&w->mu);
+  switch (w->phase) {
+    case WAITING:
+      w->phase = CALLING_BACK;
+      if (w->optional_new_state) {
+        *w->optional_new_state = w->state;
+      }
+      grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
+                     &w->completion_storage);
+      break;
+    case CALLING_BACK:
+      w->phase = CALLING_BACK_AND_FINISHED;
+      break;
+    case CALLING_BACK_AND_FINISHED:
+      gpr_log(GPR_ERROR, "should never reach here");
+      abort();
+      break;
+    case CALLED_BACK:
+      delete = 1;
+      break;
+  }
+  gpr_mu_unlock(&w->mu);
+
+  if (delete) {
+    delete_state_watcher(w);
+  }
+}
+
+static void watch_complete(void *pw, int success) { partly_done(pw, 0); }
+
+static void timeout_complete(void *pw, int success) { partly_done(pw, 1); }
+
+void grpc_channel_watch_connectivity_state(
+    grpc_channel *channel, grpc_connectivity_state last_observed_state,
+    grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
+    grpc_completion_queue *cq, void *tag) {
+  grpc_channel_element *client_channel_elem =
+      grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+  state_watcher *w = gpr_malloc(sizeof(*w));
+
+  grpc_cq_begin_op(cq);
+
+  gpr_mu_init(&w->mu);
+  grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
+  w->phase = WAITING;
+  w->state = last_observed_state;
+  w->success = 0;
+  w->optional_new_state = optional_new_state;
+  w->cq = cq;
+  w->tag = tag;
+
+  grpc_alarm_init(&w->alarm, deadline, timeout_complete, w,
+                  gpr_now(GPR_CLOCK_REALTIME));
+
+  if (client_channel_elem->filter != &grpc_client_channel_filter) {
+    gpr_log(GPR_ERROR,
+            "grpc_channel_watch_connectivity_state called on something that is "
+            "not a client channel, but '%s'",
+            client_channel_elem->filter->name);
+    grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
+  } else {
+    grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
+                                                 &w->on_complete);
+  }
+}
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index a658f1a..a0c9bf3 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -975,6 +975,7 @@
 src/core/surface/call_details.c \
 src/core/surface/call_log_batch.c \
 src/core/surface/channel.c \
+src/core/surface/channel_connectivity.c \
 src/core/surface/channel_create.c \
 src/core/surface/completion_queue.c \
 src/core/surface/event_string.c \
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 8daee3c..280b19a 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -9023,6 +9023,7 @@
       "src/core/surface/call_log_batch.c", 
       "src/core/surface/channel.c", 
       "src/core/surface/channel.h", 
+      "src/core/surface/channel_connectivity.c", 
       "src/core/surface/channel_create.c", 
       "src/core/surface/completion_queue.c", 
       "src/core/surface/completion_queue.h", 
@@ -9426,6 +9427,7 @@
       "src/core/surface/call_log_batch.c", 
       "src/core/surface/channel.c", 
       "src/core/surface/channel.h", 
+      "src/core/surface/channel_connectivity.c", 
       "src/core/surface/channel_create.c", 
       "src/core/surface/completion_queue.c", 
       "src/core/surface/completion_queue.h", 
diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj
index 16744b1..ff53707 100644
--- a/vsprojects/grpc/grpc.vcxproj
+++ b/vsprojects/grpc/grpc.vcxproj
@@ -461,6 +461,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\completion_queue.c">
diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters
index de9f205..6e1a699 100644
--- a/vsprojects/grpc/grpc.vcxproj.filters
+++ b/vsprojects/grpc/grpc.vcxproj.filters
@@ -286,6 +286,9 @@
     <ClCompile Include="..\..\src\core\surface\channel.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+      <Filter>src\core\surface</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
index 02c791f..6ad6d5c 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
@@ -396,6 +396,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\completion_queue.c">
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 333a71f..0b47ef6 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -217,6 +217,9 @@
     <ClCompile Include="..\..\src\core\surface\channel.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+      <Filter>src\core\surface</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>