blob: 1d11a53aa72a823bac2b8f59877fe12070e02747 [file] [log] [blame]
nnoble097ef9b2014-12-01 17:06:10 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
nnoble097ef9b2014-12-01 17:06:10 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
nnoble097ef9b2014-12-01 17:06:10 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
nnoble097ef9b2014-12-01 17:06:10 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
nnoble097ef9b2014-12-01 17:06:10 -080016 *
17 */
18
Nicolas "Pixel" Nobled51d1212016-01-31 11:33:19 +010019#include <ruby/ruby.h>
Alexander Polcyn9f498662017-03-02 19:20:07 -080020#include <ruby/thread.h>
Nicolas "Pixel" Noble9fcdc872016-05-05 06:15:34 +020021
Craig Tiller7c70b6c2017-01-23 07:48:42 -080022#include "rb_byte_buffer.h"
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -080023#include "rb_channel.h"
Craig Tiller5b1c5f22017-04-19 09:52:18 -070024#include "rb_grpc_imports.generated.h"
nnoble097ef9b2014-12-01 17:06:10 -080025
nnoble097ef9b2014-12-01 17:06:10 -080026#include <grpc/grpc.h>
nnoble0c475f02014-12-05 15:37:39 -080027#include <grpc/grpc_security.h>
Tim Emiola623a74d2015-08-11 09:24:20 -070028#include <grpc/support/alloc.h>
murgatroid999acc40f2016-06-30 13:54:09 -070029#include <grpc/support/log.h>
murgatroid9991633cb2016-07-19 10:36:01 -070030#include <grpc/support/time.h>
nnoble097ef9b2014-12-01 17:06:10 -080031#include "rb_call.h"
32#include "rb_channel_args.h"
Tim Emiola9332ea62015-10-27 23:48:29 -070033#include "rb_channel_credentials.h"
nnoble097ef9b2014-12-01 17:06:10 -080034#include "rb_completion_queue.h"
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -080035#include "rb_grpc.h"
nnoble097ef9b2014-12-01 17:06:10 -080036#include "rb_server.h"
37
38/* id_channel is the name of the hidden ivar that preserves a reference to the
39 * channel on a call, so that calls are not GCed before their channel. */
40static ID id_channel;
41
42/* id_target is the name of the hidden ivar that preserves a reference to the
Tim Emiola564719d2015-03-27 13:07:34 -070043 * target string used to create the call, preserved so that it does not get
nnoble097ef9b2014-12-01 17:06:10 -080044 * GCed before the channel */
45static ID id_target;
46
murgatroid99afe39742015-12-16 13:04:37 -080047/* id_insecure_channel is used to indicate that a channel is insecure */
48static VALUE id_insecure_channel;
49
Yuki Yugui Sonoda3c88e5d2015-04-16 20:09:00 +090050/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */
51static VALUE grpc_rb_cChannel = Qnil;
Tim Emiola564719d2015-03-27 13:07:34 -070052
nnoble097ef9b2014-12-01 17:06:10 -080053/* Used during the conversion of a hash to channel args during channel setup */
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +090054static VALUE grpc_rb_cChannelArgs;
nnoble097ef9b2014-12-01 17:06:10 -080055
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070056typedef struct bg_watched_channel {
Craig Tillerbaa14a92017-11-03 09:09:36 -070057 grpc_channel* channel;
Alexander Polcyn032f3982017-05-17 13:22:55 -070058 // these fields must only be accessed under global_connection_polling_mu
Craig Tillerbaa14a92017-11-03 09:09:36 -070059 struct bg_watched_channel* next;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070060 int channel_destroyed;
Alexander Polcyn032f3982017-05-17 13:22:55 -070061 int refcount;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070062} bg_watched_channel;
nnoble097ef9b2014-12-01 17:06:10 -080063
murgatroid99a00d7c02016-03-14 15:51:56 -070064/* grpc_rb_channel wraps a grpc_channel. */
nnoble097ef9b2014-12-01 17:06:10 -080065typedef struct grpc_rb_channel {
murgatroid99a00d7c02016-03-14 15:51:56 -070066 VALUE credentials;
67
Alexander Polcyn7e444802017-05-17 13:13:55 -070068 /* The actual channel (protected in a wrapper to tell when it's safe to
69 * destroy) */
Craig Tillerbaa14a92017-11-03 09:09:36 -070070 bg_watched_channel* bg_wrapped;
nnoble097ef9b2014-12-01 17:06:10 -080071} grpc_rb_channel;
72
/* Discriminates the two kinds of connectivity watches queued on
 * channel_polling_cq. */
