Merge pull request #11054 from apolcyn/ruby_dont_use_timeouts

Fixes to connectivity state API and addition of constant state watches
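For context, the Ruby-level surface this change affects is the channel connectivity-state API that the specs further down exercise. A minimal usage sketch under assumed conditions (the target address is a placeholder; the specs spin up a local test server instead):

  require 'grpc'

  ch = GRPC::Core::Channel.new('localhost:50051', {}, :this_channel_is_insecure)
  # Query the current state; passing true also asks the channel to try to connect.
  state = ch.connectivity_state(true)
  # Block until the state differs from `state` or the deadline passes;
  # returns true if the state changed, false otherwise.
  changed = ch.watch_connectivity_state(state, Time.now + 5)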
diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb
index d3e5373..bed8c43 100755
--- a/src/ruby/end2end/channel_closing_driver.rb
+++ b/src/ruby/end2end/channel_closing_driver.rb
@@ -61,6 +61,11 @@
       'channel is closed while connectivity is watched'
   end
 
+  client_exit_code = $CHILD_STATUS
+  if client_exit_code != 0
+    fail "channel closing client failed, exit code #{client_exit_code}"
+  end
+
   server_runner.stop
 end
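The driver scripts now also verify the child's exit status after Process.wait, as added above. A minimal sketch of that pattern, assuming $CHILD_STATUS is in scope via require 'English' and with a hypothetical client script name:

  require 'English'   # provides $CHILD_STATUS as an alias for $?
  require 'rbconfig'

  pid = Process.spawn(RbConfig.ruby, 'some_client.rb')  # hypothetical script
  Process.wait(pid)
  client_exit_code = $CHILD_STATUS
  # Process::Status compares equal to 0 only on a clean exit.
  fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0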
 
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb
index 80fb628..9910076 100755
--- a/src/ruby/end2end/channel_state_driver.rb
+++ b/src/ruby/end2end/channel_state_driver.rb
@@ -58,6 +58,9 @@
            'It likely hangs when ended abruptly'
   end
 
+  # The interrupt in the child process should cause it to
+  # exit with a non-zero status, so don't check it here.
+  # This test mainly tries to catch deadlock.
   server_runner.stop
 end
 
diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb
index ee79292..e73ca76 100755
--- a/src/ruby/end2end/grpc_class_init_client.rb
+++ b/src/ruby/end2end/grpc_class_init_client.rb
@@ -34,44 +34,110 @@
 
 require_relative './end2end_common'
 
-def main
-  grpc_class = ''
-  OptionParser.new do |opts|
-    opts.on('--grpc_class=P', String) do |p|
-      grpc_class = p
+def construct_many(test_proc)
+  thds = []
+  4.times do
+    thds << Thread.new do
+      20.times do
+        test_proc.call
+      end
     end
-  end.parse!
+  end
+  20.times do
+    test_proc.call
+  end
+  thds.each(&:join)
+end
 
-  test_proc = nil
+def run_gc_stress_test(test_proc)
+  GC.disable
+  construct_many(test_proc)
 
+  GC.enable
+  construct_many(test_proc)
+
+  GC.start(full_mark: true, immediate_sweep: true)
+  construct_many(test_proc)
+end
+
+def run_concurrency_stress_test(test_proc)
+  100.times do
+    Thread.new do
+      test_proc.call
+    end
+  end
+
+  test_proc.call
+
+  fail 'exception thrown while child thread initing class'
+end
+
+# default (no gc_stress and no concurrency_stress)
+def run_default_test(test_proc)
+  thd = Thread.new do
+    test_proc.call
+  end
+  test_proc.call
+  thd.join
+end
+
+def get_test_proc(grpc_class)
   case grpc_class
   when 'channel'
-    test_proc = proc do
+    return proc do
       GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
     end
   when 'server'
-    test_proc = proc do
+    return proc do
       GRPC::Core::Server.new({})
     end
   when 'channel_credentials'
-    test_proc = proc do
+    return proc do
       GRPC::Core::ChannelCredentials.new
     end
   when 'call_credentials'
-    test_proc = proc do
+    return proc do
       GRPC::Core::CallCredentials.new(proc { |noop| noop })
     end
   when 'compression_options'
-    test_proc = proc do
+    return proc do
       GRPC::Core::CompressionOptions.new
     end
   else
     fail "bad --grpc_class=#{grpc_class} param"
   end
+end
 
-  th = Thread.new { test_proc.call }
-  test_proc.call
-  th.join
+def main
+  grpc_class = ''
+  stress_test = ''
+  OptionParser.new do |opts|
+    opts.on('--grpc_class=P', String) do |p|
+      grpc_class = p
+    end
+    opts.on('--stress_test=P') do |p|
+      stress_test = p
+    end
+  end.parse!
+
+  test_proc = get_test_proc(grpc_class)
+
+  # the different test configs need to be run
+  # in separate processes, since each one tests
+  # clean shutdown in a different way
+  case stress_test
+  when 'gc'
+    p 'run gc stress'
+    run_gc_stress_test(test_proc)
+  when 'concurrency'
+    p 'run concurrency stress'
+    run_concurrency_stress_test(test_proc)
+  when ''
+    p 'run default'
+    run_default_test(test_proc)
+  else
+    fail "bad --stress_test=#{stress_test} param"
+  end
 end
 
 main
diff --git a/src/ruby/end2end/grpc_class_init_driver.rb b/src/ruby/end2end/grpc_class_init_driver.rb
index 764d029..c65ed54 100755
--- a/src/ruby/end2end/grpc_class_init_driver.rb
+++ b/src/ruby/end2end/grpc_class_init_driver.rb
@@ -38,29 +38,40 @@
                             call_credentials
                             compression_options )
 
-  native_grpc_classes.each do |grpc_class|
-    STDERR.puts 'start client'
-    this_dir = File.expand_path(File.dirname(__FILE__))
-    client_path = File.join(this_dir, 'grpc_class_init_client.rb')
-    client_pid = Process.spawn(RbConfig.ruby,
-                               client_path,
-                               "--grpc_class=#{grpc_class}")
-    begin
-      Timeout.timeout(10) do
-        Process.wait(client_pid)
-      end
-    rescue Timeout::Error
-      STDERR.puts "timeout waiting for client pid #{client_pid}"
-      Process.kill('SIGKILL', client_pid)
-      Process.wait(client_pid)
-      STDERR.puts 'killed client child'
-      raise 'Timed out waiting for client process. ' \
-        'It likely hangs when the first constructed gRPC object has ' \
-        "type: #{grpc_class}"
-    end
+  # there is room for false positives in this test,
+  # so do a few runs for each config
+  4.times do
+    native_grpc_classes.each do |grpc_class|
+      ['', 'gc', 'concurrency'].each do |stress_test_type|
+        STDERR.puts 'start client'
+        this_dir = File.expand_path(File.dirname(__FILE__))
+        client_path = File.join(this_dir, 'grpc_class_init_client.rb')
+        client_pid = Process.spawn(RbConfig.ruby,
+                                   client_path,
+                                   "--grpc_class=#{grpc_class}",
+                                   "--stress_test=#{stress_test_type}")
+        begin
+          Timeout.timeout(10) do
+            Process.wait(client_pid)
+          end
+        rescue Timeout::Error
+          STDERR.puts "timeout waiting for client pid #{client_pid}"
+          Process.kill('SIGKILL', client_pid)
+          Process.wait(client_pid)
+          STDERR.puts 'killed client child'
+          raise 'Timed out waiting for client process. ' \
+            'It likely hangs when the first constructed gRPC object has ' \
+            "type: #{grpc_class}"
+        end
 
-    client_exit_code = $CHILD_STATUS
-    fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0
+        client_exit_code = $CHILD_STATUS
+        # concurrency stress test type is expected to exit with a
+        # non-zero status due to an exception being raised
+        if client_exit_code != 0 && stress_test_type != 'concurrency'
+          fail "client failed, exit code #{client_exit_code}"
+        end
+      end
+    end
   end
 end
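For local debugging, one class/stress-type combination could presumably be run directly, mirroring the driver's spawn call above (the flag values shown are just one example combination):

  require 'rbconfig'

  this_dir = File.expand_path(File.dirname(__FILE__))
  client_path = File.join(this_dir, 'grpc_class_init_client.rb')
  pid = Process.spawn(RbConfig.ruby, client_path,
                      '--grpc_class=channel', '--stress_test=gc')
  Process.wait(pid)
  # Note: the 'concurrency' stress type intentionally raises, so a non-zero
  # exit is expected for that mode (see the driver's exit-code check above).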
 
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
index 389fc5b..0c6a374 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_client.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
@@ -46,6 +46,8 @@
     end
   end.parse!
 
+  trap('SIGINT') { exit 0 }
+
   thd = Thread.new do
     child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",
                                                    {},
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
index 670cda0..79a8c13 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
@@ -63,6 +63,11 @@
       'SIGINT is sent while there is an active connectivity_state call'
   end
 
+  client_exit_code = $CHILD_STATUS
+  if client_exit_code != 0
+    fail "sig_int_during_channel_watch_client failed: #{client_exit_code}"
+  end
+
   server_runner.stop
 end
 
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index fb610f5..e02dd08 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -34,9 +34,9 @@
 #include <ruby/ruby.h>
 #include <ruby/thread.h>
 
-#include "rb_grpc_imports.generated.h"
 #include "rb_byte_buffer.h"
 #include "rb_channel.h"
+#include "rb_grpc_imports.generated.h"
 
 #include <grpc/grpc.h>
 #include <grpc/grpc_security.h>
@@ -68,29 +68,52 @@
 /* Used during the conversion of a hash to channel args during channel setup */
 static VALUE grpc_rb_cChannelArgs;
 
+typedef struct bg_watched_channel {
+  grpc_channel *channel;
+  // these fields must only be accessed under global_connection_polling_mu
+  struct bg_watched_channel *next;
+  int channel_destroyed;
+  int refcount;
+} bg_watched_channel;
+
 /* grpc_rb_channel wraps a grpc_channel. */
 typedef struct grpc_rb_channel {
   VALUE credentials;
 
-  /* The actual channel */
-  grpc_channel *wrapped;
-  int request_safe_destroy;
-  int safe_to_destroy;
-  grpc_connectivity_state current_connectivity_state;
-
-  int mu_init_done;
-  int abort_watch_connectivity_state;
-  gpr_mu channel_mu;
-  gpr_cv channel_cv;
+  /* The actual channel (protected in a wrapper to tell when it's safe to
+   * destroy) */
+  bg_watched_channel *bg_wrapped;
 } grpc_rb_channel;
 
-/* Forward declarations of functions involved in temporary fix to
- * https://github.com/grpc/grpc/issues/9941 */
+typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type;
+
+typedef struct watch_state_op {
+  watch_state_op_type op_type;
+  // from event.success
+  union {
+    struct {
+      int success;
+      // has been called back due to a cq next call
+      int called_back;
+    } api_callback_args;
+    struct {
+      bg_watched_channel *bg;
+    } continuous_watch_callback_args;
+  } op;
+} watch_state_op;
+
+static bg_watched_channel *bg_watched_channel_list_head = NULL;
+
 static void grpc_rb_channel_try_register_connection_polling(
-    grpc_rb_channel *wrapper);
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
-static void *wait_until_channel_polling_thread_started_no_gil(void*);
-static void wait_until_channel_polling_thread_started_unblocking_func(void*);
+    bg_watched_channel *bg);
+static void *wait_until_channel_polling_thread_started_no_gil(void *);
+static void *channel_init_try_register_connection_polling_without_gil(
+    void *arg);
+
+typedef struct channel_init_try_register_stack {
+  grpc_channel *channel;
+  grpc_rb_channel *wrapper;
+} channel_init_try_register_stack;
 
 static grpc_completion_queue *channel_polling_cq;
 static gpr_mu global_connection_polling_mu;
@@ -98,6 +121,42 @@
 static int abort_channel_polling = 0;
 static int channel_polling_thread_started = 0;
 
+static int bg_watched_channel_list_lookup(bg_watched_channel *bg);
+static bg_watched_channel *bg_watched_channel_list_create_and_add(
+    grpc_channel *channel);
+static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg);
+static void run_poll_channels_loop_unblocking_func(void *arg);
+
+// Needs to be called under global_connection_polling_mu
+static void grpc_rb_channel_watch_connection_state_op_complete(
+    watch_state_op *op, int success) {
+  GPR_ASSERT(!op->op.api_callback_args.called_back);
+  op->op.api_callback_args.called_back = 1;
+  op->op.api_callback_args.success = success;
+  // wake up the watch API call that's waiting on this op
+  gpr_cv_broadcast(&global_connection_polling_cv);
+}
+
+/* Avoids destroying a channel twice. */
+static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
+  gpr_mu_lock(&global_connection_polling_mu);
+  GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  if (!bg->channel_destroyed) {
+    grpc_channel_destroy(bg->channel);
+    bg->channel_destroyed = 1;
+  }
+  bg->refcount--;
+  if (bg->refcount == 0) {
+    bg_watched_channel_list_free_and_remove(bg);
+  }
+  gpr_mu_unlock(&global_connection_polling_mu);
+}
+
+static void *channel_safe_destroy_without_gil(void *arg) {
+  grpc_rb_channel_safe_destroy((bg_watched_channel *)arg);
+  return NULL;
+}
+
 /* Destroys Channel instances. */
 static void grpc_rb_channel_free(void *p) {
   grpc_rb_channel *ch = NULL;
@@ -106,14 +165,13 @@
   };
   ch = (grpc_rb_channel *)p;
 
-  if (ch->wrapped != NULL) {
-    grpc_rb_channel_safe_destroy(ch);
-    ch->wrapped = NULL;
-  }
-
-  if (ch->mu_init_done) {
-    gpr_mu_destroy(&ch->channel_mu);
-    gpr_cv_destroy(&ch->channel_cv);
+  if (ch->bg_wrapped != NULL) {
+    /* assumption made here: it's ok to directly gpr_mu_lock the global
+     * connection polling mutex because we're in a finalizer,
+     * and we can count on this thread not to be interrupted or
+     * to yield the GIL. */
+    grpc_rb_channel_safe_destroy(ch->bg_wrapped);
+    ch->bg_wrapped = NULL;
   }
 
   xfree(p);
@@ -146,7 +204,7 @@
 /* Allocates grpc_rb_channel instances. */
 static VALUE grpc_rb_channel_alloc(VALUE cls) {
   grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
-  wrapper->wrapped = NULL;
+  wrapper->bg_wrapped = NULL;
   wrapper->credentials = Qnil;
   return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
 }
@@ -168,17 +226,18 @@
   grpc_channel_credentials *creds = NULL;
   char *target_chars = NULL;
   grpc_channel_args args;
+  channel_init_try_register_stack stack;
   MEMZERO(&args, grpc_channel_args, 1);
 
   grpc_ruby_once_init();
-  rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, NULL,
-                             wait_until_channel_polling_thread_started_unblocking_func, NULL);
+  rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil,
+                             NULL, run_poll_channels_loop_unblocking_func,
+                             NULL);
 
   /* "3" == 3 mandatory args */
   rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  wrapper->mu_init_done = 0;
   target_chars = StringValueCStr(target);
   grpc_rb_hash_convert_to_channel_args(channel_args, &args);
   if (TYPE(credentials) == T_SYMBOL) {
@@ -195,24 +254,11 @@
   }
 
   GPR_ASSERT(ch);
-
-  wrapper->wrapped = ch;
-
-  gpr_mu_init(&wrapper->channel_mu);
-  gpr_cv_init(&wrapper->channel_cv);
-  wrapper->mu_init_done = 1;
-
-  gpr_mu_lock(&wrapper->channel_mu);
-  wrapper->abort_watch_connectivity_state = 0;
-  wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
-  wrapper->safe_to_destroy = 0;
-  wrapper->request_safe_destroy = 0;
-
-  gpr_cv_broadcast(&wrapper->channel_cv);
-  gpr_mu_unlock(&wrapper->channel_mu);
-
-
-  grpc_rb_channel_try_register_connection_polling(wrapper);
+  stack.channel = ch;
+  stack.wrapper = wrapper;
+  rb_thread_call_without_gvl(
+      channel_init_try_register_connection_polling_without_gil, &stack, NULL,
+      NULL);
 
   if (args.args != NULL) {
     xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@@ -223,10 +269,35 @@
     return Qnil;
   }
   rb_ivar_set(self, id_target, target);
-  wrapper->wrapped = ch;
   return self;
 }
 
+typedef struct get_state_stack {
+  grpc_channel *channel;
+  int try_to_connect;
+  int out;
+} get_state_stack;
+
+static void *get_state_without_gil(void *arg) {
+  get_state_stack *stack = (get_state_stack *)arg;
+
+  gpr_mu_lock(&global_connection_polling_mu);
+  GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
+  if (abort_channel_polling) {
+    // Assume that this channel has been destroyed by the
+    // background thread.
+    // In the case where the channel polling thread failed
+    // to start, we likewise always report the shutdown state.
+    stack->out = GRPC_CHANNEL_SHUTDOWN;
+  } else {
+    stack->out = grpc_channel_check_connectivity_state(stack->channel,
+                                                       stack->try_to_connect);
+  }
+  gpr_mu_unlock(&global_connection_polling_mu);
+
+  return NULL;
+}
+
 /*
   call-seq:
     ch.connectivity_state       -> state
@@ -239,59 +310,60 @@
 static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
                                                     VALUE self) {
   VALUE try_to_connect_param = Qfalse;
-  int grpc_try_to_connect = 0;
   grpc_rb_channel *wrapper = NULL;
-  grpc_channel *ch = NULL;
+  get_state_stack stack;
 
   /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
   rb_scan_args(argc, argv, "01", &try_to_connect_param);
-  grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  if (ch == NULL) {
+  if (wrapper->bg_wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
-  return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
+
+  stack.channel = wrapper->bg_wrapped->channel;
+  stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
+  rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
+
+  return LONG2NUM(stack.out);
 }
 
 typedef struct watch_state_stack {
-  grpc_rb_channel *wrapper;
+  grpc_channel *channel;
   gpr_timespec deadline;
   int last_state;
 } watch_state_stack;
 
-static void *watch_channel_state_without_gvl(void *arg) {
-  watch_state_stack *stack = (watch_state_stack*)arg;
-  gpr_timespec deadline = stack->deadline;
-  grpc_rb_channel *wrapper = stack->wrapper;
-  int last_state = stack->last_state;
-  void *return_value = (void*)0;
+static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
+  watch_state_stack *stack = (watch_state_stack *)arg;
+  watch_state_op *op = NULL;
+  void *success = (void *)0;
 
-  gpr_mu_lock(&wrapper->channel_mu);
-  while(wrapper->current_connectivity_state == last_state &&
-        !wrapper->request_safe_destroy &&
-        !wrapper->safe_to_destroy &&
-        !wrapper->abort_watch_connectivity_state &&
-        gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
-    gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
+  gpr_mu_lock(&global_connection_polling_mu);
+  // it's unsafe to do a "watch" after "channel polling abort" because the cq has
+  // been shut down.
+  if (abort_channel_polling) {
+    gpr_mu_unlock(&global_connection_polling_mu);
+    return (void *)0;
   }
-  if (wrapper->current_connectivity_state != last_state) {
-    return_value = (void*)1;
+  op = gpr_zalloc(sizeof(watch_state_op));
+  op->op_type = WATCH_STATE_API;
+  grpc_channel_watch_connectivity_state(stack->channel, stack->last_state,
+                                        stack->deadline, channel_polling_cq,
+                                        op);
+
+  while (!op->op.api_callback_args.called_back) {
+    gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
+                gpr_inf_future(GPR_CLOCK_REALTIME));
   }
-  gpr_mu_unlock(&wrapper->channel_mu);
+  if (op->op.api_callback_args.success) {
+    success = (void *)1;
+  }
+  gpr_free(op);
+  gpr_mu_unlock(&global_connection_polling_mu);
 
-  return return_value;
-}
-
-static void watch_channel_state_unblocking_func(void *arg) {
-  grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
-  gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
-  gpr_mu_lock(&wrapper->channel_mu);
-  wrapper->abort_watch_connectivity_state = 1;
-  gpr_cv_broadcast(&wrapper->channel_cv);
-  gpr_mu_unlock(&wrapper->channel_mu);
+  return success;
 }
 
 /* Wait until the channel's connectivity state becomes different from
@@ -306,28 +378,31 @@
                                                       VALUE deadline) {
   grpc_rb_channel *wrapper = NULL;
   watch_state_stack stack;
-  void* out;
+  void *op_success = 0;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
 
-  if (wrapper->wrapped == NULL) {
+  if (wrapper->bg_wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
 
   if (!FIXNUM_P(last_state)) {
-    rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
+    rb_raise(
+        rb_eTypeError,
+        "bad type for last_state. want a GRPC::Core::ChannelState constant");
     return Qnil;
   }
 
-  stack.wrapper = wrapper;
-  stack.deadline = grpc_rb_time_timeval(deadline, 0);
+  stack.channel = wrapper->bg_wrapped->channel;
+  stack.deadline = grpc_rb_time_timeval(deadline, 0);
   stack.last_state = NUM2LONG(last_state);
-  out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
-  if (out) {
-    return Qtrue;
-  }
-  return Qfalse;
+
+  op_success = rb_thread_call_without_gvl(
+      wait_for_watch_state_op_complete_without_gvl, &stack,
+      run_poll_channels_loop_unblocking_func, NULL);
+
+  return op_success ? Qtrue : Qfalse;
 }
 
 /* Create a call given a grpc_channel, in order to call method. The request
@@ -339,7 +414,6 @@
   grpc_rb_channel *wrapper = NULL;
   grpc_call *call = NULL;
   grpc_call *parent_call = NULL;
-  grpc_channel *ch = NULL;
   grpc_completion_queue *cq = NULL;
   int flags = GRPC_PROPAGATE_DEFAULTS;
   grpc_slice method_slice;
@@ -361,8 +435,7 @@
 
   cq = grpc_completion_queue_create(NULL);
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  if (ch == NULL) {
+  if (wrapper->bg_wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
@@ -370,8 +443,8 @@
   method_slice =
       grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
 
-  call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
-                                  host_slice_ptr,
+  call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call,
+                                  flags, cq, method_slice, host_slice_ptr,
                                   grpc_rb_time_timeval(deadline,
                                                        /* absolute time */ 0),
                                   NULL);
@@ -396,15 +469,16 @@
 }
 
 /* Closes the channel, calling it's destroy method */
+/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
+ * this */
 static VALUE grpc_rb_channel_destroy(VALUE self) {
   grpc_rb_channel *wrapper = NULL;
-  grpc_channel *ch = NULL;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  if (ch != NULL) {
-    grpc_rb_channel_safe_destroy(wrapper);
-    wrapper->wrapped = NULL;
+  if (wrapper->bg_wrapped != NULL) {
+    rb_thread_call_without_gvl(channel_safe_destroy_without_gil,
+                               wrapper->bg_wrapped, NULL, NULL);
+    wrapper->bg_wrapped = NULL;
   }
 
   return Qnil;
@@ -417,64 +491,114 @@
   char *target = NULL;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  target = grpc_channel_get_target(wrapper->wrapped);
+  target = grpc_channel_get_target(wrapper->bg_wrapped->channel);
   res = rb_str_new2(target);
   gpr_free(target);
 
   return res;
 }
 
-// Either start polling channel connection state or signal that it's free to
-// destroy.
-// Not safe to call while a channel's connection state is polled.
-static void grpc_rb_channel_try_register_connection_polling(
-  grpc_rb_channel *wrapper) {
-  grpc_connectivity_state conn_state;
-  gpr_timespec sleep_time = gpr_time_add(
-      gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
+/* Needs to be called under global_connection_polling_mu */
+static int bg_watched_channel_list_lookup(bg_watched_channel *target) {
+  bg_watched_channel *cur = bg_watched_channel_list_head;
 
-  GPR_ASSERT(wrapper);
-  GPR_ASSERT(wrapper->wrapped);
-  gpr_mu_lock(&wrapper->channel_mu);
-  if (wrapper->request_safe_destroy) {
-    wrapper->safe_to_destroy = 1;
-    gpr_cv_broadcast(&wrapper->channel_cv);
-    gpr_mu_unlock(&wrapper->channel_mu);
-    return;
+  while (cur != NULL) {
+    if (cur == target) {
+      return 1;
+    }
+    cur = cur->next;
   }
-  gpr_mu_lock(&global_connection_polling_mu);
 
-  GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
-  conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
-  if (conn_state != wrapper->current_connectivity_state) {
-    wrapper->current_connectivity_state = conn_state;
-    gpr_cv_broadcast(&wrapper->channel_cv);
-  }
-  // avoid posting work to the channel polling cq if it's been shutdown
-  if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
-    grpc_channel_watch_connectivity_state(
-        wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
-  } else {
-    wrapper->safe_to_destroy = 1;
-    gpr_cv_broadcast(&wrapper->channel_cv);
-  }
-  gpr_mu_unlock(&global_connection_polling_mu);
-  gpr_mu_unlock(&wrapper->channel_mu);
+  return 0;
 }
 
-// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
-  gpr_mu_lock(&wrapper->channel_mu);
-  wrapper->request_safe_destroy = 1;
+/* Needs to be called under global_connection_polling_mu */
+static bg_watched_channel *bg_watched_channel_list_create_and_add(
+    grpc_channel *channel) {
+  bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel));
 
-  while (!wrapper->safe_to_destroy) {
-    gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
-                gpr_inf_future(GPR_CLOCK_REALTIME));
+  watched->channel = channel;
+  watched->next = bg_watched_channel_list_head;
+  watched->refcount = 1;
+  bg_watched_channel_list_head = watched;
+  return watched;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+static void bg_watched_channel_list_free_and_remove(
+    bg_watched_channel *target) {
+  bg_watched_channel *bg = NULL;
+
+  GPR_ASSERT(bg_watched_channel_list_lookup(target));
+  GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
+  if (bg_watched_channel_list_head == target) {
+    bg_watched_channel_list_head = target->next;
+    gpr_free(target);
+    return;
   }
-  GPR_ASSERT(wrapper->safe_to_destroy);
-  gpr_mu_unlock(&wrapper->channel_mu);
+  bg = bg_watched_channel_list_head;
+  while (bg != NULL && bg->next != NULL) {
+    if (bg->next == target) {
+      bg->next = bg->next->next;
+      gpr_free(target);
+      return;
+    }
+    bg = bg->next;
+  }
+  GPR_ASSERT(0);
+}
 
-  grpc_channel_destroy(wrapper->wrapped);
+/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
+ * it onto the background thread for constant watches. */
+static void *channel_init_try_register_connection_polling_without_gil(
+    void *arg) {
+  channel_init_try_register_stack *stack =
+      (channel_init_try_register_stack *)arg;
+
+  gpr_mu_lock(&global_connection_polling_mu);
+  stack->wrapper->bg_wrapped =
+      bg_watched_channel_list_create_and_add(stack->channel);
+  grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped);
+  gpr_mu_unlock(&global_connection_polling_mu);
+  return NULL;
+}
+
+// Needs to be called under global_connection_polling_mu
+static void grpc_rb_channel_try_register_connection_polling(
+    bg_watched_channel *bg) {
+  grpc_connectivity_state conn_state;
+  watch_state_op *op = NULL;
+
+  GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
+
+  if (bg->refcount == 0) {
+    GPR_ASSERT(bg->channel_destroyed);
+    bg_watched_channel_list_free_and_remove(bg);
+    return;
+  }
+  GPR_ASSERT(bg->refcount == 1);
+  if (bg->channel_destroyed) {
+    GPR_ASSERT(abort_channel_polling);
+    return;
+  }
+  if (abort_channel_polling) {
+    return;
+  }
+
+  conn_state = grpc_channel_check_connectivity_state(bg->channel, 0);
+  if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
+    return;
+  }
+  GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  // prevent bg from being freed by GC while the background thread is watching it
+  bg->refcount++;
+
+  op = gpr_zalloc(sizeof(watch_state_op));
+  op->op_type = CONTINUOUS_WATCH;
+  op->op.continuous_watch_callback_args.bg = bg;
+  grpc_channel_watch_connectivity_state(bg->channel, conn_state,
+                                        gpr_inf_future(GPR_CLOCK_REALTIME),
+                                        channel_polling_cq, op);
 }
 
 // Note this loop breaks out with a single call of
@@ -485,6 +609,8 @@
 // early and falls back to current behavior.
 static void *run_poll_channels_loop_no_gil(void *arg) {
   grpc_event event;
+  watch_state_op *op = NULL;
+  bg_watched_channel *bg = NULL;
   (void)arg;
   gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
 
@@ -500,29 +626,70 @@
     if (event.type == GRPC_QUEUE_SHUTDOWN) {
       break;
     }
+    gpr_mu_lock(&global_connection_polling_mu);
     if (event.type == GRPC_OP_COMPLETE) {
-      grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
+      op = (watch_state_op *)event.tag;
+      if (op->op_type == CONTINUOUS_WATCH) {
+        bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg;
+        bg->refcount--;
+        grpc_rb_channel_try_register_connection_polling(bg);
+        gpr_free(op);
+      } else if (op->op_type == WATCH_STATE_API) {
+        grpc_rb_channel_watch_connection_state_op_complete(
+            (watch_state_op *)event.tag, event.success);
+      } else {
+        GPR_ASSERT(0);
+      }
     }
+    gpr_mu_unlock(&global_connection_polling_mu);
   }
   grpc_completion_queue_destroy(channel_polling_cq);
-  gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
+  gpr_log(GPR_DEBUG,
+          "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling "
+          "loop");
   return NULL;
 }
 
 // Notify the channel polling loop to cleanup and shutdown.
 static void run_poll_channels_loop_unblocking_func(void *arg) {
+  bg_watched_channel *bg = NULL;
   (void)arg;
+
   gpr_mu_lock(&global_connection_polling_mu);
-  gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling");
+  gpr_log(GPR_DEBUG,
+          "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting "
+          "connection polling");
+  // early out after first time through
+  if (abort_channel_polling) {
+    gpr_mu_unlock(&global_connection_polling_mu);
+    return;
+  }
   abort_channel_polling = 1;
+
+  // force pending watches to end by switching to shutdown state
+  bg = bg_watched_channel_list_head;
+  while (bg != NULL) {
+    if (!bg->channel_destroyed) {
+      grpc_channel_destroy(bg->channel);
+      bg->channel_destroyed = 1;
+    }
+    bg = bg->next;
+  }
+
   grpc_completion_queue_shutdown(channel_polling_cq);
+  gpr_cv_broadcast(&global_connection_polling_cv);
   gpr_mu_unlock(&global_connection_polling_mu);
+  gpr_log(GPR_DEBUG,
+          "GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting "
+          "connection polling");
 }
 
 // Poll channel connectivity states in background thread without the GIL.
 static VALUE run_poll_channels_loop(VALUE arg) {
   (void)arg;
-  gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
+  gpr_log(
+      GPR_DEBUG,
+      "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
   rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
                              run_poll_channels_loop_unblocking_func, NULL);
 
@@ -542,13 +709,13 @@
   return NULL;
 }
 
-static void wait_until_channel_polling_thread_started_unblocking_func(void* arg) {
+static void *set_abort_channel_polling_without_gil(void *arg) {
   (void)arg;
   gpr_mu_lock(&global_connection_polling_mu);
-  gpr_log(GPR_DEBUG, "GRPC_RUBY: wait_until_channel_polling_thread_started_unblocking_func - begin aborting connection polling");
   abort_channel_polling = 1;
   gpr_cv_broadcast(&global_connection_polling_cv);
   gpr_mu_unlock(&global_connection_polling_mu);
+  return NULL;
 }
 
 /* Temporary fix for
@@ -576,10 +743,8 @@
 
   if (!RTEST(background_thread)) {
     gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread");
-    gpr_mu_lock(&global_connection_polling_mu);
-    abort_channel_polling = 1;
-    gpr_cv_broadcast(&global_connection_polling_cv);
-    gpr_mu_unlock(&global_connection_polling_mu);
+    rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL,
+                               NULL, NULL);
   }
 }
 
@@ -658,5 +823,5 @@
 grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) {
   grpc_rb_channel *wrapper = NULL;
   TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  return wrapper->wrapped;
+  return wrapper->bg_wrapped->channel;
 }
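Taken together, the rb_channel.c changes move every channel onto one background polling thread with a shared completion queue; a Ruby-level watch now enqueues a WATCH_STATE_API op on that queue and sleeps on the global condition variable until the polling thread completes it. The observable behavior the end2end tests rely on is that a pending watch returns instead of hanging when the channel is closed from another thread. A hedged sketch of that scenario (placeholder address, no server required, assuming Channel#close maps to the destroy path shown above):

  require 'grpc'

  ch = GRPC::Core::Channel.new('localhost:50051', {}, :this_channel_is_insecure)
  watcher = Thread.new do
    state = ch.connectivity_state
    # Returns once the state changes, the deadline passes, or the channel is closed.
    ch.watch_connectivity_state(state, Time.now + 60)
  end
  sleep 0.1
  ch.close  # the background thread completes the pending watch op
  watcher.join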
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 9e85bbc..f1a08a7 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -106,17 +106,17 @@
   grpc_rb_event *event = NULL;
   (void)param;
   gpr_mu_lock(&event_queue.mu);
-  while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
+  while (!event_queue.abort) {
+    if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
+      gpr_mu_unlock(&event_queue.mu);
+      return event;
+    }
     gpr_cv_wait(&event_queue.cv,
                 &event_queue.mu,
                 gpr_inf_future(GPR_CLOCK_REALTIME));
-    if (event_queue.abort) {
-      gpr_mu_unlock(&event_queue.mu);
-      return NULL;
-    }
   }
   gpr_mu_unlock(&event_queue.mu);
-  return event;
+  return NULL;
 }
 
 static void grpc_rb_event_unblocking_func(void *arg) {
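The rb_event_thread.c change restructures the dequeue loop so the abort flag is checked before each wait and a queued event is returned as soon as one is available. A rough Ruby analogue of that loop shape (a hypothetical structure for illustration, not the actual implementation):

  require 'thread'

  # Hypothetical queue holder: an array of events, a mutex, a condition
  # variable, and an abort flag.
  EventQueue = Struct.new(:events, :mu, :cv, :abort)

  def dequeue_event(q)
    q.mu.synchronize do
      until q.abort
        # Return an event as soon as one is available.
        return q.events.shift unless q.events.empty?
        q.cv.wait(q.mu)
      end
    end
    nil  # aborted: nothing left to run
  end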
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 584b5db..e0be5d7 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -295,11 +295,12 @@
 
 static void grpc_ruby_once_init_internal() {
   grpc_init();
-  grpc_rb_event_queue_thread_start();
-  grpc_rb_channel_polling_thread_start();
   atexit(grpc_rb_shutdown);
 }
 
+static VALUE bg_thread_init_rb_mu = Qundef;
+static int bg_thread_init_done = 0;
+
 void grpc_ruby_once_init() {
   /* ruby_vm_at_exit doesn't seem to be working. It would crash once every
    * blue moon, and some users are getting it repeatedly. See the discussions
@@ -312,6 +313,18 @@
    * schedule our initialization and destruction only once.
    */
   gpr_once_init(&g_once_init, grpc_ruby_once_init_internal);
+
+  // Avoid calling into the ruby library (when creating threads here)
+  // in gpr_once_init. In general, it appears to be unsafe to call
+  // into the ruby library while holding a non-ruby mutex, because a GIL yield
+  // could end up trying to lock onto that same mutex and deadlocking.
+  rb_mutex_lock(bg_thread_init_rb_mu);
+  if (!bg_thread_init_done) {
+    grpc_rb_event_queue_thread_start();
+    grpc_rb_channel_polling_thread_start();
+    bg_thread_init_done = 1;
+  }
+  rb_mutex_unlock(bg_thread_init_rb_mu);
 }
 
 void Init_grpc_c() {
@@ -320,6 +333,9 @@
     return;
   }
 
+  bg_thread_init_rb_mu = rb_mutex_new();
+  rb_global_variable(&bg_thread_init_rb_mu);
+
   grpc_rb_mGRPC = rb_define_module("GRPC");
   grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
   grpc_rb_sNewServerRpc =
diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb
index 940d68b..c8a7856 100644
--- a/src/ruby/spec/channel_connection_spec.rb
+++ b/src/ruby/spec/channel_connection_spec.rb
@@ -28,6 +28,10 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 require 'grpc'
+require 'timeout'
+
+include Timeout
+include GRPC::Core
 
 # A test message
 class EchoMsg
@@ -62,7 +66,7 @@
 EchoStub = EchoService.rpc_stub_class
 
 def start_server(port = 0)
-  @srv = GRPC::RpcServer.new
+  @srv = GRPC::RpcServer.new(pool_size: 1)
   server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
   @srv.handle(EchoService)
   @server_thd = Thread.new { @srv.run }
@@ -138,4 +142,32 @@
 
     stop_server
   end
+
+  it 'concurrent watches on the same channel' do
+    timeout(180) do
+      port = start_server
+      ch = GRPC::Core::Channel.new("localhost:#{port}", {},
+                                   :this_channel_is_insecure)
+      stop_server
+
+      thds = []
+      50.times do
+        thds << Thread.new do
+          while ch.connectivity_state(true) != ConnectivityStates::READY
+            ch.watch_connectivity_state(
+              ConnectivityStates::READY, Time.now + 60)
+            break
+          end
+        end
+      end
+
+      sleep 0.01
+
+      start_server(port)
+
+      thds.each(&:join)
+
+      stop_server
+    end
+  end
 end