Add ChannelConnectivityWatcher
diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h
index c50091d..73f28a1 100644
--- a/include/grpc++/channel.h
+++ b/include/grpc++/channel.h
@@ -30,6 +30,10 @@
struct grpc_channel;
namespace grpc {
+class ChannelConnectivityWatcher;
+}
+
+namespace grpc {
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel final : public ChannelInterface,
public CallHook,
@@ -71,6 +75,7 @@
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;
+ std::unique_ptr<ChannelConnectivityWatcher> connectivity_watcher_;
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb0..27bd75f 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -35,17 +35,77 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
#include "src/core/lib/profiling/timers.h"
namespace grpc {
+namespace {
+void WatchStateChange(void* arg);
+} // namespace
+
+// Constantly watches channel connectivity status to reconnect a transiently
+// disconnected channel. This is a temporary work-around before we have retry
+// support.
+class ChannelConnectivityWatcher {
+ public:
+ ChannelConnectivityWatcher(Channel* channel)
+ : channel_(channel), thd_id_(0), being_destroyed_(0) {}
+
+ void WatchStateChangeImpl() {
+ grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
+ while (state != GRPC_CHANNEL_SHUTDOWN) {
+ if (gpr_atm_no_barrier_load(&being_destroyed_) == 1) {
+ break;
+ }
+ channel_->WaitForStateChange(
+ state, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)));
+ state = channel_->GetState(false);
+ }
+ }
+ void StartWatching() {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ }
+
+ void Destroy() {
+ if (thd_id_ != 0) {
+ gpr_atm_no_barrier_store(&being_destroyed_, 1);
+ gpr_thd_join(thd_id_);
+ }
+ }
+
+ private:
+ Channel* channel_;
+ gpr_thd_id thd_id_;
+ gpr_atm being_destroyed_;
+};
+
+namespace {
+void WatchStateChange(void* arg) {
+ ChannelConnectivityWatcher* watcher =
+ static_cast<ChannelConnectivityWatcher*>(arg);
+ watcher->WatchStateChangeImpl();
+}
+} // namespace
+
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
- : host_(host), c_channel_(channel) {
+ : connectivity_watcher_(new ChannelConnectivityWatcher(this)),
+ host_(host),
+ c_channel_(channel) {
g_gli_initializer.summon();
+ if (host != "inproc") {
+ connectivity_watcher_->StartWatching();
+ }
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ connectivity_watcher_->Destroy();
+ grpc_channel_destroy(c_channel_);
+}
namespace {