typedef enum {
  /* Background watch registered by
   * grpc_rb_channel_try_register_connection_polling. */
  CONTINUOUS_WATCH,
  /* Watch started on behalf of the ruby-level watch_connectivity_state API. */
  WATCH_STATE_API
} watch_state_op_type;
Alexander Polcyn9f498662017-03-02 19:20:07 -080074
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070075typedef struct watch_state_op {
76 watch_state_op_type op_type;
77 // from event.success
78 union {
79 struct {
80 int success;
81 // has been called back due to a cq next call
82 int called_back;
83 } api_callback_args;
84 struct {
Craig Tillerbaa14a92017-11-03 09:09:36 -070085 bg_watched_channel* bg;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070086 } continuous_watch_callback_args;
87 } op;
88} watch_state_op;
89
Craig Tillerbaa14a92017-11-03 09:09:36 -070090static bg_watched_channel* bg_watched_channel_list_head = NULL;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070091
Alexander Polcyn9f498662017-03-02 19:20:07 -080092static void grpc_rb_channel_try_register_connection_polling(
Craig Tillerbaa14a92017-11-03 09:09:36 -070093 bg_watched_channel* bg);
94static void* wait_until_channel_polling_thread_started_no_gil(void*);
95static void wait_until_channel_polling_thread_started_unblocking_func(void*);
96static void* channel_init_try_register_connection_polling_without_gil(
97 void* arg);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -070098
99typedef struct channel_init_try_register_stack {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700100 grpc_channel* channel;
101 grpc_rb_channel* wrapper;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700102} channel_init_try_register_stack;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800103
Craig Tillerbaa14a92017-11-03 09:09:36 -0700104static grpc_completion_queue* channel_polling_cq;
Alexander Polcynbe301142017-03-14 16:33:44 -0700105static gpr_mu global_connection_polling_mu;
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700106static gpr_cv global_connection_polling_cv;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800107static int abort_channel_polling = 0;
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700108static int channel_polling_thread_started = 0;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800109
Craig Tillerbaa14a92017-11-03 09:09:36 -0700110static int bg_watched_channel_list_lookup(bg_watched_channel* bg);
111static bg_watched_channel* bg_watched_channel_list_create_and_add(
112 grpc_channel* channel);
113static void bg_watched_channel_list_free_and_remove(bg_watched_channel* bg);
114static void run_poll_channels_loop_unblocking_func(void* arg);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700115
116// Needs to be called under global_connection_polling_mu
Alexander Polcyn032f3982017-05-17 13:22:55 -0700117static void grpc_rb_channel_watch_connection_state_op_complete(
Craig Tillerbaa14a92017-11-03 09:09:36 -0700118 watch_state_op* op, int success) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700119 GPR_ASSERT(!op->op.api_callback_args.called_back);
120 op->op.api_callback_args.called_back = 1;
121 op->op.api_callback_args.success = success;
122 // wake up the watch API call thats waiting on this op
123 gpr_cv_broadcast(&global_connection_polling_cv);
124}
125
126/* Avoids destroying a channel twice. */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700127static void grpc_rb_channel_safe_destroy(bg_watched_channel* bg) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700128 gpr_mu_lock(&global_connection_polling_mu);
129 GPR_ASSERT(bg_watched_channel_list_lookup(bg));
130 if (!bg->channel_destroyed) {
131 grpc_channel_destroy(bg->channel);
132 bg->channel_destroyed = 1;
133 }
134 bg->refcount--;
135 if (bg->refcount == 0) {
136 bg_watched_channel_list_free_and_remove(bg);
137 }
138 gpr_mu_unlock(&global_connection_polling_mu);
139}
140
Craig Tillerbaa14a92017-11-03 09:09:36 -0700141static void* channel_safe_destroy_without_gil(void* arg) {
142 grpc_rb_channel_safe_destroy((bg_watched_channel*)arg);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700143 return NULL;
144}
nnoble097ef9b2014-12-01 17:06:10 -0800145
146/* Destroys Channel instances. */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700147static void grpc_rb_channel_free(void* p) {
148 grpc_rb_channel* ch = NULL;
nnoble097ef9b2014-12-01 17:06:10 -0800149 if (p == NULL) {
150 return;
151 };
Craig Tillerbaa14a92017-11-03 09:09:36 -0700152 ch = (grpc_rb_channel*)p;
nnoble097ef9b2014-12-01 17:06:10 -0800153
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700154 if (ch->bg_wrapped != NULL) {
155 /* assumption made here: it's ok to directly gpr_mu_lock the global
156 * connection polling mutex becuse we're in a finalizer,
Alexander Polcyn032f3982017-05-17 13:22:55 -0700157 * and we can count on this thread to not be interrupted or
158 * yield the gil. */
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700159 grpc_rb_channel_safe_destroy(ch->bg_wrapped);
160 ch->bg_wrapped = NULL;
nnoble097ef9b2014-12-01 17:06:10 -0800161 }
162
163 xfree(p);
164}
165
166/* Protects the mark object from GC */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700167static void grpc_rb_channel_mark(void* p) {
168 grpc_rb_channel* channel = NULL;
nnoble097ef9b2014-12-01 17:06:10 -0800169 if (p == NULL) {
170 return;
171 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700172 channel = (grpc_rb_channel*)p;
murgatroid99a00d7c02016-03-14 15:51:56 -0700173 if (channel->credentials != Qnil) {
174 rb_gc_mark(channel->credentials);
nnoble097ef9b2014-12-01 17:06:10 -0800175 }
176}
177
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800178static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
179 {grpc_rb_channel_mark,
180 grpc_rb_channel_free,
181 GRPC_RB_MEMSIZE_UNAVAILABLE,
182 {NULL, NULL}},
183 NULL,
184 NULL,
Tim Emiola9161a822015-11-11 15:58:44 -0800185#ifdef RUBY_TYPED_FREE_IMMEDIATELY
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800186 RUBY_TYPED_FREE_IMMEDIATELY
Tim Emiola9161a822015-11-11 15:58:44 -0800187#endif
Yuki Yugui Sonodac9b7d1c2015-04-11 14:50:09 +0900188};
189
nnoble097ef9b2014-12-01 17:06:10 -0800190/* Allocates grpc_rb_channel instances. */
191static VALUE grpc_rb_channel_alloc(VALUE cls) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700192 grpc_rb_channel* wrapper = ALLOC(grpc_rb_channel);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700193 wrapper->bg_wrapped = NULL;
murgatroid99a00d7c02016-03-14 15:51:56 -0700194 wrapper->credentials = Qnil;
Yuki Yugui Sonodac9b7d1c2015-04-11 14:50:09 +0900195 return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
nnoble097ef9b2014-12-01 17:06:10 -0800196}
197
nnoble0c475f02014-12-05 15:37:39 -0800198/*
199 call-seq:
murgatroid99afe39742015-12-16 13:04:37 -0800200 insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'},
201 :this_channel_is_insecure)
nnoble0c475f02014-12-05 15:37:39 -0800202 creds = ...
203 secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
204
205 Creates channel instances. */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700206static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
nnoble0c475f02014-12-05 15:37:39 -0800207 VALUE channel_args = Qnil;
208 VALUE credentials = Qnil;
209 VALUE target = Qnil;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700210 grpc_rb_channel* wrapper = NULL;
211 grpc_channel* ch = NULL;
212 grpc_channel_credentials* creds = NULL;
213 char* target_chars = NULL;
nnoble097ef9b2014-12-01 17:06:10 -0800214 grpc_channel_args args;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700215 channel_init_try_register_stack stack;
Alexander Polcyn82fef0b2017-05-17 23:42:38 -0700216 int stop_waiting_for_thread_start = 0;
nnoble097ef9b2014-12-01 17:06:10 -0800217 MEMZERO(&args, grpc_channel_args, 1);
218
Alexander Polcyn2a9b5d72017-04-14 12:10:55 -0700219 grpc_ruby_once_init();
murgatroid99ce67bff2017-04-19 15:54:27 -0700220 rb_thread_call_without_gvl(
Alexander Polcyn82fef0b2017-05-17 23:42:38 -0700221 wait_until_channel_polling_thread_started_no_gil,
222 &stop_waiting_for_thread_start,
223 wait_until_channel_polling_thread_started_unblocking_func,
224 &stop_waiting_for_thread_start);
Alexander Polcyn2a9b5d72017-04-14 12:10:55 -0700225
murgatroid99afe39742015-12-16 13:04:37 -0800226 /* "3" == 3 mandatory args */
227 rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
nnoble0c475f02014-12-05 15:37:39 -0800228
Yuki Yugui Sonodac9b7d1c2015-04-11 14:50:09 +0900229 TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
nnoble0c475f02014-12-05 15:37:39 -0800230 target_chars = StringValueCStr(target);
nnoble097ef9b2014-12-01 17:06:10 -0800231 grpc_rb_hash_convert_to_channel_args(channel_args, &args);
murgatroid99afe39742015-12-16 13:04:37 -0800232 if (TYPE(credentials) == T_SYMBOL) {
233 if (id_insecure_channel != SYM2ID(credentials)) {
234 rb_raise(rb_eTypeError,
235 "bad creds symbol, want :this_channel_is_insecure");
236 return Qnil;
237 }
Nicolas "Pixel" Noble9d72b142015-08-08 01:45:38 +0200238 ch = grpc_insecure_channel_create(target_chars, &args, NULL);
nnoble0c475f02014-12-05 15:37:39 -0800239 } else {
murgatroid99a00d7c02016-03-14 15:51:56 -0700240 wrapper->credentials = credentials;
Tim Emiola9332ea62015-10-27 23:48:29 -0700241 creds = grpc_rb_get_wrapped_channel_credentials(credentials);
Julien Boeufc5b570f2015-08-25 17:47:55 -0700242 ch = grpc_secure_channel_create(creds, target_chars, &args, NULL);
nnoble0c475f02014-12-05 15:37:39 -0800243 }
Alexander Polcyn9f498662017-03-02 19:20:07 -0800244
245 GPR_ASSERT(ch);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700246 stack.channel = ch;
247 stack.wrapper = wrapper;
248 rb_thread_call_without_gvl(
Alexander Polcyn7e444802017-05-17 13:13:55 -0700249 channel_init_try_register_connection_polling_without_gil, &stack, NULL,
250 NULL);
Alexander Polcyn9f498662017-03-02 19:20:07 -0800251
nnoble097ef9b2014-12-01 17:06:10 -0800252 if (args.args != NULL) {
Craig Tillerb5dcec52015-01-13 11:13:42 -0800253 xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
nnoble097ef9b2014-12-01 17:06:10 -0800254 }
255 if (ch == NULL) {
256 rb_raise(rb_eRuntimeError, "could not create an rpc channel to target:%s",
257 target_chars);
Tim Emiola564719d2015-03-27 13:07:34 -0700258 return Qnil;
nnoble097ef9b2014-12-01 17:06:10 -0800259 }
260 rb_ivar_set(self, id_target, target);
nnoble097ef9b2014-12-01 17:06:10 -0800261 return self;
262}
263
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700264typedef struct get_state_stack {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700265 bg_watched_channel* bg;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700266 int try_to_connect;
267 int out;
268} get_state_stack;
269
Craig Tillerbaa14a92017-11-03 09:09:36 -0700270static void* get_state_without_gil(void* arg) {
271 get_state_stack* stack = (get_state_stack*)arg;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700272
273 gpr_mu_lock(&global_connection_polling_mu);
274 GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
Alexander Polcynd7455ab2017-05-17 21:39:09 -0700275 if (stack->bg->channel_destroyed) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700276 stack->out = GRPC_CHANNEL_SHUTDOWN;
277 } else {
Alexander Polcynd7455ab2017-05-17 21:39:09 -0700278 stack->out = grpc_channel_check_connectivity_state(stack->bg->channel,
Alexander Polcyn7e444802017-05-17 13:13:55 -0700279 stack->try_to_connect);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700280 }
281 gpr_mu_unlock(&global_connection_polling_mu);
282
283 return NULL;
284}
285
Tim Emiola046094d2015-08-12 16:55:57 -0700286/*
287 call-seq:
Alexander Polcyne57cd902017-03-02 16:13:46 -0800288 ch.connectivity_state -> state
289 ch.connectivity_state(true) -> state
Tim Emiola046094d2015-08-12 16:55:57 -0700290
Alexander Polcyne57cd902017-03-02 16:13:46 -0800291 Indicates the current state of the channel, whose value is one of the
292 constants defined in GRPC::Core::ConnectivityStates.
293
294 It also tries to connect if the chennel is idle in the second form. */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700295static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE* argv,
Tim Emiola046094d2015-08-12 16:55:57 -0700296 VALUE self) {
Alexander Polcyne57cd902017-03-02 16:13:46 -0800297 VALUE try_to_connect_param = Qfalse;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700298 grpc_rb_channel* wrapper = NULL;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700299 get_state_stack stack;
Tim Emiola046094d2015-08-12 16:55:57 -0700300
301 /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
Alexander Polcyne57cd902017-03-02 16:13:46 -0800302 rb_scan_args(argc, argv, "01", &try_to_connect_param);
Tim Emiola046094d2015-08-12 16:55:57 -0700303
304 TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700305 if (wrapper->bg_wrapped == NULL) {
Tim Emiola046094d2015-08-12 16:55:57 -0700306 rb_raise(rb_eRuntimeError, "closed!");
307 return Qnil;
308 }
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700309
Alexander Polcynd7455ab2017-05-17 21:39:09 -0700310 stack.bg = wrapper->bg_wrapped;
Alexander Polcyn032f3982017-05-17 13:22:55 -0700311 stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700312 rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
313
314 return LONG2NUM(stack.out);
Tim Emiola046094d2015-08-12 16:55:57 -0700315}
316
Alexander Polcynf3147b32017-03-15 11:34:08 -0700317typedef struct watch_state_stack {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700318 grpc_channel* channel;
Alexander Polcynf3147b32017-03-15 11:34:08 -0700319 gpr_timespec deadline;
Alexander Polcyn563ec532017-03-15 15:54:49 -0700320 int last_state;
Alexander Polcynf3147b32017-03-15 11:34:08 -0700321} watch_state_stack;
Tim Emiola046094d2015-08-12 16:55:57 -0700322
Craig Tillerbaa14a92017-11-03 09:09:36 -0700323static void* wait_for_watch_state_op_complete_without_gvl(void* arg) {
324 watch_state_stack* stack = (watch_state_stack*)arg;
325 watch_state_op* op = NULL;
326 void* success = (void*)0;
Tim Emiola046094d2015-08-12 16:55:57 -0700327
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700328 gpr_mu_lock(&global_connection_polling_mu);
329 // its unsafe to do a "watch" after "channel polling abort" because the cq has
330 // been shut down.
331 if (abort_channel_polling) {
332 gpr_mu_unlock(&global_connection_polling_mu);
Craig Tillerbaa14a92017-11-03 09:09:36 -0700333 return (void*)0;
Alexander Polcyn5b881462017-03-21 18:31:29 -0700334 }
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700335 op = gpr_zalloc(sizeof(watch_state_op));
336 op->op_type = WATCH_STATE_API;
Alexander Polcyn7e444802017-05-17 13:13:55 -0700337 grpc_channel_watch_connectivity_state(stack->channel, stack->last_state,
338 stack->deadline, channel_polling_cq,
339 op);
Alexander Polcyn5b881462017-03-21 18:31:29 -0700340
Alexander Polcyn7e444802017-05-17 13:13:55 -0700341 while (!op->op.api_callback_args.called_back) {
342 gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700343 gpr_inf_future(GPR_CLOCK_REALTIME));
Alexander Polcyn563ec532017-03-15 15:54:49 -0700344 }
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700345 if (op->op.api_callback_args.success) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700346 success = (void*)1;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700347 }
348 gpr_free(op);
349 gpr_mu_unlock(&global_connection_polling_mu);
Alexander Polcyn5b881462017-03-21 18:31:29 -0700350
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700351 return success;
Alexander Polcynf3147b32017-03-15 11:34:08 -0700352}
Craig Tillerbaa14a92017-11-03 09:09:36 -0700353static void wait_for_watch_state_op_complete_unblocking_func(void* arg) {
354 bg_watched_channel* bg = (bg_watched_channel*)arg;
Alexander Polcync24d53b2017-05-17 20:25:38 -0700355 gpr_mu_lock(&global_connection_polling_mu);
356 if (!bg->channel_destroyed) {
357 grpc_channel_destroy(bg->channel);
358 bg->channel_destroyed = 1;
359 }
360 gpr_mu_unlock(&global_connection_polling_mu);
Alexander Polcynf3147b32017-03-15 11:34:08 -0700361}
362
Alexander Polcynfcad5792017-03-14 12:26:17 -0700363/* Wait until the channel's connectivity state becomes different from
364 * "last_state", or "deadline" expires.
365 * Returns true if the the channel's connectivity state becomes
366 * different from "last_state" within "deadline".
367 * Returns false if "deadline" expires before the channel's connectivity
368 * state changes from "last_state".
369 * */
Tim Emiola046094d2015-08-12 16:55:57 -0700370static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
371 VALUE last_state,
murgatroid9991633cb2016-07-19 10:36:01 -0700372 VALUE deadline) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700373 grpc_rb_channel* wrapper = NULL;
Alexander Polcynf3147b32017-03-15 11:34:08 -0700374 watch_state_stack stack;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700375 void* op_success = 0;
murgatroid9991633cb2016-07-19 10:36:01 -0700376
Tim Emiola046094d2015-08-12 16:55:57 -0700377 TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
Alexander Polcynfcad5792017-03-14 12:26:17 -0700378
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700379 if (wrapper->bg_wrapped == NULL) {
Tim Emiola046094d2015-08-12 16:55:57 -0700380 rb_raise(rb_eRuntimeError, "closed!");
381 return Qnil;
382 }
Tim Emiola046094d2015-08-12 16:55:57 -0700383
Alexander Polcynfcad5792017-03-14 12:26:17 -0700384 if (!FIXNUM_P(last_state)) {
Craig Tiller5b1c5f22017-04-19 09:52:18 -0700385 rb_raise(
386 rb_eTypeError,
387 "bad type for last_state. want a GRPC::Core::ChannelState constant");
Alexander Polcynfcad5792017-03-14 12:26:17 -0700388 return Qnil;
murgatroid9991633cb2016-07-19 10:36:01 -0700389 }
nnoble097ef9b2014-12-01 17:06:10 -0800390
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700391 stack.channel = wrapper->bg_wrapped->channel;
392 stack.deadline = grpc_rb_time_timeval(deadline, 0),
Alexander Polcyn563ec532017-03-15 15:54:49 -0700393 stack.last_state = NUM2LONG(last_state);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700394
395 op_success = rb_thread_call_without_gvl(
Alexander Polcyn7e444802017-05-17 13:13:55 -0700396 wait_for_watch_state_op_complete_without_gvl, &stack,
Alexander Polcync24d53b2017-05-17 20:25:38 -0700397 wait_for_watch_state_op_complete_unblocking_func, wrapper->bg_wrapped);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700398
399 return op_success ? Qtrue : Qfalse;
nnoble097ef9b2014-12-01 17:06:10 -0800400}
401
402/* Create a call given a grpc_channel, in order to call method. The request
403 is not sent until grpc_call_invoke is called. */
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800404static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
405 VALUE method, VALUE host,
406 VALUE deadline) {
nnoble097ef9b2014-12-01 17:06:10 -0800407 VALUE res = Qnil;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700408 grpc_rb_channel* wrapper = NULL;
409 grpc_call* call = NULL;
410 grpc_call* parent_call = NULL;
411 grpc_completion_queue* cq = NULL;
Tim Emiolafde3dbf2015-08-11 16:35:41 -0700412 int flags = GRPC_PROPAGATE_DEFAULTS;
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800413 grpc_slice method_slice;
414 grpc_slice host_slice;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700415 grpc_slice* host_slice_ptr = NULL;
416 char* tmp_str = NULL;
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800417
Tim Emiolad42c1b72015-08-11 10:50:21 -0700418 if (host != Qnil) {
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800419 host_slice =
420 grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800421 host_slice_ptr = &host_slice;
Tim Emiolad42c1b72015-08-11 10:50:21 -0700422 }
Tim Emiolafde3dbf2015-08-11 16:35:41 -0700423 if (mask != Qnil) {
424 flags = NUM2UINT(mask);
425 }
426 if (parent != Qnil) {
427 parent_call = grpc_rb_get_wrapped_call(parent);
428 }
nnoble097ef9b2014-12-01 17:06:10 -0800429
Sree Kuchibhotla8d8bb7a2017-03-22 12:37:29 -0700430 cq = grpc_completion_queue_create_for_pluck(NULL);
Yuki Yugui Sonodac9b7d1c2015-04-11 14:50:09 +0900431 TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700432 if (wrapper->bg_wrapped == NULL) {
nnoble097ef9b2014-12-01 17:06:10 -0800433 rb_raise(rb_eRuntimeError, "closed!");
Tim Emiola564719d2015-03-27 13:07:34 -0700434 return Qnil;
nnoble097ef9b2014-12-01 17:06:10 -0800435 }
436
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800437 method_slice =
438 grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800439
Alexander Polcyn7e444802017-05-17 13:13:55 -0700440 call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call,
441 flags, cq, method_slice, host_slice_ptr,
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800442 grpc_rb_time_timeval(deadline,
443 /* absolute time */ 0),
444 NULL);
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800445
nnoble097ef9b2014-12-01 17:06:10 -0800446 if (call == NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800447 tmp_str = grpc_slice_to_c_string(method_slice);
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800448 rb_raise(rb_eRuntimeError, "cannot create call with method %s", tmp_str);
Tim Emiola564719d2015-03-27 13:07:34 -0700449 return Qnil;
nnoble097ef9b2014-12-01 17:06:10 -0800450 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800451
452 grpc_slice_unref(method_slice);
453 if (host_slice_ptr != NULL) {
454 grpc_slice_unref(host_slice);
455 }
456
murgatroid995ea4a992016-06-13 10:36:41 -0700457 res = grpc_rb_wrap_call(call, cq);
nnoble097ef9b2014-12-01 17:06:10 -0800458
Tim Emiola564719d2015-03-27 13:07:34 -0700459 /* Make this channel an instance attribute of the call so that it is not GCed
nnoble097ef9b2014-12-01 17:06:10 -0800460 * before the call. */
461 rb_ivar_set(res, id_channel, self);
462 return res;
463}
464
465/* Closes the channel, calling it's destroy method */
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700466/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
467 * this */
nnoble097ef9b2014-12-01 17:06:10 -0800468static VALUE grpc_rb_channel_destroy(VALUE self) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700469 grpc_rb_channel* wrapper = NULL;
nnoble097ef9b2014-12-01 17:06:10 -0800470
Yuki Yugui Sonodac9b7d1c2015-04-11 14:50:09 +0900471 TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700472 if (wrapper->bg_wrapped != NULL) {
Alexander Polcyn7e444802017-05-17 13:13:55 -0700473 rb_thread_call_without_gvl(channel_safe_destroy_without_gil,
474 wrapper->bg_wrapped, NULL, NULL);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700475 wrapper->bg_wrapped = NULL;
nnoble097ef9b2014-12-01 17:06:10 -0800476 }
477
478 return Qnil;
479}
480
Tim Emiola623a74d2015-08-11 09:24:20 -0700481/* Called to obtain the target that this channel accesses. */
482static VALUE grpc_rb_channel_get_target(VALUE self) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700483 grpc_rb_channel* wrapper = NULL;
Tim Emiola623a74d2015-08-11 09:24:20 -0700484 VALUE res = Qnil;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700485 char* target = NULL;
Tim Emiola623a74d2015-08-11 09:24:20 -0700486
487 TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700488 target = grpc_channel_get_target(wrapper->bg_wrapped->channel);
Tim Emiola623a74d2015-08-11 09:24:20 -0700489 res = rb_str_new2(target);
490 gpr_free(target);
491
492 return res;
493}
494
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700495/* Needs to be called under global_connection_polling_mu */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700496static int bg_watched_channel_list_lookup(bg_watched_channel* target) {
497 bg_watched_channel* cur = bg_watched_channel_list_head;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800498
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700499 while (cur != NULL) {
500 if (cur == target) {
501 return 1;
502 }
503 cur = cur->next;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800504 }
Alexander Polcynfcad5792017-03-14 12:26:17 -0700505
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700506 return 0;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800507}
508
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700509/* Needs to be called under global_connection_polling_mu */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700510static bg_watched_channel* bg_watched_channel_list_create_and_add(
511 grpc_channel* channel) {
512 bg_watched_channel* watched = gpr_zalloc(sizeof(bg_watched_channel));
Alexander Polcyn5b881462017-03-21 18:31:29 -0700513
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700514 watched->channel = channel;
515 watched->next = bg_watched_channel_list_head;
516 watched->refcount = 1;
517 bg_watched_channel_list_head = watched;
518 return watched;
519}
520
521/* Needs to be called under global_connection_polling_mu */
Alexander Polcyn032f3982017-05-17 13:22:55 -0700522static void bg_watched_channel_list_free_and_remove(
Craig Tillerbaa14a92017-11-03 09:09:36 -0700523 bg_watched_channel* target) {
524 bg_watched_channel* bg = NULL;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700525
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700526 GPR_ASSERT(bg_watched_channel_list_lookup(target));
527 GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
528 if (bg_watched_channel_list_head == target) {
529 bg_watched_channel_list_head = target->next;
530 gpr_free(target);
531 return;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800532 }
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700533 bg = bg_watched_channel_list_head;
534 while (bg != NULL && bg->next != NULL) {
535 if (bg->next == target) {
536 bg->next = bg->next->next;
537 gpr_free(target);
538 return;
539 }
540 bg = bg->next;
541 }
542 GPR_ASSERT(0);
543}
Alexander Polcyn9f498662017-03-02 19:20:07 -0800544
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700545/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
546 * it onto the background thread for constant watches. */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700547static void* channel_init_try_register_connection_polling_without_gil(
548 void* arg) {
549 channel_init_try_register_stack* stack =
550 (channel_init_try_register_stack*)arg;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700551
552 gpr_mu_lock(&global_connection_polling_mu);
Alexander Polcyn7e444802017-05-17 13:13:55 -0700553 stack->wrapper->bg_wrapped =
554 bg_watched_channel_list_create_and_add(stack->channel);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700555 grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped);
556 gpr_mu_unlock(&global_connection_polling_mu);
557 return NULL;
558}
559
560// Needs to be called under global_connection_poolling_mu
Alexander Polcyn032f3982017-05-17 13:22:55 -0700561static void grpc_rb_channel_try_register_connection_polling(
Craig Tillerbaa14a92017-11-03 09:09:36 -0700562 bg_watched_channel* bg) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700563 grpc_connectivity_state conn_state;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700564 watch_state_op* op = NULL;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700565
566 GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
567
568 if (bg->refcount == 0) {
569 GPR_ASSERT(bg->channel_destroyed);
570 bg_watched_channel_list_free_and_remove(bg);
571 return;
572 }
573 GPR_ASSERT(bg->refcount == 1);
Alexander Polcync24d53b2017-05-17 20:25:38 -0700574 if (bg->channel_destroyed || abort_channel_polling) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700575 return;
576 }
577
578 conn_state = grpc_channel_check_connectivity_state(bg->channel, 0);
579 if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
580 return;
581 }
582 GPR_ASSERT(bg_watched_channel_list_lookup(bg));
583 // prevent bg from being free'd by GC while background thread is watching it
584 bg->refcount++;
585
586 op = gpr_zalloc(sizeof(watch_state_op));
587 op->op_type = CONTINUOUS_WATCH;
588 op->op.continuous_watch_callback_args.bg = bg;
Alexander Polcyn7e444802017-05-17 13:13:55 -0700589 grpc_channel_watch_connectivity_state(bg->channel, conn_state,
590 gpr_inf_future(GPR_CLOCK_REALTIME),
591 channel_polling_cq, op);
Alexander Polcyn9f498662017-03-02 19:20:07 -0800592}
593
Alexander Polcyn69719082017-03-13 12:00:42 -0700594// Note this loop breaks out with a single call of
Alexander Polcyndeeed7f2017-04-16 13:31:58 -0700595// "run_poll_channels_loop_no_gil".
Alexander Polcyn69719082017-03-13 12:00:42 -0700596// This assumes that a ruby call the unblocking func
597// indicates process shutdown.
Alexander Polcyn9f498662017-03-02 19:20:07 -0800598// In the worst case, this stops polling channel connectivity
599// early and falls back to current behavior.
Craig Tillerbaa14a92017-11-03 09:09:36 -0700600static void* run_poll_channels_loop_no_gil(void* arg) {
Alexander Polcyn9f498662017-03-02 19:20:07 -0800601 grpc_event event;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700602 watch_state_op* op = NULL;
603 bg_watched_channel* bg = NULL;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800604 (void)arg;
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700605 gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
606
607 gpr_mu_lock(&global_connection_polling_mu);
608 GPR_ASSERT(!channel_polling_thread_started);
609 channel_polling_thread_started = 1;
Alexander Polcyndeeed7f2017-04-16 13:31:58 -0700610 gpr_cv_broadcast(&global_connection_polling_cv);
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700611 gpr_mu_unlock(&global_connection_polling_mu);
612
Alexander Polcyn9f498662017-03-02 19:20:07 -0800613 for (;;) {
614 event = grpc_completion_queue_next(
615 channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
616 if (event.type == GRPC_QUEUE_SHUTDOWN) {
Alexander Polcyn9f498662017-03-02 19:20:07 -0800617 break;
618 }
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700619 gpr_mu_lock(&global_connection_polling_mu);
Alexander Polcyn9f498662017-03-02 19:20:07 -0800620 if (event.type == GRPC_OP_COMPLETE) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700621 op = (watch_state_op*)event.tag;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700622 if (op->op_type == CONTINUOUS_WATCH) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700623 bg = (bg_watched_channel*)op->op.continuous_watch_callback_args.bg;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700624 bg->refcount--;
625 grpc_rb_channel_try_register_connection_polling(bg);
626 gpr_free(op);
Alexander Polcyn7e444802017-05-17 13:13:55 -0700627 } else if (op->op_type == WATCH_STATE_API) {
628 grpc_rb_channel_watch_connection_state_op_complete(
Craig Tillerbaa14a92017-11-03 09:09:36 -0700629 (watch_state_op*)event.tag, event.success);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700630 } else {
631 GPR_ASSERT(0);
632 }
Alexander Polcyn9f498662017-03-02 19:20:07 -0800633 }
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700634 gpr_mu_unlock(&global_connection_polling_mu);
Alexander Polcyn9f498662017-03-02 19:20:07 -0800635 }
636 grpc_completion_queue_destroy(channel_polling_cq);
Craig Tiller5b1c5f22017-04-19 09:52:18 -0700637 gpr_log(GPR_DEBUG,
638 "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling "
639 "loop");
Alexander Polcyn9f498662017-03-02 19:20:07 -0800640 return NULL;
641}
642
643// Notify the channel polling loop to cleanup and shutdown.
Craig Tillerbaa14a92017-11-03 09:09:36 -0700644static void run_poll_channels_loop_unblocking_func(void* arg) {
645 bg_watched_channel* bg = NULL;
Alexander Polcyn9f498662017-03-02 19:20:07 -0800646 (void)arg;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700647
Alexander Polcynbe301142017-03-14 16:33:44 -0700648 gpr_mu_lock(&global_connection_polling_mu);
Craig Tiller5b1c5f22017-04-19 09:52:18 -0700649 gpr_log(GPR_DEBUG,
Alexander Polcyn7e444802017-05-17 13:13:55 -0700650 "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting "
Craig Tiller5b1c5f22017-04-19 09:52:18 -0700651 "connection polling");
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700652 // early out after first time through
653 if (abort_channel_polling) {
654 gpr_mu_unlock(&global_connection_polling_mu);
655 return;
656 }
Alexander Polcyn9f498662017-03-02 19:20:07 -0800657 abort_channel_polling = 1;
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700658
659 // force pending watches to end by switching to shutdown state
660 bg = bg_watched_channel_list_head;
Alexander Polcyn7e444802017-05-17 13:13:55 -0700661 while (bg != NULL) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700662 if (!bg->channel_destroyed) {
663 grpc_channel_destroy(bg->channel);
664 bg->channel_destroyed = 1;
665 }
666 bg = bg->next;
667 }
668
Alexander Polcyn9f498662017-03-02 19:20:07 -0800669 grpc_completion_queue_shutdown(channel_polling_cq);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700670 gpr_cv_broadcast(&global_connection_polling_cv);
Alexander Polcynbe301142017-03-14 16:33:44 -0700671 gpr_mu_unlock(&global_connection_polling_mu);
Alexander Polcyn7e444802017-05-17 13:13:55 -0700672 gpr_log(GPR_DEBUG,
673 "GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting "
674 "connection polling");
Alexander Polcyn9f498662017-03-02 19:20:07 -0800675}
676
677// Poll channel connectivity states in background thread without the GIL.
678static VALUE run_poll_channels_loop(VALUE arg) {
679 (void)arg;
Craig Tiller5b1c5f22017-04-19 09:52:18 -0700680 gpr_log(
681 GPR_DEBUG,
682 "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
Alexander Polcyn9f498662017-03-02 19:20:07 -0800683 rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
Alexander Polcyndeeed7f2017-04-16 13:31:58 -0700684 run_poll_channels_loop_unblocking_func, NULL);
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700685
Alexander Polcyn9f498662017-03-02 19:20:07 -0800686 return Qnil;
687}
688
Craig Tillerbaa14a92017-11-03 09:09:36 -0700689static void* wait_until_channel_polling_thread_started_no_gil(void* arg) {
690 int* stop_waiting = (int*)arg;
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700691 gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
692 gpr_mu_lock(&global_connection_polling_mu);
Alexander Polcyn82fef0b2017-05-17 23:42:38 -0700693 while (!channel_polling_thread_started && !abort_channel_polling &&
694 !*stop_waiting) {
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700695 gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
696 gpr_inf_future(GPR_CLOCK_REALTIME));
697 }
698 gpr_mu_unlock(&global_connection_polling_mu);
699
700 return NULL;
701}
702
murgatroid99ce67bff2017-04-19 15:54:27 -0700703static void wait_until_channel_polling_thread_started_unblocking_func(
Craig Tillerbaa14a92017-11-03 09:09:36 -0700704 void* arg) {
705 int* stop_waiting = (int*)arg;
Alexander Polcyndeeed7f2017-04-16 13:31:58 -0700706 gpr_mu_lock(&global_connection_polling_mu);
murgatroid99ce67bff2017-04-19 15:54:27 -0700707 gpr_log(GPR_DEBUG,
Alexander Polcyn82fef0b2017-05-17 23:42:38 -0700708 "GRPC_RUBY: interrupt wait for channel polling thread to start");
709 *stop_waiting = 1;
710 gpr_cv_broadcast(&global_connection_polling_cv);
711 gpr_mu_unlock(&global_connection_polling_mu);
712}
713
Craig Tillerbaa14a92017-11-03 09:09:36 -0700714static void* set_abort_channel_polling_without_gil(void* arg) {
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700715 (void)arg;
716 gpr_mu_lock(&global_connection_polling_mu);
Alexander Polcyndeeed7f2017-04-16 13:31:58 -0700717 abort_channel_polling = 1;
718 gpr_cv_broadcast(&global_connection_polling_cv);
719 gpr_mu_unlock(&global_connection_polling_mu);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700720 return NULL;
Alexander Polcyndeeed7f2017-04-16 13:31:58 -0700721}
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700722
Alexander Polcyn9f498662017-03-02 19:20:07 -0800723/* Temporary fix for
724 * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899.
725 * Transports in idle channels can get destroyed. Normally c-core re-connects,
726 * but in grpc-ruby core never gets a thread until an RPC is made, because ruby
727 * only calls c-core's "completion_queu_pluck" API.
728 * This uses a global background thread that calls
729 * "completion_queue_next" on registered "watch_channel_connectivity_state"
730 * calls - so that c-core can reconnect if needed, when there aren't any RPC's.
731 * TODO(apolcyn) remove this when core handles new RPCs on dead connections.
732 */
Alexander Polcyn2a9b5d72017-04-14 12:10:55 -0700733void grpc_rb_channel_polling_thread_start() {
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700734 VALUE background_thread = Qnil;
735
Alexander Polcyn2a9b5d72017-04-14 12:10:55 -0700736 GPR_ASSERT(!abort_channel_polling);
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700737 GPR_ASSERT(!channel_polling_thread_started);
Alexander Polcyn2a9b5d72017-04-14 12:10:55 -0700738 GPR_ASSERT(channel_polling_cq == NULL);
739
Alexander Polcynbe301142017-03-14 16:33:44 -0700740 gpr_mu_init(&global_connection_polling_mu);
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700741 gpr_cv_init(&global_connection_polling_cv);
742
murgatroid99ce67bff2017-04-19 15:54:27 -0700743 channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700744 background_thread = rb_thread_create(run_poll_channels_loop, NULL);
745
746 if (!RTEST(background_thread)) {
747 gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread");
Alexander Polcyn7e444802017-05-17 13:13:55 -0700748 rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL,
749 NULL, NULL);
Alexander Polcync7fcebe2017-04-15 16:37:44 -0700750 }
Alexander Polcyn9f498662017-03-02 19:20:07 -0800751}
752
Tim Emiolabe885262015-08-11 17:43:48 -0700753static void Init_grpc_propagate_masks() {
754 /* Constants representing call propagation masks in grpc.h */
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800755 VALUE grpc_rb_mPropagateMasks =
756 rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks");
Tim Emiolabe885262015-08-11 17:43:48 -0700757 rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
758 UINT2NUM(GRPC_PROPAGATE_DEADLINE));
759 rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
760 UINT2NUM(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT));
761 rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_TRACING_CONTEXT",
762 UINT2NUM(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT));
763 rb_define_const(grpc_rb_mPropagateMasks, "CANCELLATION",
764 UINT2NUM(GRPC_PROPAGATE_CANCELLATION));
765 rb_define_const(grpc_rb_mPropagateMasks, "DEFAULTS",
766 UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
767}
768
Tim Emiola046094d2015-08-12 16:55:57 -0700769static void Init_grpc_connectivity_states() {
770 /* Constants representing call propagation masks in grpc.h */
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800771 VALUE grpc_rb_mConnectivityStates =
772 rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates");
Tim Emiola046094d2015-08-12 16:55:57 -0700773 rb_define_const(grpc_rb_mConnectivityStates, "IDLE",
774 LONG2NUM(GRPC_CHANNEL_IDLE));
775 rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING",
776 LONG2NUM(GRPC_CHANNEL_CONNECTING));
777 rb_define_const(grpc_rb_mConnectivityStates, "READY",
778 LONG2NUM(GRPC_CHANNEL_READY));
779 rb_define_const(grpc_rb_mConnectivityStates, "TRANSIENT_FAILURE",
780 LONG2NUM(GRPC_CHANNEL_TRANSIENT_FAILURE));
781 rb_define_const(grpc_rb_mConnectivityStates, "FATAL_FAILURE",
Craig Tiller48ed92e2016-06-02 11:07:12 -0700782 LONG2NUM(GRPC_CHANNEL_SHUTDOWN));
Tim Emiola046094d2015-08-12 16:55:57 -0700783}
784
Tim Emiola409e6c82015-02-17 17:46:35 -0800785void Init_grpc_channel() {
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900786 grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
787 grpc_rb_cChannel =
788 rb_define_class_under(grpc_rb_mGrpcCore, "Channel", rb_cObject);
nnoble097ef9b2014-12-01 17:06:10 -0800789
790 /* Allocates an object managed by the ruby runtime */
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900791 rb_define_alloc_func(grpc_rb_cChannel, grpc_rb_channel_alloc);
nnoble097ef9b2014-12-01 17:06:10 -0800792
793 /* Provides a ruby constructor and support for dup/clone. */
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900794 rb_define_method(grpc_rb_cChannel, "initialize", grpc_rb_channel_init, -1);
795 rb_define_method(grpc_rb_cChannel, "initialize_copy",
murgatroid99cc0f4e12016-06-02 16:00:54 -0700796 grpc_rb_cannot_init_copy, 1);
nnoble097ef9b2014-12-01 17:06:10 -0800797
798 /* Add ruby analogues of the Channel methods. */
Tim Emiola046094d2015-08-12 16:55:57 -0700799 rb_define_method(grpc_rb_cChannel, "connectivity_state",
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800800 grpc_rb_channel_get_connectivity_state, -1);
Tim Emiola046094d2015-08-12 16:55:57 -0700801 rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
Alexander Polcynfcad5792017-03-14 12:26:17 -0700802 grpc_rb_channel_watch_connectivity_state, 2);
Sree Kuchibhotla98ab5202017-03-03 18:28:47 -0800803 rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
804 5);
Tim Emiola623a74d2015-08-11 09:24:20 -0700805 rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900806 rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
807 rb_define_alias(grpc_rb_cChannel, "close", "destroy");
nnoble097ef9b2014-12-01 17:06:10 -0800808
809 id_channel = rb_intern("__channel");
810 id_target = rb_intern("__target");
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900811 rb_define_const(grpc_rb_cChannel, "SSL_TARGET",
nnoble0c475f02014-12-05 15:37:39 -0800812 ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900813 rb_define_const(grpc_rb_cChannel, "ENABLE_CENSUS",
temiola71bb1372014-12-11 11:27:25 -0800814 ID2SYM(rb_intern(GRPC_ARG_ENABLE_CENSUS)));
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900815 rb_define_const(grpc_rb_cChannel, "MAX_CONCURRENT_STREAMS",
temiola71bb1372014-12-11 11:27:25 -0800816 ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS)));
Yuki Yugui Sonodaa7d369e2015-04-11 11:48:36 +0900817 rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH",
Mark D. Roth7331a7a2016-08-31 13:09:27 -0700818 ID2SYM(rb_intern(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)));
murgatroid99afe39742015-12-16 13:04:37 -0800819 id_insecure_channel = rb_intern("this_channel_is_insecure");
Tim Emiolabe885262015-08-11 17:43:48 -0700820 Init_grpc_propagate_masks();
Tim Emiola046094d2015-08-12 16:55:57 -0700821 Init_grpc_connectivity_states();
nnoble097ef9b2014-12-01 17:06:10 -0800822}
823
824/* Gets the wrapped channel from the ruby wrapper */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700825grpc_channel* grpc_rb_get_wrapped_channel(VALUE v) {
826 grpc_rb_channel* wrapper = NULL;
Yuki Yugui Sonodac9b7d1c2015-04-11 14:50:09 +0900827 TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
Alexander Polcynb2c0b7b2017-04-27 00:26:25 -0700828 return wrapper->bg_wrapped->channel;
nnoble097ef9b2014-12-01 17:06:10 -0800829}