Share one monitoring thread between channels
diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h
index 3849240..c50091d 100644
--- a/include/grpc++/channel.h
+++ b/include/grpc++/channel.h
@@ -30,8 +30,6 @@
 struct grpc_channel;
 
 namespace grpc {
-class ChannelConnectivityWatcher;
-
 /// Channels represent a connection to an endpoint. Created by \a CreateChannel.
 class Channel final : public ChannelInterface,
                       public CallHook,
@@ -73,7 +71,6 @@
   bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
                               gpr_timespec deadline) override;
 
-  std::unique_ptr<ChannelConnectivityWatcher> connectivity_watcher_;
   const grpc::string host_;
   grpc_channel* const c_channel_;  // owned
 };
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index ffe88df..f06d25b 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -43,8 +43,23 @@
 namespace grpc {
 
 namespace {
-int kConnectivityCheckIntervalMsec = 100;
+int kConnectivityCheckIntervalMsec = 500;
 void WatchStateChange(void* arg);
+
+class TagSaver final : public CompletionQueueTag {
+ public:
+  explicit TagSaver(void* tag) : tag_(tag) {}
+  ~TagSaver() override {}
+  bool FinalizeResult(void** tag, bool* status) override {
+    *tag = tag_;
+    delete this;
+    return true;
+  }
+
+ private:
+  void* tag_;
+};
+
 }  // namespace
 
 // Constantly watches channel connectivity status to reconnect a transiently
@@ -52,55 +67,80 @@
 // support.
 class ChannelConnectivityWatcher {
  public:
-  explicit ChannelConnectivityWatcher(Channel* channel)
-      : channel_(channel), thd_id_(0) {}
-
-  void WatchStateChangeImpl() {
-    grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
-    bool ok = false;
-    void* tag = NULL;
-    while (state != GRPC_CHANNEL_SHUTDOWN) {
-      channel_->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_REALTIME),
-                                    &cq_, NULL);
-      while (cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)) ==
-             CompletionQueue::TIMEOUT) {
-        gpr_sleep_until(
-            gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
-                         gpr_time_from_micros(kConnectivityCheckIntervalMsec,
-                                              GPR_TIMESPAN)));
-      }
-      state = channel_->GetState(false);
-    }
+  ChannelConnectivityWatcher() {
+    gpr_thd_options options = gpr_thd_options_default();
+    gpr_thd_options_set_joinable(&options);
+    gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
   }
 
-  void StartWatching() {
-    const char* disabled_str =
-        std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
-    if (disabled_str == nullptr || strcmp(disabled_str, "1")) {
-      // This NotifyOnstateChange() is not used to monitor the channel state
-      // change, but to hold a reference of the c channel. So that
-      // WatchStateChangeImpl() can observe state == GRPC_CHANNEL_SHUTDOWN
-      // without holding any lock on the channel object.
-      channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE,
-                                    gpr_inf_future(GPR_CLOCK_REALTIME),
-                                    &shutdown_cq_, NULL);
-      gpr_thd_options options = gpr_thd_options_default();
-      gpr_thd_options_set_joinable(&options);
-      gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
-    }
-  }
-
-  void Destroy() {
+  ~ChannelConnectivityWatcher() {
+    cq_.Shutdown();
     if (thd_id_ != 0) {
       gpr_thd_join(thd_id_);
     }
+  }
+
+  void WatchStateChangeImpl() {
     bool ok = false;
     void* tag = NULL;
-    shutdown_cq_.Next(&tag, &ok);
+    CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
+    while (status != CompletionQueue::SHUTDOWN) {
+      status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+      // Make sure we've seen 2 TIMEOUTs before going to sleep
+      if (status == CompletionQueue::TIMEOUT) {
+        status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+      }
+      if (status == CompletionQueue::TIMEOUT) {
+        gpr_sleep_until(
+            gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+                         gpr_time_from_millis(kConnectivityCheckIntervalMsec,
+                                              GPR_TIMESPAN)));
+      } else if (status == CompletionQueue::GOT_EVENT) {
+        ChannelState* channel_state = static_cast<ChannelState*>(tag);
+        channel_state->state = grpc_channel_check_connectivity_state(
+            channel_state->channel, false);
+        if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+          void* shutdown_tag = NULL;
+          channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+          delete channel_state;
+        } else {
+          TagSaver* tag_saver = new TagSaver(channel_state);
+          grpc_channel_watch_connectivity_state(
+              channel_state->channel, channel_state->state,
+              gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+        }
+      }
+    }
+  }
+
+  void StartWatching(grpc_channel* channel) {
+    const char* disabled_str =
+        std::getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
+    if (disabled_str == nullptr || strcmp(disabled_str, "1")) {
+      ChannelState* channel_state = new ChannelState(channel);
+      // The first grpc_channel_watch_connectivity_state() is not used to
+      // monitor the channel state change, but to hold a reference of the
+      // c channel. So that WatchStateChangeImpl() can observe state ==
+      // GRPC_CHANNEL_SHUTDOWN without holding any lock on the channel object.
+      grpc_channel_watch_connectivity_state(
+          channel_state->channel, channel_state->state,
+          gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
+          new TagSaver(nullptr));
+      grpc_channel_watch_connectivity_state(
+          channel_state->channel, channel_state->state,
+          gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
+          new TagSaver(channel_state));
+    }
   }
 
  private:
