/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"

/* Client channel implementation */

grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false);

/*************************************************************************
 * METHOD-CONFIG TABLE
 */

typedef enum {
  /* zero so it can be default initialized */
  WAIT_FOR_READY_UNSET = 0,
  WAIT_FOR_READY_FALSE,
  WAIT_FOR_READY_TRUE
} wait_for_ready_value;

typedef struct {
  gpr_refcount refs;
  gpr_timespec timeout;
  wait_for_ready_value wait_for_ready;
} method_parameters;

static method_parameters *method_parameters_ref(
    method_parameters *method_params) {
  gpr_ref(&method_params->refs);
  return method_params;
}

static void method_parameters_unref(method_parameters *method_params) {
  if (gpr_unref(&method_params->refs)) {
    gpr_free(method_params);
  }
}

static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
  method_parameters_unref(value);
}

static bool parse_wait_for_ready(grpc_json *field,
                                 wait_for_ready_value *wait_for_ready) {
  if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
    return false;
  }
  *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
                                                  : WAIT_FOR_READY_FALSE;
  return true;
}

static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
  if (field->type != GRPC_JSON_STRING) return false;
  size_t len = strlen(field->value);
  if (field->value[len - 1] != 's') return false;
  char *buf = gpr_strdup(field->value);
  buf[len - 1] = '\0';  // Remove trailing 's'.
  char *decimal_point = strchr(buf, '.');
  if (decimal_point != NULL) {
    *decimal_point = '\0';
    timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
    if (timeout->tv_nsec == -1) {
      gpr_free(buf);
      return false;
    }
    // There should always be exactly 3, 6, or 9 fractional digits.
    int multiplier = 1;
    switch (strlen(decimal_point + 1)) {
      case 9:
        break;
      case 6:
        multiplier *= 1000;
        break;
      case 3:
        multiplier *= 1000000;
        break;
      default:  // Unsupported number of digits.
        gpr_free(buf);
        return false;
    }
    timeout->tv_nsec *= multiplier;
  }
  timeout->tv_sec = gpr_parse_nonnegative_int(buf);
  gpr_free(buf);
  if (timeout->tv_sec == -1) return false;
  return true;
}
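// Illustrative example: a service-config timeout string of "30.500s" parses
// to {tv_sec = 30, tv_nsec = 500000000}, since the 3 fractional digits are
// scaled by 1000000; a value like "30.5s" is rejected because only 3, 6, or
// 9 fractional digits are accepted.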

static void *method_parameters_create_from_json(const grpc_json *json) {
  wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
  gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
  for (grpc_json *field = json->child; field != NULL; field = field->next) {
    if (field->key == NULL) continue;
    if (strcmp(field->key, "waitForReady") == 0) {
      if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL;  // Duplicate.
      if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
    } else if (strcmp(field->key, "timeout") == 0) {
      if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL;  // Duplicate.
      if (!parse_timeout(field, &timeout)) return NULL;
    }
  }
  method_parameters *value = gpr_malloc(sizeof(method_parameters));
  gpr_ref_init(&value->refs, 1);
  value->timeout = timeout;
  value->wait_for_ready = wait_for_ready;
  return value;
}
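// Illustrative example: a per-method service config entry such as
//   { "waitForReady": true, "timeout": "1.000s" }
// yields a method_parameters with wait_for_ready == WAIT_FOR_READY_TRUE and
// timeout == {1, 0, GPR_TIMESPAN}.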

struct external_connectivity_watcher;

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

typedef struct client_channel_channel_data {
  /** resolver for this channel */
  grpc_resolver *resolver;
  /** have we started resolving this channel */
  bool started_resolving;
  /** is deadline checking enabled? */
  bool deadline_checking_enabled;
  /** client channel factory */
  grpc_client_channel_factory *client_channel_factory;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner *combiner;
  /** currently active load balancer */
  grpc_lb_policy *lb_policy;
  /** retry throttle data */
  grpc_server_retry_throttle_data *retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_slice_hash_table *method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args *resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack *owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set *interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher *external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char *info_lb_policy_name;
  /** service config in JSON form */
  char *info_service_config_json;
} channel_data;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data *chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
                                                   channel_data *chand,
                                                   grpc_connectivity_state state,
                                                   grpc_error *error,
                                                   const char *reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != NULL) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      grpc_lb_policy_cancel_picks_locked(
          exec_ctx, chand->lb_policy,
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
                                         /* mask= */ 0, /* check= */ 0,
                                         GRPC_ERROR_REF(error));
    }
  }
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
                              reason);
}

static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  lb_policy_connectivity_watcher *w = arg;
  grpc_connectivity_state publish_state = w->state;
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
      publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
      grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
      GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
      w->chand->lb_policy = NULL;
    }
    set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
                                               &w->on_changed);
}

static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
                                   channel_data *chand) {
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
                            &chand->on_resolver_result_changed);
}

typedef struct {
  char *server_name;
  grpc_server_retry_throttle_data *retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
  service_config_parsing_state *parsing_state = arg;
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != NULL) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json *sub_field = field->child; sub_field != NULL;
         sub_field = sub_field->next) {
      if (sub_field->key == NULL) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char *decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != NULL) {
          whole_len = (size_t)(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_retry_throttle_map_get_data_for_server(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}
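// Illustrative example: a service config fragment such as
//   "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.1 }
// produces max_milli_tokens == 10000 and milli_token_ratio == 100 for the
// retry throttle map lookup above.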

static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  channel_data *chand = arg;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
            grpc_error_string(error));
  }
  // Extract the following fields from the resolver result, if non-NULL.
  char *lb_policy_name = NULL;
  bool lb_policy_name_changed = false;
  grpc_lb_policy *new_lb_policy = NULL;
  char *service_config_json = NULL;
  grpc_server_retry_throttle_data *retry_throttle_data = NULL;
  grpc_slice_hash_table *method_params_table = NULL;
  if (chand->resolver_result != NULL) {
    // Find LB policy name.
    const grpc_arg *channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      lb_policy_name = channel_arg->value.string;
    }
    // Special case: If at least one balancer address is present, we use
    // the grpclb policy, regardless of what the resolver actually specified.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
    if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
      grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
      bool found_balancer_address = false;
      for (size_t i = 0; i < addresses->num_addresses; ++i) {
        if (addresses->addresses[i].is_balancer) {
          found_balancer_address = true;
          break;
        }
      }
      if (found_balancer_address) {
        if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
          gpr_log(GPR_INFO,
                  "resolver requested LB policy %s but provided at least one "
                  "balancer address -- forcing use of grpclb LB policy",
                  lb_policy_name);
        }
        lb_policy_name = "grpclb";
      }
    }
    // Use pick_first if nothing was specified and we didn't select grpclb
    // above.
    if (lb_policy_name == NULL) lb_policy_name = "pick_first";
    grpc_lb_policy_args lb_policy_args;
    lb_policy_args.args = chand->resolver_result;
    lb_policy_args.client_channel_factory = chand->client_channel_factory;
    lb_policy_args.combiner = chand->combiner;
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    lb_policy_name_changed =
        chand->info_lb_policy_name == NULL ||
        strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
    if (chand->lb_policy != NULL && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
    } else {
      // Instantiate new LB policy.
      new_lb_policy =
          grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
      if (new_lb_policy == NULL) {
        gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
      }
    }
    // Find service config.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      service_config_json = gpr_strdup(channel_arg->value.string);
      grpc_service_config *service_config =
          grpc_service_config_create(service_config_json);
      if (service_config != NULL) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        GPR_ASSERT(channel_arg != NULL);
        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
        grpc_uri *uri =
            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        memset(&parsing_state, 0, sizeof(parsing_state));
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        grpc_service_config_parse_global_params(
            service_config, parse_retry_throttle_params, &parsing_state);
        grpc_uri_destroy(uri);
        retry_throttle_data = parsing_state.retry_throttle_data;
        method_params_table = grpc_service_config_create_method_config_table(
            exec_ctx, service_config, method_parameters_create_from_json,
            method_parameters_free);
        grpc_service_config_destroy(service_config);
      }
    }
    // Before we clean up, save a copy of lb_policy_name, since it might
    // be pointing to data inside chand->resolver_result.
    // The copy will be saved in chand->lb_policy_name below.
    lb_policy_name = gpr_strdup(lb_policy_name);
    grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
    chand->resolver_result = NULL;
  }
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG,
            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
            "service_config=\"%s\"",
            chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "",
            service_config_json);
  }
  // Now swap out fields in chand.  Note that the new values may still
  // be NULL if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name != NULL) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name;
  }
  if (service_config_json != NULL) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = retry_throttle_data;
  // Swap out the method params table.
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  chand->method_params_table = method_params_table;
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be NULL), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
      chand->resolver == NULL) {
    if (chand->lb_policy != NULL) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
                chand->lb_policy);
      }
      grpc_pollset_set_del_pollset_set(exec_ctx,
                                       chand->lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
    }
    chand->lb_policy = new_lb_policy;
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
    }
    if (chand->resolver != NULL) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
      }
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
    }
    set_channel_connectivity_state_locked(
        exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                            &chand->waiting_for_resolver_result_closures);
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error *state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (new_lb_policy != NULL) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
      }
      GRPC_ERROR_UNREF(state_error);
      state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
                                                       &state_error);
      grpc_pollset_set_add_pollset_set(exec_ctx,
                                       new_lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                              &chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
    }
    set_channel_connectivity_state_locked(
        exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error_ignored) {
  grpc_transport_op *op = arg;
  grpc_channel_element *elem = op->handler_private.extra_arg;
  channel_data *chand = elem->channel_data;

  if (op->on_connectivity_state_change != NULL) {
    grpc_connectivity_state_notify_on_state_change(
        exec_ctx, &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = NULL;
    op->connectivity_state = NULL;
  }

  if (op->send_ping != NULL) {
    if (chand->lb_policy == NULL) {
      GRPC_CLOSURE_SCHED(
          exec_ctx, op->send_ping,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
      op->bind_pollset = NULL;
    }
    op->send_ping = NULL;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != NULL) {
      set_channel_connectivity_state_locked(
          exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                                &chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != NULL) {
        grpc_pollset_set_del_pollset_set(exec_ctx,
                                         chand->lb_policy->interested_parties,
                                         chand->interested_parties);
        GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
        chand->lb_policy = NULL;
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem,
                                  grpc_transport_op *op) {
  channel_data *chand = elem->channel_data;

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != NULL) {
    grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
                                 op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      exec_ctx,
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
                                grpc_channel_element *elem,
                                const grpc_channel_info *info) {
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != NULL) {
    *info->lb_policy_name = chand->info_lb_policy_name == NULL
                                ? NULL
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != NULL) {
    *info->service_config_json =
        chand->info_service_config_json == NULL
            ? NULL
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
                                        grpc_channel_element *elem,
                                        grpc_channel_element_args *args) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = NULL;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  // Record client channel factory.
  const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
                                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(arg->value.pointer.p);
  chand->client_channel_factory = arg->value.pointer.p;
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char *proxy_name = NULL;
  grpc_channel_args *new_args = NULL;
  grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_resolver_create(
      exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
      new_args != NULL ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != NULL) gpr_free(proxy_name);
  if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
  if (chand->resolver == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                     grpc_error *error) {
  grpc_resolver *resolver = arg;
  grpc_resolver_shutdown_locked(exec_ctx, resolver);
  GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
                                    grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->resolver != NULL) {
    GRPC_CLOSURE_SCHED(
        exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
                                      grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != NULL) {
    grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
  }
  if (chand->lb_policy != NULL) {
    grpc_pollset_set_del_pollset_set(exec_ctx,
                                     chand->lb_policy->interested_parties,
                                     chand->interested_parties);
    GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
  grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
  GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_WAITING_BATCHES 6

/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store a pointer to the call
  // stack and each has its own mutex.  If/when we have time, find a way
  // to avoid this without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  gpr_timespec deadline;
  grpc_server_retry_throttle_data *retry_throttle_data;
  method_parameters *method_params;

  /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
      bit is 0), or a pointer to an error (if the lowest bit is 1) */
  gpr_atm subchannel_call_or_error;
  gpr_arena *arena;

  grpc_lb_policy *lb_policy;  // Holds ref while LB pick is pending.
  grpc_closure lb_pick_closure;

  grpc_connected_subchannel *connected_subchannel;
  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
  grpc_polling_entity *pollent;

  grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
  size_t waiting_for_pick_batches_count;

  grpc_transport_stream_op_batch_payload *initial_metadata_payload;

  grpc_call_stack *owning_call;

  grpc_linked_mdelem lb_token_mdelem;

  grpc_closure on_complete;
  grpc_closure *original_on_complete;
} call_data;

typedef struct {
  grpc_subchannel_call *subchannel_call;
  grpc_error *error;
} call_or_error;

static call_or_error get_call_or_error(call_data *p) {
  gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
  if (c == 0)
    return (call_or_error){NULL, NULL};
  else if (c & 1)
    return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
  else
    return (call_or_error){(grpc_subchannel_call *)c, NULL};
}
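// Note: get_call_or_error() above and set_call_or_error() below treat
// subchannel_call_or_error as a tagged pointer.  Both grpc_subchannel_call*
// and grpc_error* are assumed to be at least 2-byte aligned, so the low bit
// is free to mark the error case: 0 means nothing has been set yet, an odd
// value is an error pointer with the low bit set, and any other value is
// the subchannel call pointer.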

static bool set_call_or_error(call_data *p, call_or_error coe) {
  // this should always be under a lock
  call_or_error existing = get_call_or_error(p);
  if (existing.error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(coe.error);
    return false;
  }
  GPR_ASSERT(existing.subchannel_call == NULL);
  if (coe.error != GRPC_ERROR_NONE) {
    GPR_ASSERT(coe.subchannel_call == NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
  } else {
    GPR_ASSERT(coe.subchannel_call != NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error,
                      (gpr_atm)coe.subchannel_call);
  }
  return true;
}

grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
    grpc_call_element *call_elem) {
  return get_call_or_error(call_elem->call_data).subchannel_call;
}

static void waiting_for_pick_batches_add_locked(
    call_data *calld, grpc_transport_stream_op_batch *batch) {
  GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
  calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
      batch;
}

static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
                                                 grpc_call_element *elem,
                                                 grpc_error *error) {
  call_data *calld = elem->call_data;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
            elem->channel_data, calld, calld->waiting_for_pick_batches_count,
            grpc_error_string(error));
  }
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
  }
  calld->waiting_for_pick_batches_count = 0;
  GRPC_ERROR_UNREF(error);
}

static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
                                                   grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  if (calld->waiting_for_pick_batches_count == 0) return;
  call_or_error coe = get_call_or_error(calld);
  if (coe.error != GRPC_ERROR_NONE) {
    waiting_for_pick_batches_fail_locked(exec_ctx, elem,
                                         GRPC_ERROR_REF(coe.error));
    return;
  }
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
                       " pending batches to subchannel_call=%p",
            elem->channel_data, calld, calld->waiting_for_pick_batches_count,
            coe.subchannel_call);
  }
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
                                    calld->waiting_for_pick_batches[i]);
  }
  calld->waiting_for_pick_batches_count = 0;
}
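// The waiting_for_pick_batches_* helpers above implement the queueing
// described in the call_data comment: batches that arrive before the LB
// pick completes are buffered in waiting_for_pick_batches, then either
// replayed onto the subchannel call (resume) or failed as a group (fail)
// once the pick resolves.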

// Applies service config to the call.  Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
                                                grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != NULL) {
    calld->retry_throttle_data =
        grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != NULL) {
    calld->method_params = grpc_method_config_table_get(
        exec_ctx, chand->method_params_table, calld->path);
    if (calld->method_params != NULL) {
      method_parameters_ref(calld->method_params);
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          gpr_time_cmp(calld->method_params->timeout,
                       gpr_time_0(GPR_TIMESPAN)) != 0) {
        const gpr_timespec per_method_deadline =
            gpr_time_add(calld->call_start_time, calld->method_params->timeout);
        if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
        }
      }
    }
  }
}
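// Illustrative example: if the client set a 30-second deadline but the
// matching method config specifies a "10.000s" timeout, then
// call_start_time + 10s is the earlier of the two, so calld->deadline is
// tightened and the deadline timer is reset above.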

static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
                                          grpc_call_element *elem,
                                          grpc_error *error) {
  call_data *calld = elem->call_data;
  grpc_subchannel_call *subchannel_call = NULL;
  const grpc_connected_subchannel_call_args call_args = {
      .pollent = calld->pollent,
      .path = calld->path,
      .start_time = calld->call_start_time,
      .deadline = calld->deadline,
      .arena = calld->arena,
      .context = calld->subchannel_call_context};
  grpc_error *new_error = grpc_connected_subchannel_create_call(
      exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            elem->channel_data, calld, subchannel_call,
            grpc_error_string(new_error));
  }
  GPR_ASSERT(set_call_or_error(
      calld, (call_or_error){.subchannel_call = subchannel_call}));
  if (new_error != GRPC_ERROR_NONE) {
    new_error = grpc_error_add_child(new_error, error);
    waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
  } else {
    waiting_for_pick_batches_resume_locked(exec_ctx, elem);
  }
  GRPC_ERROR_UNREF(error);
}

static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
                                    grpc_call_element *elem,
                                    grpc_error *error) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
                                           chand->interested_parties);
  call_or_error coe = get_call_or_error(calld);
  if (calld->connected_subchannel == NULL) {
    // Failed to create subchannel.
    grpc_error *failure =
        error == GRPC_ERROR_NONE
            ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                  "Call dropped by load balancing policy")
            : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to create subchannel", &error, 1);
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
              calld, grpc_error_string(failure));
    }
    set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
    waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
  } else if (coe.error != GRPC_ERROR_NONE) {
    /* already cancelled before subchannel became ready */
    grpc_error *child_errors[] = {error, coe.error};
    grpc_error *cancellation_error =
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Cancelled before creating subchannel", child_errors,
1021 GPR_ARRAY_SIZE(child_errors));
David Garcia Quintas68a9e382016-12-13 10:50:40 -08001022 /* if due to deadline, attach the deadline exceeded status to the error */
1023 if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
1024 cancellation_error =
1025 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
1026 GRPC_STATUS_DEADLINE_EXCEEDED);
1027 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001028 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1029 gpr_log(GPR_DEBUG,
1030 "chand=%p calld=%p: cancelled before subchannel became ready: %s",
1031 chand, calld, grpc_error_string(cancellation_error));
1032 }
1033 waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001034 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07001035 /* Create call on subchannel. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001036 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001037 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001038 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001039 GRPC_ERROR_UNREF(error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001040}
1041
1042static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
1043 call_data *calld = elem->call_data;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001044 grpc_subchannel_call *subchannel_call =
1045 get_call_or_error(calld).subchannel_call;
1046 if (subchannel_call == NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001047 return NULL;
1048 } else {
1049 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
1050 }
1051}
1052
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001053/** Return true if subchannel is available immediately (in which case
1054 subchannel_ready_locked() should not be called), or false otherwise (in
1055 which case subchannel_ready_locked() should be called when the subchannel
1056 is available). */
1057static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1058 grpc_call_element *elem);
1059
Craig Tiller577c9b22015-11-02 14:11:15 -08001060typedef struct {
Craig Tiller577c9b22015-11-02 14:11:15 -08001061 grpc_call_element *elem;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001062 bool cancelled;
Craig Tiller577c9b22015-11-02 14:11:15 -08001063 grpc_closure closure;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001064} pick_after_resolver_result_args;
Craig Tiller577c9b22015-11-02 14:11:15 -08001065
Mark D. Roth60751fe2017-07-07 12:50:33 -07001066static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
1067 void *arg,
1068 grpc_error *error) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001069 pick_after_resolver_result_args *args = arg;
1070 if (args->cancelled) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001071 /* cancelled, do nothing */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001072 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1073 gpr_log(GPR_DEBUG, "call cancelled before resolver result");
1074 }
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001075 } else {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001076 channel_data *chand = args->elem->channel_data;
1077 call_data *calld = args->elem->call_data;
1078 if (error != GRPC_ERROR_NONE) {
1079 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1080 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
1081 chand, calld);
1082 }
1083 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
1084 } else {
1085 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1086 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
1087 chand, calld);
1088 }
1089 if (pick_subchannel_locked(exec_ctx, args->elem)) {
1090 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
1091 }
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001092 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001093 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001094 gpr_free(args);
Craig Tiller577c9b22015-11-02 14:11:15 -08001095}
1096
Mark D. Roth60751fe2017-07-07 12:50:33 -07001097static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
1098 grpc_call_element *elem) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001099 channel_data *chand = elem->channel_data;
1100 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001101 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1102 gpr_log(GPR_DEBUG,
1103 "chand=%p calld=%p: deferring pick pending resolver result", chand,
1104 calld);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001105 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001106 pick_after_resolver_result_args *args =
1107 (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
1108 args->elem = elem;
1109 GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
1110 args, grpc_combiner_scheduler(chand->combiner));
1111 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
1112 &args->closure, GRPC_ERROR_NONE);
1113}
1114
1115static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
1116 grpc_call_element *elem,
1117 grpc_error *error) {
1118 channel_data *chand = elem->channel_data;
1119 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001120 // If we don't yet have a resolver result, then a closure for
Mark D. Roth60751fe2017-07-07 12:50:33 -07001121 // pick_after_resolver_result_done_locked() will have been added to
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001122 // chand->waiting_for_resolver_result_closures, and it may not be invoked
1123 // until after this call has been destroyed. We mark the operation as
Mark D. Roth60751fe2017-07-07 12:50:33 -07001124 // cancelled, so that when pick_after_resolver_result_done_locked()
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001125 // is called, it will be a no-op. We also immediately invoke
1126 // subchannel_ready_locked() to propagate the error back to the caller.
1127 for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
Mark D. Roth64a317c2017-05-02 08:27:08 -07001128 closure != NULL; closure = closure->next_data.next) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001129 pick_after_resolver_result_args *args = closure->cb_arg;
1130 if (!args->cancelled && args->elem == elem) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001131 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1132 gpr_log(GPR_DEBUG,
1133 "chand=%p calld=%p: "
1134 "cancelling pick waiting for resolver result",
1135 chand, calld);
1136 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001137 args->cancelled = true;
1138 subchannel_ready_locked(exec_ctx, elem,
1139 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1140 "Pick cancelled", &error, 1));
Mark D. Roth64a317c2017-05-02 08:27:08 -07001141 }
1142 }
1143 GRPC_ERROR_UNREF(error);
1144}
1145
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001146// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
1147// Unrefs the LB policy after invoking subchannel_ready_locked().
1148static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1149 grpc_error *error) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001150 grpc_call_element *elem = arg;
1151 channel_data *chand = elem->channel_data;
1152 call_data *calld = elem->call_data;
1153 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1154 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
1155 chand, calld);
1156 }
1157 GPR_ASSERT(calld->lb_policy != NULL);
1158 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1159 calld->lb_policy = NULL;
1160 subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001161}
1162
1163// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
1164// If the pick was completed synchronously, unrefs the LB policy and
1165// returns true.
1166static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
1167 grpc_call_element *elem,
1168 const grpc_lb_policy_pick_args *inputs) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001169 channel_data *chand = elem->channel_data;
1170 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001171 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1172 gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
1173 chand, calld, chand->lb_policy);
1174 }
1175 // Keep a ref to the LB policy in calld while the pick is pending.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001176 GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001177 calld->lb_policy = chand->lb_policy;
1178 GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001179 grpc_combiner_scheduler(chand->combiner));
1180 const bool pick_done = grpc_lb_policy_pick_locked(
1181 exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001182 calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001183 if (pick_done) {
1184 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001185 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1186 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
1187 chand, calld);
1188 }
1189 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1190 calld->lb_policy = NULL;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001191 }
1192 return pick_done;
1193}
Craig Tiller577c9b22015-11-02 14:11:15 -08001194
Mark D. Roth60751fe2017-07-07 12:50:33 -07001195static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
1196 grpc_call_element *elem,
1197 grpc_error *error) {
1198 channel_data *chand = elem->channel_data;
1199 call_data *calld = elem->call_data;
1200 GPR_ASSERT(calld->lb_policy != NULL);
1201 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1202 gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
1203 chand, calld, calld->lb_policy);
1204 }
1205 grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
1206 &calld->connected_subchannel, error);
1207}
1208
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001209static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1210 grpc_call_element *elem) {
1211 GPR_TIMER_BEGIN("pick_subchannel", 0);
1212 channel_data *chand = elem->channel_data;
1213 call_data *calld = elem->call_data;
1214 bool pick_done = false;
Craig Tiller577c9b22015-11-02 14:11:15 -08001215 if (chand->lb_policy != NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001216 apply_service_config_to_call_locked(exec_ctx, elem);
Mark D. Rothe40dd292016-10-05 14:58:37 -07001217 // If the application explicitly set wait_for_ready, use that.
1218 // Otherwise, if the service config specified a value for this
1219 // method, use that.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001220 uint32_t initial_metadata_flags =
1221 calld->initial_metadata_payload->send_initial_metadata
1222 .send_initial_metadata_flags;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001223 const bool wait_for_ready_set_from_api =
1224 initial_metadata_flags &
1225 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1226 const bool wait_for_ready_set_from_service_config =
Mark D. Roth95b627b2017-02-24 11:02:58 -08001227 calld->method_params != NULL &&
1228 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001229 if (!wait_for_ready_set_from_api &&
1230 wait_for_ready_set_from_service_config) {
Mark D. Roth95b627b2017-02-24 11:02:58 -08001231 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001232 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1233 } else {
1234 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1235 }
1236 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001237 const grpc_lb_policy_pick_args inputs = {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001238 calld->initial_metadata_payload->send_initial_metadata
1239 .send_initial_metadata,
1240 initial_metadata_flags, &calld->lb_token_mdelem};
1241 pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
1242 } else if (chand->resolver != NULL) {
1243 if (!chand->started_resolving) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001244 start_resolving_locked(exec_ctx, chand);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001245 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001246 pick_after_resolver_result_start_locked(exec_ctx, elem);
Craig Tiller0eab6972016-04-23 12:59:57 -07001247 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001248 subchannel_ready_locked(
1249 exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -07001250 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001251 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001252 return pick_done;
Craig Tiller577c9b22015-11-02 14:11:15 -08001253}
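// Editorial note summarizing the wait_for_ready precedence implemented above
// (not original to this file): an explicitly set per-call flag from the
// client API always wins; otherwise the service config value (if any) is
// used; otherwise the flag is left as the API default. Roughly:
//   API explicitly set?   service config value   effective wait_for_ready
//   yes                   (ignored)              whatever the API set
//   no                    true                   true
//   no                    false                  false
//   no                    unset                  unchanged (API default)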
1254
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001255static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
1256 void *arg,
1257 grpc_error *error_ignored) {
1258 GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001259 grpc_transport_stream_op_batch *batch = arg;
1260 grpc_call_element *elem = batch->handler_private.extra_arg;
Craig Tillera11bfc82017-02-14 09:56:33 -08001261 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001262 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001263 /* need to recheck that another thread hasn't set the call */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001264 call_or_error coe = get_call_or_error(calld);
1265 if (coe.error != GRPC_ERROR_NONE) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001266 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1267 gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
1268 chand, calld, grpc_error_string(coe.error));
1269 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001270 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth60751fe2017-07-07 12:50:33 -07001271 exec_ctx, batch, GRPC_ERROR_REF(coe.error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001272 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001273 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001274 if (coe.subchannel_call != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001275 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1276 gpr_log(GPR_DEBUG,
1277 "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
1278 calld, coe.subchannel_call);
1279 }
1280 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001281 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001282 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001283 // Add to waiting-for-pick list. If we succeed in getting a
1284 // subchannel call below, we'll handle this batch (along with any
1285 // other waiting batches) in waiting_for_pick_batches_resume_locked().
Mark D. Roth60751fe2017-07-07 12:50:33 -07001286 waiting_for_pick_batches_add_locked(calld, batch);
1287 // If this is a cancellation, cancel the pending pick (if any) and
1288 // fail any pending batches.
1289 if (batch->cancel_stream) {
1290 grpc_error *error = batch->payload->cancel_stream.cancel_error;
1291 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1292 gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
1293 calld, grpc_error_string(error));
1294 }
Craig Tiller03155722017-05-23 23:51:51 +00001295 /* Stash a copy of cancel_error in our call data, so that we can use
1296 it for subsequent operations. This ensures that if the call is
Mark D. Roth60751fe2017-07-07 12:50:33 -07001297 cancelled before any batches are passed down (e.g., if the deadline
Craig Tiller03155722017-05-23 23:51:51 +00001298 is in the past when the call starts), we can return the right
Mark D. Roth60751fe2017-07-07 12:50:33 -07001299 error to the caller when the first batch does get passed down. */
Craig Tiller03155722017-05-23 23:51:51 +00001300 set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
Mark D. Roth60751fe2017-07-07 12:50:33 -07001301 if (calld->lb_policy != NULL) {
1302 pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
1303 } else {
1304 pick_after_resolver_result_cancel_locked(exec_ctx, elem,
1305 GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001306 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001307 waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001308 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001309 }
1310 /* if we don't have a subchannel, try to get one */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001311 if (batch->send_initial_metadata) {
1312 GPR_ASSERT(calld->connected_subchannel == NULL);
1313 calld->initial_metadata_payload = batch->payload;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001314 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
Yuchen Zeng144ce652016-09-01 18:19:34 -07001315 /* If a subchannel is not available immediately, the polling entity from
1316 call_data should be provided to channel_data's interested_parties, so
1317 that IO of the lb_policy and resolver could be done under it. */
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001318 if (pick_subchannel_locked(exec_ctx, elem)) {
1319 // Pick was returned synchronously.
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001320 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Rothd7389b42017-05-17 12:22:17 -07001321 if (calld->connected_subchannel == NULL) {
Mark D. Rothd7389b42017-05-17 12:22:17 -07001322 grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1323 "Call dropped by load balancing policy");
Craig Tiller26e69f62017-05-24 15:09:23 -07001324 set_call_or_error(calld,
1325 (call_or_error){.error = GRPC_ERROR_REF(error)});
Mark D. Roth60751fe2017-07-07 12:50:33 -07001326 waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001327 } else {
1328 // Create subchannel call.
Mark D. Roth60751fe2017-07-07 12:50:33 -07001329 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
Mark D. Rothd7389b42017-05-17 12:22:17 -07001330 }
Yuchen Zeng19656b12016-09-01 18:00:45 -07001331 } else {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001332 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1333 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001334 }
1335 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001336done:
1337 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
1338 "start_transport_stream_op_batch");
1339 GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001340}
1341
Mark D. Rothde144102017-03-15 10:11:03 -07001342static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001343 grpc_call_element *elem = arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001344 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001345 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001346 if (error == GRPC_ERROR_NONE) {
1347 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001348 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001349 } else {
1350 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001351 // decide whether or not to retry. Note that we should only
1352 // record failures whose statuses match the configured retryable
1353 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001354 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001355 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001356 }
1357 }
ncteisen274bbbe2017-06-08 14:57:11 -07001358 GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
Mark D. Roth95039b52017-02-24 07:59:45 -08001359 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001360}
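// Editorial note (not original to this file): the success/failure recording
// above feeds the per-server retry-throttle state. Assuming the standard
// gRPC retry-throttling scheme (the authoritative logic lives in
// retry_throttle.c), that state behaves like a token bucket: each failure
// subtracts one token, each success adds back a configured fraction
// (token_ratio), and retries are only permitted while the token count stays
// above half of max_tokens.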
1361
Craig Tillerbe9691a2017-02-14 10:00:42 -08001362/* The logic here is fairly complicated, due to (a) the fact that we
1363 need to handle the case where we receive the send op before the
1364 initial metadata op, and (b) the need for efficiency, especially in
1365 the streaming case.
1366
1367 We use double-checked locking to initially see if initialization has been
1368 performed. If it has not, we acquire the combiner and perform initialization.
1369 If it has, we proceed on the fast path. */
Craig Tillere1b51da2017-03-31 15:44:33 -07001370static void cc_start_transport_stream_op_batch(
1371 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001372 grpc_transport_stream_op_batch *batch) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001373 call_data *calld = elem->call_data;
1374 channel_data *chand = elem->channel_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001375 if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
1376 GRPC_TRACER_ON(grpc_trace_channel)) {
1377 grpc_call_log_op(GPR_INFO, elem, batch);
1378 }
Craig Tiller3be7dd02017-04-03 14:30:03 -07001379 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001380 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001381 batch);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001382 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001383 // Intercept on_complete for recv_trailing_metadata so that we can
1384 // check retry throttle status.
Mark D. Roth60751fe2017-07-07 12:50:33 -07001385 if (batch->recv_trailing_metadata) {
1386 GPR_ASSERT(batch->on_complete != NULL);
1387 calld->original_on_complete = batch->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001388 GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
1389 grpc_schedule_on_exec_ctx);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001390 batch->on_complete = &calld->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001391 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001392 /* try to (atomically) get the call */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001393 call_or_error coe = get_call_or_error(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001394 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001395 if (coe.error != GRPC_ERROR_NONE) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001396 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1397 gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
1398 chand, calld, grpc_error_string(coe.error));
1399 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001400 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth60751fe2017-07-07 12:50:33 -07001401 exec_ctx, batch, GRPC_ERROR_REF(coe.error));
1402 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001403 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001404 if (coe.subchannel_call != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001405 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1406 gpr_log(GPR_DEBUG,
1407 "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
1408 calld, coe.subchannel_call);
1409 }
1410 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
1411 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001412 }
1413 /* neither a subchannel call nor a cached error yet; enter the combiner to decide what to do */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001414 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1415 gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
1416 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001417 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001418 batch->handler_private.extra_arg = elem;
ncteisen274bbbe2017-06-08 14:57:11 -07001419 GRPC_CLOSURE_SCHED(
Mark D. Roth60751fe2017-07-07 12:50:33 -07001420 exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1421 start_transport_stream_op_batch_locked, batch,
Craig Tilleree4b1452017-05-12 10:56:03 -07001422 grpc_combiner_scheduler(chand->combiner)),
Craig Tillerbefafe62017-02-09 11:30:54 -08001423 GRPC_ERROR_NONE);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001424done:
Craig Tillera0f3abd2017-03-31 15:42:16 -07001425 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001426}
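// Editorial sketch of the fast-path / slow-path split above (not original to
// this file):
//   call_or_error coe = get_call_or_error(calld);   // lock-free read
//   if (coe.error != GRPC_ERROR_NONE)      -> fail the batch immediately
//   else if (coe.subchannel_call != NULL)  -> forward the batch directly
//   else                                   -> hop into chand->combiner and run
//                                             start_transport_stream_op_batch_locked()
// The locked path re-checks the same state under the combiner before queueing
// the batch or starting a pick, which is what makes the lock-free read safe.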
1427
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001428/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001429static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1430 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001431 const grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001432 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001433 channel_data *chand = elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001434 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001435 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001436 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001437 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001438 calld->owning_call = args->call_stack;
Craig Tillerd426cac2017-03-13 12:30:45 -07001439 calld->arena = args->arena;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001440 if (chand->deadline_checking_enabled) {
Craig Tiller71d6ce62017-04-06 09:10:09 -07001441 grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001442 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001443 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001444}
1445
1446/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001447static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1448 grpc_call_element *elem,
1449 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001450 grpc_closure *then_schedule_closure) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001451 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001452 channel_data *chand = elem->channel_data;
1453 if (chand->deadline_checking_enabled) {
1454 grpc_deadline_state_destroy(exec_ctx, elem);
1455 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001456 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001457 if (calld->method_params != NULL) {
1458 method_parameters_unref(calld->method_params);
1459 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001460 call_or_error coe = get_call_or_error(calld);
1461 GRPC_ERROR_UNREF(coe.error);
1462 if (coe.subchannel_call != NULL) {
1463 grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
1464 then_schedule_closure);
Craig Tillerd426cac2017-03-13 12:30:45 -07001465 then_schedule_closure = NULL;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001466 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
1467 "client_channel_destroy_call");
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001468 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001469 GPR_ASSERT(calld->lb_policy == NULL);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001470 GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001471 if (calld->connected_subchannel != NULL) {
1472 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1473 "picked");
1474 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001475 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1476 if (calld->subchannel_call_context[i].value != NULL) {
1477 calld->subchannel_call_context[i].destroy(
1478 calld->subchannel_call_context[i].value);
1479 }
1480 }
ncteisen274bbbe2017-06-08 14:57:11 -07001481 GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001482}
1483
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001484static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1485 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001486 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001487 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001488 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001489}
1490
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001491/*************************************************************************
1492 * EXPORTED SYMBOLS
1493 */
1494
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001495const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001496 cc_start_transport_stream_op_batch,
Craig Tillerf40df232016-03-25 13:38:14 -07001497 cc_start_transport_op,
1498 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001499 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001500 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001501 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001502 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001503 cc_init_channel_elem,
1504 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001505 cc_get_peer,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001506 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001507 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001508};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001509
Craig Tiller613dafa2017-02-09 12:00:43 -08001510static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1511 grpc_error *error_ignored) {
1512 channel_data *chand = arg;
1513 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001514 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001515 } else {
1516 chand->exit_idle_when_lb_policy_arrives = true;
1517 if (!chand->started_resolving && chand->resolver != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001518 start_resolving_locked(exec_ctx, chand);
Craig Tiller613dafa2017-02-09 12:00:43 -08001519 }
1520 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001521 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001522}
1523
Craig Tillera82950e2015-09-22 12:33:20 -07001524grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1525 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001526 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001527 grpc_connectivity_state out =
1528 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001529 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001530 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
ncteisen274bbbe2017-06-08 14:57:11 -07001531 GRPC_CLOSURE_SCHED(
1532 exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -07001533 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001534 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001535 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001536 return out;
1537}
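// Illustrative caller-side sketch (not part of this file): applications
// normally reach this function through the public API in grpc.h; assuming the
// usual wrapper grpc_channel_check_connectivity_state(), usage looks like:
//   grpc_connectivity_state st =
//       grpc_channel_check_connectivity_state(channel, 1 /*try_to_connect*/);
// st is one of GRPC_CHANNEL_IDLE / GRPC_CHANNEL_CONNECTING /
// GRPC_CHANNEL_READY / GRPC_CHANNEL_TRANSIENT_FAILURE / GRPC_CHANNEL_SHUTDOWN;
// a non-zero try_to_connect kicks an IDLE channel into connecting, as
// implemented above.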
1538
Alexander Polcync3b1f182017-04-18 13:51:36 -07001539typedef struct external_connectivity_watcher {
Craig Tiller86c99582015-11-25 15:22:26 -08001540 channel_data *chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001541 grpc_polling_entity pollent;
Craig Tiller86c99582015-11-25 15:22:26 -08001542 grpc_closure *on_complete;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001543 grpc_closure *watcher_timer_init;
Craig Tiller613dafa2017-02-09 12:00:43 -08001544 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001545 grpc_closure my_closure;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001546 struct external_connectivity_watcher *next;
Craig Tiller86c99582015-11-25 15:22:26 -08001547} external_connectivity_watcher;
1548
Alexander Polcync3b1f182017-04-18 13:51:36 -07001549static external_connectivity_watcher *lookup_external_connectivity_watcher(
1550 channel_data *chand, grpc_closure *on_complete) {
1551 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1552 external_connectivity_watcher *w =
1553 chand->external_connectivity_watcher_list_head;
1554 while (w != NULL && w->on_complete != on_complete) {
1555 w = w->next;
1556 }
1557 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1558 return w;
1559}
1560
1561static void external_connectivity_watcher_list_append(
1562 channel_data *chand, external_connectivity_watcher *w) {
1563 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
1564
1565 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
1566 GPR_ASSERT(!w->next);
1567 w->next = chand->external_connectivity_watcher_list_head;
1568 chand->external_connectivity_watcher_list_head = w;
1569 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
1570}
1571
1572static void external_connectivity_watcher_list_remove(
1573 channel_data *chand, external_connectivity_watcher *to_remove) {
1574 GPR_ASSERT(
1575 lookup_external_connectivity_watcher(chand, to_remove->on_complete));
1576 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1577 if (to_remove == chand->external_connectivity_watcher_list_head) {
1578 chand->external_connectivity_watcher_list_head = to_remove->next;
1579 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1580 return;
1581 }
1582 external_connectivity_watcher *w =
1583 chand->external_connectivity_watcher_list_head;
1584 while (w != NULL) {
1585 if (w->next == to_remove) {
1586 w->next = w->next->next;
1587 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1588 return;
1589 }
1590 w = w->next;
1591 }
1592 GPR_UNREACHABLE_CODE(return );
1593}
1594
1595int grpc_client_channel_num_external_connectivity_watchers(
1596 grpc_channel_element *elem) {
1597 channel_data *chand = elem->channel_data;
1598 int count = 0;
1599
1600 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1601 external_connectivity_watcher *w =
1602 chand->external_connectivity_watcher_list_head;
1603 while (w != NULL) {
1604 count++;
1605 w = w->next;
1606 }
1607 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1608
1609 return count;
1610}
1611
Craig Tiller1d881fb2015-12-01 07:39:04 -08001612static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001613 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001614 external_connectivity_watcher *w = arg;
1615 grpc_closure *follow_up = w->on_complete;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001616 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1617 w->chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001618 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1619 "external_connectivity_watcher");
Alexander Polcync3b1f182017-04-18 13:51:36 -07001620 external_connectivity_watcher_list_remove(w->chand, w);
Craig Tiller86c99582015-11-25 15:22:26 -08001621 gpr_free(w);
ncteisen274bbbe2017-06-08 14:57:11 -07001622 GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
Craig Tiller613dafa2017-02-09 12:00:43 -08001623}
1624
Craig Tillera8610c02017-02-14 10:05:11 -08001625static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1626 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001627 external_connectivity_watcher *w = arg;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001628 external_connectivity_watcher *found = NULL;
1629 if (w->state != NULL) {
1630 external_connectivity_watcher_list_append(w->chand, w);
ncteisen274bbbe2017-06-08 14:57:11 -07001631 GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
1632 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
Alexander Polcync3b1f182017-04-18 13:51:36 -07001633 grpc_schedule_on_exec_ctx);
1634 grpc_connectivity_state_notify_on_state_change(
1635 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
1636 } else {
1637 GPR_ASSERT(w->watcher_timer_init == NULL);
1638 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
1639 if (found) {
1640 GPR_ASSERT(found->on_complete == w->on_complete);
1641 grpc_connectivity_state_notify_on_state_change(
1642 exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
1643 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001644 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1645 w->chand->interested_parties);
Alexander Polcync3b1f182017-04-18 13:51:36 -07001646 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1647 "external_connectivity_watcher");
1648 gpr_free(w);
1649 }
Craig Tiller86c99582015-11-25 15:22:26 -08001650}
1651
Craig Tillera82950e2015-09-22 12:33:20 -07001652void grpc_client_channel_watch_connectivity_state(
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001653 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
1654 grpc_polling_entity pollent, grpc_connectivity_state *state,
1655 grpc_closure *closure, grpc_closure *watcher_timer_init) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001656 channel_data *chand = elem->channel_data;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001657 external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
Craig Tiller86c99582015-11-25 15:22:26 -08001658 w->chand = chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001659 w->pollent = pollent;
Mark D. Roth92210832017-05-02 15:04:39 -07001660 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001661 w->state = state;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001662 w->watcher_timer_init = watcher_timer_init;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001663 grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
1664 chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001665 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1666 "external_connectivity_watcher");
ncteisen274bbbe2017-06-08 14:57:11 -07001667 GRPC_CLOSURE_SCHED(
Craig Tiller613dafa2017-02-09 12:00:43 -08001668 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -07001669 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -07001670 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001671 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001672}
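// Illustrative caller-side sketch (not part of this file): the public API in
// grpc.h that ultimately drives this path is
// grpc_channel_watch_connectivity_state(); assuming a channel 'ch', a
// completion queue 'cq', and an application tag, a watch for a state change
// with a 5 second deadline looks roughly like:
//   grpc_connectivity_state last =
//       grpc_channel_check_connectivity_state(ch, 0 /*try_to_connect*/);
//   grpc_channel_watch_connectivity_state(
//       ch, last,
//       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
//                    gpr_time_from_seconds(5, GPR_TIMESPAN)),
//       cq, tag);
// An event for 'tag' is delivered on 'cq' once the state changes away from
// 'last' or the deadline expires.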