-  Channel* channel_;
+  struct ChannelState {
+    explicit ChannelState(grpc_channel* channel)
+        : channel(channel), state(GRPC_CHANNEL_IDLE){};
+    grpc_channel* channel;
+    grpc_connectivity_state state;
+    CompletionQueue shutdown_cq;
+  };
   gpr_thd_id thd_id_;
   CompletionQueue cq_;
   CompletionQueue shutdown_cq_;
@@ -112,22 +152,21 @@
       static_cast<ChannelConnectivityWatcher*>(arg);
   watcher->WatchStateChangeImpl();
 }
+
+ChannelConnectivityWatcher channel_connectivity_watcher;
 }  // namespace
 
 static internal::GrpcLibraryInitializer g_gli_initializer;
 Channel::Channel(const grpc::string& host, grpc_channel* channel)
-    : connectivity_watcher_(new ChannelConnectivityWatcher(this)),
-      host_(host),
-      c_channel_(channel) {
+    : host_(host), c_channel_(channel) {
   g_gli_initializer.summon();
   if (grpc_channel_support_connectivity_watcher(channel)) {
-    connectivity_watcher_->StartWatching();
+    channel_connectivity_watcher.StartWatching(channel);
   }
 }
 
 Channel::~Channel() {
   grpc_channel_destroy(c_channel_);
-  connectivity_watcher_->Destroy();
 }
 
 namespace {
@@ -213,23 +252,6 @@
   return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
 }
 
-namespace {
-class TagSaver final : public CompletionQueueTag {
- public:
-  explicit TagSaver(void* tag) : tag_(tag) {}
-  ~TagSaver() override {}
-  bool FinalizeResult(void** tag, bool* status) override {
-    *tag = tag_;
-    delete this;
-    return true;
-  }
-
- private:
-  void* tag_;
-};
-
-}  // namespace
-
 void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
                                       gpr_timespec deadline,
                                       CompletionQueue* cq, void* tag) {
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index f316dd0..9954f9f 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -704,10 +704,10 @@
   ResetStub();
   SendRpc(stub_.get(), 1, false);
   RestartServer(std::shared_ptr<AuthMetadataProcessor>());
-  // It needs more than 2 * kConnectivityCheckIntervalMsec time to reconnect
-  // the channel
+  // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
+  // channel.
   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
-                               gpr_time_from_millis(210, GPR_TIMESPAN)));
+                               gpr_time_from_millis(510, GPR_TIMESPAN)));
   SendRpc(stub_.get(), 1, false);
 }