/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"

/* Client channel implementation */

grpc_tracer_flag grpc_client_channel_trace =
    GRPC_TRACER_INITIALIZER(false, "client_channel");

/*************************************************************************
 * METHOD-CONFIG TABLE
 */

typedef enum {
  /* zero so it can be default initialized */
  WAIT_FOR_READY_UNSET = 0,
  WAIT_FOR_READY_FALSE,
  WAIT_FOR_READY_TRUE
} wait_for_ready_value;

typedef struct {
  gpr_refcount refs;
  grpc_millis timeout;
  wait_for_ready_value wait_for_ready;
} method_parameters;

static method_parameters *method_parameters_ref(
    method_parameters *method_params) {
  gpr_ref(&method_params->refs);
  return method_params;
}

static void method_parameters_unref(method_parameters *method_params) {
  if (gpr_unref(&method_params->refs)) {
    gpr_free(method_params);
  }
}

static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
  method_parameters_unref(value);
}

static bool parse_wait_for_ready(grpc_json *field,
                                 wait_for_ready_value *wait_for_ready) {
  if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
    return false;
  }
  *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
                                                  : WAIT_FOR_READY_FALSE;
  return true;
}

static bool parse_timeout(grpc_json *field, grpc_millis *timeout) {
  if (field->type != GRPC_JSON_STRING) return false;
  size_t len = strlen(field->value);
  if (len == 0 || field->value[len - 1] != 's') return false;
  char *buf = gpr_strdup(field->value);
  buf[len - 1] = '\0';  // Remove trailing 's'.
  char *decimal_point = strchr(buf, '.');
  int nanos = 0;
  if (decimal_point != NULL) {
    *decimal_point = '\0';
    nanos = gpr_parse_nonnegative_int(decimal_point + 1);
    if (nanos == -1) {
      gpr_free(buf);
      return false;
    }
    // There should always be exactly 3, 6, or 9 fractional digits.
    int multiplier = 1;
    switch (strlen(decimal_point + 1)) {
      case 9:
        break;
      case 6:
        multiplier *= 1000;
        break;
      case 3:
        multiplier *= 1000000;
        break;
      default:  // Unsupported number of digits.
        gpr_free(buf);
        return false;
    }
    nanos *= multiplier;
  }
  int seconds = gpr_parse_nonnegative_int(buf);
  gpr_free(buf);
  if (seconds == -1) return false;
  *timeout = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS;
  return true;
}
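
// Illustrative note (not from the original source): given a service config
// value of "timeout": "1.500s", the parser above strips the trailing 's',
// reads seconds = 1 and nanos = 500 * 1000000, and stores
// 1 * GPR_MS_PER_SEC + 500000000 / GPR_NS_PER_MS = 1500 ms in *timeout.
// A value like "1.5s" is rejected, because only exactly 3, 6, or 9
// fractional digits are accepted.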

static void *method_parameters_create_from_json(const grpc_json *json) {
  wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
  grpc_millis timeout = 0;
  for (grpc_json *field = json->child; field != NULL; field = field->next) {
    if (field->key == NULL) continue;
    if (strcmp(field->key, "waitForReady") == 0) {
      if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL;  // Duplicate.
      if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
    } else if (strcmp(field->key, "timeout") == 0) {
      if (timeout > 0) return NULL;  // Duplicate.
      if (!parse_timeout(field, &timeout)) return NULL;
    }
  }
  method_parameters *value = gpr_malloc(sizeof(method_parameters));
  gpr_ref_init(&value->refs, 1);
  value->timeout = timeout;
  value->wait_for_ready = wait_for_ready;
  return value;
}
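
// Illustrative note (assumed JSON shape, mirroring the parsing above): a
// per-method service config entry such as
//   { "waitForReady": true, "timeout": "1.500s" }
// yields a method_parameters struct with wait_for_ready ==
// WAIT_FOR_READY_TRUE and timeout == 1500 (grpc_millis). Unknown keys are
// ignored; a duplicate key causes the whole entry to be rejected (NULL).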

struct external_connectivity_watcher;

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

typedef struct client_channel_channel_data {
  /** resolver for this channel */
  grpc_resolver *resolver;
  /** have we started resolving this channel */
  bool started_resolving;
  /** is deadline checking enabled? */
  bool deadline_checking_enabled;
  /** client channel factory */
  grpc_client_channel_factory *client_channel_factory;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner *combiner;
  /** currently active load balancer */
  grpc_lb_policy *lb_policy;
  /** retry throttle data */
  grpc_server_retry_throttle_data *retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_slice_hash_table *method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args *resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack *owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set *interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher *external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char *info_lb_policy_name;
  /** service config in JSON form */
  char *info_service_config_json;
} channel_data;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data *chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
                                                  channel_data *chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error *error,
                                                  const char *reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != NULL) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      grpc_lb_policy_cancel_picks_locked(
          exec_ctx, chand->lb_policy,
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
                                         /* mask= */ 0, /* check= */ 0,
                                         GRPC_ERROR_REF(error));
    }
  }
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
                              reason);
}

static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  lb_policy_connectivity_watcher *w = arg;
  grpc_connectivity_state publish_state = w->state;
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
      publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
      grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
      GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
      w->chand->lb_policy = NULL;
    }
    set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
                                               &w->on_changed);
}

static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
                                   channel_data *chand) {
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
                            &chand->on_resolver_result_changed);
}

typedef struct {
  char *server_name;
  grpc_server_retry_throttle_data *retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
  service_config_parsing_state *parsing_state = arg;
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != NULL) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json *sub_field = field->child; sub_field != NULL;
         sub_field = sub_field->next) {
      if (sub_field->key == NULL) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char *decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != NULL) {
          whole_len = (size_t)(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_retry_throttle_map_get_data_for_server(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}
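
// Illustrative note (assumed config shape, based on the parsing above): a
// global service config entry such as
//   "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.1 }
// yields max_milli_tokens = 10000 and milli_token_ratio = 100; both values
// are kept in thousandths to avoid floating point. Any malformed sub-field
// causes the whole retryThrottling block to be ignored.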

static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  channel_data *chand = arg;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
            grpc_error_string(error));
  }
  // Extract the following fields from the resolver result, if non-NULL.
  bool lb_policy_updated = false;
  char *lb_policy_name = NULL;
  bool lb_policy_name_changed = false;
  grpc_lb_policy *new_lb_policy = NULL;
  char *service_config_json = NULL;
  grpc_server_retry_throttle_data *retry_throttle_data = NULL;
  grpc_slice_hash_table *method_params_table = NULL;
  if (chand->resolver_result != NULL) {
    // Find LB policy name.
    const grpc_arg *channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      lb_policy_name = channel_arg->value.string;
    }
    // Special case: If at least one balancer address is present, we use
    // the grpclb policy, regardless of what the resolver actually specified.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
    if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
      grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
      bool found_balancer_address = false;
      for (size_t i = 0; i < addresses->num_addresses; ++i) {
        if (addresses->addresses[i].is_balancer) {
          found_balancer_address = true;
          break;
        }
      }
      if (found_balancer_address) {
        if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
          gpr_log(GPR_INFO,
                  "resolver requested LB policy %s but provided at least one "
                  "balancer address -- forcing use of grpclb LB policy",
                  lb_policy_name);
        }
        lb_policy_name = "grpclb";
      }
    }
    // Use pick_first if nothing was specified and we didn't select grpclb
    // above.
    if (lb_policy_name == NULL) lb_policy_name = "pick_first";
    grpc_lb_policy_args lb_policy_args;
    lb_policy_args.args = chand->resolver_result;
    lb_policy_args.client_channel_factory = chand->client_channel_factory;
    lb_policy_args.combiner = chand->combiner;
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    lb_policy_name_changed =
        chand->info_lb_policy_name == NULL ||
        strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
    if (chand->lb_policy != NULL && !lb_policy_name_changed) {
      // Continue using the same LB policy. Update with new addresses.
      lb_policy_updated = true;
      grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
    } else {
      // Instantiate new LB policy.
      new_lb_policy =
          grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
      if (new_lb_policy == NULL) {
        gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
      }
    }
    // Find service config.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      service_config_json = gpr_strdup(channel_arg->value.string);
      grpc_service_config *service_config =
          grpc_service_config_create(service_config_json);
      if (service_config != NULL) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        GPR_ASSERT(channel_arg != NULL);
        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
        grpc_uri *uri =
            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        memset(&parsing_state, 0, sizeof(parsing_state));
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        grpc_service_config_parse_global_params(
            service_config, parse_retry_throttle_params, &parsing_state);
        grpc_uri_destroy(uri);
        retry_throttle_data = parsing_state.retry_throttle_data;
        method_params_table = grpc_service_config_create_method_config_table(
            exec_ctx, service_config, method_parameters_create_from_json,
            method_parameters_free);
        grpc_service_config_destroy(service_config);
      }
    }
    // Before we clean up, save a copy of lb_policy_name, since it might
    // be pointing to data inside chand->resolver_result.
    // The copy will be saved in chand->lb_policy_name below.
    lb_policy_name = gpr_strdup(lb_policy_name);
    grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
    chand->resolver_result = NULL;
  }
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG,
            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
            "service_config=\"%s\"",
            chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "",
            service_config_json);
  }
  // Now swap out fields in chand. Note that the new values may still
  // be NULL if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name != NULL) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name;
  }
  if (service_config_json != NULL) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = retry_throttle_data;
  // Swap out the method params table.
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  chand->method_params_table = method_params_table;
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be NULL), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
      chand->resolver == NULL) {
    if (chand->lb_policy != NULL) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
                chand->lb_policy);
      }
      grpc_pollset_set_del_pollset_set(exec_ctx,
                                       chand->lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
    }
    chand->lb_policy = new_lb_policy;
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
    if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
      gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
    }
    if (chand->resolver != NULL) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
      }
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
    }
    set_channel_connectivity_state_locked(
        exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                            &chand->waiting_for_resolver_result_closures);
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error *state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (new_lb_policy != NULL) {
      if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
        gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
      }
      GRPC_ERROR_UNREF(state_error);
      state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
                                                       &state_error);
      grpc_pollset_set_add_pollset_set(exec_ctx,
                                       new_lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                              &chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
    }
    if (!lb_policy_updated) {
      set_channel_connectivity_state_locked(exec_ctx, chand, state,
                                            GRPC_ERROR_REF(state_error),
                                            "new_lb+resolver");
    }
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error_ignored) {
  grpc_transport_op *op = arg;
  grpc_channel_element *elem = op->handler_private.extra_arg;
  channel_data *chand = elem->channel_data;

  if (op->on_connectivity_state_change != NULL) {
    grpc_connectivity_state_notify_on_state_change(
        exec_ctx, &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = NULL;
    op->connectivity_state = NULL;
  }

  if (op->send_ping != NULL) {
    if (chand->lb_policy == NULL) {
      GRPC_CLOSURE_SCHED(
          exec_ctx, op->send_ping,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
      op->bind_pollset = NULL;
    }
    op->send_ping = NULL;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != NULL) {
      set_channel_connectivity_state_locked(
          exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                                &chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != NULL) {
        grpc_pollset_set_del_pollset_set(exec_ctx,
                                         chand->lb_policy->interested_parties,
                                         chand->interested_parties);
        GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
        chand->lb_policy = NULL;
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem,
                                  grpc_transport_op *op) {
  channel_data *chand = elem->channel_data;

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != NULL) {
    grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
                                 op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      exec_ctx,
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
                                grpc_channel_element *elem,
                                const grpc_channel_info *info) {
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != NULL) {
    *info->lb_policy_name = chand->info_lb_policy_name == NULL
                                ? NULL
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != NULL) {
    *info->service_config_json =
        chand->info_service_config_json == NULL
            ? NULL
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
                                        grpc_channel_element *elem,
                                        grpc_channel_element_args *args) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = NULL;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  // Record client channel factory.
  const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
                                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(arg->value.pointer.p);
  chand->client_channel_factory = arg->value.pointer.p;
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char *proxy_name = NULL;
  grpc_channel_args *new_args = NULL;
  grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_resolver_create(
      exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
      new_args != NULL ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != NULL) gpr_free(proxy_name);
  if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
  if (chand->resolver == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                     grpc_error *error) {
  grpc_resolver *resolver = arg;
  grpc_resolver_shutdown_locked(exec_ctx, resolver);
  GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
                                    grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->resolver != NULL) {
    GRPC_CLOSURE_SCHED(
        exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
                                      grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != NULL) {
    grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
  }
  if (chand->lb_policy != NULL) {
    grpc_pollset_set_del_pollset_set(exec_ctx,
                                     chand->lb_policy->interested_parties,
                                     chand->interested_parties);
    GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
  grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
  GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time. This includes:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_WAITING_BATCHES 6

/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store a pointer to the call
  // stack and each has its own mutex. If/when we have time, find a way
  // to avoid this without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  grpc_server_retry_throttle_data *retry_throttle_data;
  method_parameters *method_params;

  /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
      bit is 0), or a pointer to an error (if the lowest bit is 1) */
  gpr_atm subchannel_call_or_error;
  gpr_arena *arena;

  grpc_lb_policy *lb_policy;  // Holds ref while LB pick is pending.
  grpc_closure lb_pick_closure;

  grpc_connected_subchannel *connected_subchannel;
  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
  grpc_polling_entity *pollent;

  grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
  size_t waiting_for_pick_batches_count;

  grpc_transport_stream_op_batch_payload *initial_metadata_payload;

  grpc_call_stack *owning_call;

  grpc_linked_mdelem lb_token_mdelem;

  grpc_closure on_complete;
  grpc_closure *original_on_complete;
} call_data;

typedef struct {
  grpc_subchannel_call *subchannel_call;
  grpc_error *error;
} call_or_error;

static call_or_error get_call_or_error(call_data *p) {
  gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
  if (c == 0)
    return (call_or_error){NULL, NULL};
  else if (c & 1)
    return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
  else
    return (call_or_error){(grpc_subchannel_call *)c, NULL};
}

static bool set_call_or_error(call_data *p, call_or_error coe) {
  // this should always be under a lock
  call_or_error existing = get_call_or_error(p);
  if (existing.error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(coe.error);
    return false;
  }
  GPR_ASSERT(existing.subchannel_call == NULL);
  if (coe.error != GRPC_ERROR_NONE) {
    GPR_ASSERT(coe.subchannel_call == NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
  } else {
    GPR_ASSERT(coe.subchannel_call != NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error,
                      (gpr_atm)coe.subchannel_call);
  }
  return true;
}
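
// Note (added commentary, not from the original source): get_call_or_error()
// and set_call_or_error() above pack either a grpc_subchannel_call* or a
// grpc_error* into a single gpr_atm, using the low bit as the tag. This
// assumes both pointer types are at least 2-byte aligned, so the low bit of
// a valid pointer is always zero and is free to mark the error case.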

grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
    grpc_call_element *call_elem) {
  return get_call_or_error(call_elem->call_data).subchannel_call;
}

static void waiting_for_pick_batches_add_locked(
    call_data *calld, grpc_transport_stream_op_batch *batch) {
  GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
  calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
      batch;
}

static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
                                                 grpc_call_element *elem,
                                                 grpc_error *error) {
  call_data *calld = elem->call_data;
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
            elem->channel_data, calld, calld->waiting_for_pick_batches_count,
            grpc_error_string(error));
  }
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
  }
  calld->waiting_for_pick_batches_count = 0;
  GRPC_ERROR_UNREF(error);
}

static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
                                                   grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  if (calld->waiting_for_pick_batches_count == 0) return;
  call_or_error coe = get_call_or_error(calld);
  if (coe.error != GRPC_ERROR_NONE) {
    waiting_for_pick_batches_fail_locked(exec_ctx, elem,
                                         GRPC_ERROR_REF(coe.error));
    return;
  }
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
                       " pending batches to subchannel_call=%p",
            elem->channel_data, calld, calld->waiting_for_pick_batches_count,
            coe.subchannel_call);
  }
  for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
    grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
                                    calld->waiting_for_pick_batches[i]);
  }
  calld->waiting_for_pick_batches_count = 0;
}

Mark D. Roth0ca0be82017-06-20 07:49:33 -0700935// Applies service config to the call. Must be invoked once we know
936// that the resolver has returned results to the channel.
937static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
938 grpc_call_element *elem) {
Craig Tiller11c17d42017-03-13 13:36:34 -0700939 channel_data *chand = elem->channel_data;
940 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700941 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
942 gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
943 chand, calld);
944 }
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700945 if (chand->retry_throttle_data != NULL) {
946 calld->retry_throttle_data =
947 grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
948 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700949 if (chand->method_params_table != NULL) {
950 calld->method_params = grpc_method_config_table_get(
951 exec_ctx, chand->method_params_table, calld->path);
952 if (calld->method_params != NULL) {
953 method_parameters_ref(calld->method_params);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700954 // If the deadline from the service config is shorter than the one
955 // from the client API, reset the deadline timer.
956 if (chand->deadline_checking_enabled &&
Craig Tiller89c14282017-07-19 15:32:27 -0700957 calld->method_params->timeout != 0) {
958 const grpc_millis per_method_deadline =
959 grpc_timespec_to_millis(calld->call_start_time) +
960 calld->method_params->timeout;
961 if (per_method_deadline < calld->deadline) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700962 calld->deadline = per_method_deadline;
963 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
964 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700965 }
966 }
967 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700968}
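
/* Worked example of the deadline-narrowing rule above (the numbers are
   illustrative, not taken from any real config): if the client API supplied a
   30-second deadline but the service config specifies a 10-second timeout for
   this method, the effective deadline becomes call_start_time + 10s and the
   deadline timer is reset; a 60-second configured timeout would leave the
   original 30-second deadline untouched, since only a shorter per-method
   deadline ever replaces the one from the client API. */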
Craig Tillerea4a4f12017-03-13 13:36:52 -0700969
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700970static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
Mark D. Roth60751fe2017-07-07 12:50:33 -0700971 grpc_call_element *elem,
972 grpc_error *error) {
973 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700974 grpc_subchannel_call *subchannel_call = NULL;
975 const grpc_connected_subchannel_call_args call_args = {
976 .pollent = calld->pollent,
977 .path = calld->path,
978 .start_time = calld->call_start_time,
979 .deadline = calld->deadline,
980 .arena = calld->arena,
981 .context = calld->subchannel_call_context};
982 grpc_error *new_error = grpc_connected_subchannel_create_call(
983 exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
Mark D. Roth60751fe2017-07-07 12:50:33 -0700984 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
985 gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
986 elem->channel_data, calld, subchannel_call,
987 grpc_error_string(new_error));
988 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700989 GPR_ASSERT(set_call_or_error(
990 calld, (call_or_error){.subchannel_call = subchannel_call}));
991 if (new_error != GRPC_ERROR_NONE) {
992 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth60751fe2017-07-07 12:50:33 -0700993 waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700994 } else {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700995 waiting_for_pick_batches_resume_locked(exec_ctx, elem);
Craig Tiller11c17d42017-03-13 13:36:34 -0700996 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700997 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -0700998}
999
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001000static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
1001 grpc_call_element *elem,
Craig Tillerbefafe62017-02-09 11:30:54 -08001002 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001003 call_data *calld = elem->call_data;
1004 channel_data *chand = elem->channel_data;
Yuchen Zeng19656b12016-09-01 18:00:45 -07001005 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
1006 chand->interested_parties);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001007 call_or_error coe = get_call_or_error(calld);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001008 if (calld->connected_subchannel == NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001009 // Failed to create subchannel.
Craig Tillerd3ec4aa2017-05-18 10:22:43 -07001010 grpc_error *failure =
1011 error == GRPC_ERROR_NONE
1012 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1013 "Call dropped by load balancing policy")
1014 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1015 "Failed to create subchannel", &error, 1);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001016 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1017 gpr_log(GPR_DEBUG,
1018 "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
1019 calld, grpc_error_string(failure));
1020 }
Craig Tiller03155722017-05-23 23:51:51 +00001021 set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
Mark D. Roth60751fe2017-07-07 12:50:33 -07001022 waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001023 } else if (coe.error != GRPC_ERROR_NONE) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001024 /* already cancelled before subchannel became ready */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001025 grpc_error *child_errors[] = {error, coe.error};
ncteisen4b36a3d2017-03-13 19:08:06 -07001026 grpc_error *cancellation_error =
1027 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001028 "Cancelled before creating subchannel", child_errors,
1029 GPR_ARRAY_SIZE(child_errors));
David Garcia Quintas68a9e382016-12-13 10:50:40 -08001030 /* if due to deadline, attach the deadline exceeded status to the error */
Craig Tiller89c14282017-07-19 15:32:27 -07001031 if (calld->deadline < grpc_exec_ctx_now(exec_ctx)) {
David Garcia Quintas68a9e382016-12-13 10:50:40 -08001032 cancellation_error =
1033 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
1034 GRPC_STATUS_DEADLINE_EXCEEDED);
1035 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001036 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1037 gpr_log(GPR_DEBUG,
1038 "chand=%p calld=%p: cancelled before subchannel became ready: %s",
1039 chand, calld, grpc_error_string(cancellation_error));
1040 }
1041 waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001042 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07001043 /* Create call on subchannel. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001044 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001045 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001046 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001047 GRPC_ERROR_UNREF(error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001048}
1049
1050static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
1051 call_data *calld = elem->call_data;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001052 grpc_subchannel_call *subchannel_call =
1053 get_call_or_error(calld).subchannel_call;
1054 if (subchannel_call == NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001055 return NULL;
1056 } else {
1057 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
1058 }
1059}
1060
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001061/** Return true if subchannel is available immediately (in which case
1062 subchannel_ready_locked() should not be called), or false otherwise (in
1063 which case subchannel_ready_locked() should be called when the subchannel
1064 is available). */
1065static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1066 grpc_call_element *elem);
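
/* Illustrative caller pattern implied by the contract above (a sketch in a
   comment, mirroring how the function is used later in this file):

     if (pick_subchannel_locked(exec_ctx, elem)) {
       // Pick completed synchronously; calld->connected_subchannel (or its
       // absence, for a dropped call) can be acted on immediately.
     } else {
       // subchannel_ready_locked() will run later, once the LB policy pick
       // or the pending resolver result completes.
     }
*/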
1067
Craig Tiller577c9b22015-11-02 14:11:15 -08001068typedef struct {
Craig Tiller577c9b22015-11-02 14:11:15 -08001069 grpc_call_element *elem;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001070 bool cancelled;
Craig Tiller577c9b22015-11-02 14:11:15 -08001071 grpc_closure closure;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001072} pick_after_resolver_result_args;
Craig Tiller577c9b22015-11-02 14:11:15 -08001073
Mark D. Roth60751fe2017-07-07 12:50:33 -07001074static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
1075 void *arg,
1076 grpc_error *error) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001077 pick_after_resolver_result_args *args = arg;
1078 if (args->cancelled) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001079 /* cancelled, do nothing */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001080 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1081 gpr_log(GPR_DEBUG, "call cancelled before resolver result");
1082 }
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001083 } else {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001084 channel_data *chand = args->elem->channel_data;
1085 call_data *calld = args->elem->call_data;
1086 if (error != GRPC_ERROR_NONE) {
1087 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1088 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
1089 chand, calld);
1090 }
1091 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
1092 } else {
1093 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1094 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
1095 chand, calld);
1096 }
1097 if (pick_subchannel_locked(exec_ctx, args->elem)) {
1098 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
1099 }
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001100 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001101 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001102 gpr_free(args);
Craig Tiller577c9b22015-11-02 14:11:15 -08001103}
1104
Mark D. Roth60751fe2017-07-07 12:50:33 -07001105static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
1106 grpc_call_element *elem) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001107 channel_data *chand = elem->channel_data;
1108 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001109 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1110 gpr_log(GPR_DEBUG,
1111 "chand=%p calld=%p: deferring pick pending resolver result", chand,
1112 calld);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001113 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001114 pick_after_resolver_result_args *args =
1115 (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
1116 args->elem = elem;
1117 GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
1118 args, grpc_combiner_scheduler(chand->combiner));
1119 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
1120 &args->closure, GRPC_ERROR_NONE);
1121}
1122
1123static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
1124 grpc_call_element *elem,
1125 grpc_error *error) {
1126 channel_data *chand = elem->channel_data;
1127 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001128 // If we don't yet have a resolver result, then a closure for
Mark D. Roth60751fe2017-07-07 12:50:33 -07001129 // pick_after_resolver_result_done_locked() will have been added to
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001130 // chand->waiting_for_resolver_result_closures, and it may not be invoked
1131 // until after this call has been destroyed. We mark the operation as
Mark D. Roth60751fe2017-07-07 12:50:33 -07001132 // cancelled, so that when pick_after_resolver_result_done_locked()
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001133 // is called, it will be a no-op. We also immediately invoke
1134 // subchannel_ready_locked() to propagate the error back to the caller.
1135 for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
Mark D. Roth64a317c2017-05-02 08:27:08 -07001136 closure != NULL; closure = closure->next_data.next) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001137 pick_after_resolver_result_args *args = closure->cb_arg;
1138 if (!args->cancelled && args->elem == elem) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001139 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1140 gpr_log(GPR_DEBUG,
1141 "chand=%p calld=%p: "
1142 "cancelling pick waiting for resolver result",
1143 chand, calld);
1144 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001145 args->cancelled = true;
1146 subchannel_ready_locked(exec_ctx, elem,
1147 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1148 "Pick cancelled", &error, 1));
Mark D. Roth64a317c2017-05-02 08:27:08 -07001149 }
1150 }
1151 GRPC_ERROR_UNREF(error);
1152}
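
/* Sketch of the cancellation hand-off between the two functions above
   (comment-only summary): the start function queues a closure whose argument
   is heap-allocated, and the cancel path merely flips args->cancelled instead
   of removing the closure, so the callback still runs when the resolver
   returns and is the single place where the args are freed:

     start:   args queued on chand->waiting_for_resolver_result_closures
     cancel:  args->cancelled = true; subchannel_ready_locked(..., error)
     done:    pick_after_resolver_result_done_locked() sees args->cancelled,
              does no pick, and gpr_free(args)
*/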
1153
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001154// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
1155// Unrefs the LB policy and then invokes subchannel_ready_locked().
1156static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1157 grpc_error *error) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001158 grpc_call_element *elem = arg;
1159 channel_data *chand = elem->channel_data;
1160 call_data *calld = elem->call_data;
1161 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1162 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
1163 chand, calld);
1164 }
1165 GPR_ASSERT(calld->lb_policy != NULL);
1166 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1167 calld->lb_policy = NULL;
1168 subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001169}
1170
1171// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
1172// If the pick was completed synchronously, unrefs the LB policy and
1173// returns true.
1174static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
1175 grpc_call_element *elem,
1176 const grpc_lb_policy_pick_args *inputs) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001177 channel_data *chand = elem->channel_data;
1178 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001179 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1180 gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
1181 chand, calld, chand->lb_policy);
1182 }
1183 // Keep a ref to the LB policy in calld while the pick is pending.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001184 GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001185 calld->lb_policy = chand->lb_policy;
1186 GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001187 grpc_combiner_scheduler(chand->combiner));
1188 const bool pick_done = grpc_lb_policy_pick_locked(
1189 exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001190 calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001191 if (pick_done) {
1192 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001193 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1194 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
1195 chand, calld);
1196 }
1197 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1198 calld->lb_policy = NULL;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001199 }
1200 return pick_done;
1201}
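
/* Reference-counting pattern used by the pick path above (illustrative
   summary of the code, not additional behavior): calld->lb_policy holds a
   "pick_subchannel" ref for exactly as long as a pick is outstanding.

     GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
     calld->lb_policy = chand->lb_policy;
     // ... grpc_lb_policy_pick_locked() ...
     // synchronous completion: unref here, in pick_callback_start_locked()
     // asynchronous completion: unref in pick_callback_done_locked()
     GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
     calld->lb_policy = NULL;
*/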
Craig Tiller577c9b22015-11-02 14:11:15 -08001202
Mark D. Roth60751fe2017-07-07 12:50:33 -07001203static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
1204 grpc_call_element *elem,
1205 grpc_error *error) {
1206 channel_data *chand = elem->channel_data;
1207 call_data *calld = elem->call_data;
1208 GPR_ASSERT(calld->lb_policy != NULL);
1209 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1210 gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
1211 chand, calld, calld->lb_policy);
1212 }
1213 grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
1214 &calld->connected_subchannel, error);
1215}
1216
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001217static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1218 grpc_call_element *elem) {
1219 GPR_TIMER_BEGIN("pick_subchannel", 0);
1220 channel_data *chand = elem->channel_data;
1221 call_data *calld = elem->call_data;
1222 bool pick_done = false;
Craig Tiller577c9b22015-11-02 14:11:15 -08001223 if (chand->lb_policy != NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001224 apply_service_config_to_call_locked(exec_ctx, elem);
Mark D. Rothe40dd292016-10-05 14:58:37 -07001225 // If the application explicitly set wait_for_ready, use that.
1226 // Otherwise, if the service config specified a value for this
1227 // method, use that.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001228 uint32_t initial_metadata_flags =
1229 calld->initial_metadata_payload->send_initial_metadata
1230 .send_initial_metadata_flags;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001231 const bool wait_for_ready_set_from_api =
1232 initial_metadata_flags &
1233 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1234 const bool wait_for_ready_set_from_service_config =
Mark D. Roth95b627b2017-02-24 11:02:58 -08001235 calld->method_params != NULL &&
1236 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001237 if (!wait_for_ready_set_from_api &&
1238 wait_for_ready_set_from_service_config) {
Mark D. Roth95b627b2017-02-24 11:02:58 -08001239 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001240 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1241 } else {
1242 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1243 }
1244 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001245 const grpc_lb_policy_pick_args inputs = {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001246 calld->initial_metadata_payload->send_initial_metadata
1247 .send_initial_metadata,
1248 initial_metadata_flags, &calld->lb_token_mdelem};
1249 pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
1250 } else if (chand->resolver != NULL) {
1251 if (!chand->started_resolving) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001252 start_resolving_locked(exec_ctx, chand);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001253 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001254 pick_after_resolver_result_start_locked(exec_ctx, elem);
Craig Tiller0eab6972016-04-23 12:59:57 -07001255 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001256 subchannel_ready_locked(
1257 exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -07001258 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001259 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001260 return pick_done;
Craig Tiller577c9b22015-11-02 14:11:15 -08001261}
1262
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001263static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
1264 void *arg,
1265 grpc_error *error_ignored) {
1266 GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001267 grpc_transport_stream_op_batch *batch = arg;
1268 grpc_call_element *elem = batch->handler_private.extra_arg;
Craig Tillera11bfc82017-02-14 09:56:33 -08001269 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001270 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001271 /* need to recheck that another thread hasn't set the call */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001272 call_or_error coe = get_call_or_error(calld);
1273 if (coe.error != GRPC_ERROR_NONE) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001274 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1275 gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
1276 chand, calld, grpc_error_string(coe.error));
1277 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001278 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth60751fe2017-07-07 12:50:33 -07001279 exec_ctx, batch, GRPC_ERROR_REF(coe.error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001280 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001281 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001282 if (coe.subchannel_call != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001283 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1284 gpr_log(GPR_DEBUG,
1285 "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
1286 calld, coe.subchannel_call);
1287 }
1288 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001289 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001290 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001291 // Add to waiting-for-pick list. If we succeed in getting a
1292 // subchannel call below, we'll handle this batch (along with any
1293 // other waiting batches) in waiting_for_pick_batches_resume_locked().
Mark D. Roth60751fe2017-07-07 12:50:33 -07001294 waiting_for_pick_batches_add_locked(calld, batch);
1295 // If this is a cancellation, cancel the pending pick (if any) and
1296 // fail any pending batches.
1297 if (batch->cancel_stream) {
1298 grpc_error *error = batch->payload->cancel_stream.cancel_error;
1299 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1300 gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
1301 calld, grpc_error_string(error));
1302 }
Craig Tiller03155722017-05-23 23:51:51 +00001303 /* Stash a copy of cancel_error in our call data, so that we can use
1304 it for subsequent operations. This ensures that if the call is
Mark D. Roth60751fe2017-07-07 12:50:33 -07001305 cancelled before any batches are passed down (e.g., if the deadline
Craig Tiller03155722017-05-23 23:51:51 +00001306 is in the past when the call starts), we can return the right
Mark D. Roth60751fe2017-07-07 12:50:33 -07001307 error to the caller when the first batch does get passed down. */
Craig Tiller03155722017-05-23 23:51:51 +00001308 set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
Mark D. Roth60751fe2017-07-07 12:50:33 -07001309 if (calld->lb_policy != NULL) {
1310 pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
1311 } else {
1312 pick_after_resolver_result_cancel_locked(exec_ctx, elem,
1313 GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001314 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001315 waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001316 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001317 }
1318 /* if we don't have a subchannel, try to get one */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001319 if (batch->send_initial_metadata) {
1320 GPR_ASSERT(calld->connected_subchannel == NULL);
1321 calld->initial_metadata_payload = batch->payload;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001322 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
Yuchen Zeng144ce652016-09-01 18:19:34 -07001323 /* If a subchannel is not available immediately, the polling entity from
1324 call_data should be provided to channel_data's interested_parties, so
1325       that I/O of the lb_policy and resolver can be done under it. */
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001326 if (pick_subchannel_locked(exec_ctx, elem)) {
1327 // Pick was returned synchronously.
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001328 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Rothd7389b42017-05-17 12:22:17 -07001329 if (calld->connected_subchannel == NULL) {
Mark D. Rothd7389b42017-05-17 12:22:17 -07001330 grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1331 "Call dropped by load balancing policy");
Craig Tiller26e69f62017-05-24 15:09:23 -07001332 set_call_or_error(calld,
1333 (call_or_error){.error = GRPC_ERROR_REF(error)});
Mark D. Roth60751fe2017-07-07 12:50:33 -07001334 waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001335 } else {
1336 // Create subchannel call.
Mark D. Roth60751fe2017-07-07 12:50:33 -07001337 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
Mark D. Rothd7389b42017-05-17 12:22:17 -07001338 }
Yuchen Zeng19656b12016-09-01 18:00:45 -07001339 } else {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001340 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1341 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001342 }
1343 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001344done:
1345 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
1346 "start_transport_stream_op_batch");
1347 GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001348}
1349
Mark D. Rothde144102017-03-15 10:11:03 -07001350static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001351 grpc_call_element *elem = arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001352 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001353 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001354 if (error == GRPC_ERROR_NONE) {
1355 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001356 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001357 } else {
1358 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001359 // decide whether or not to retry. Note that we should only
1360 // record failures whose statuses match the configured retryable
1361 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001362 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001363 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001364 }
1365 }
ncteisen274bbbe2017-06-08 14:57:11 -07001366 GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
Mark D. Roth95039b52017-02-24 07:59:45 -08001367 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001368}
1369
Craig Tillerbe9691a2017-02-14 10:00:42 -08001370/* The logic here is fairly complicated, due to (a) the fact that we
1371 need to handle the case where we receive the send op before the
1372 initial metadata op, and (b) the need for efficiency, especially in
1373 the streaming case.
1374
1375 We use double-checked locking to initially see if initialization has been
1376 performed. If it has not, we acquire the combiner and perform initialization.
1377 If it has, we proceed on the fast path. */
Craig Tillere1b51da2017-03-31 15:44:33 -07001378static void cc_start_transport_stream_op_batch(
1379 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001380 grpc_transport_stream_op_batch *batch) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001381 call_data *calld = elem->call_data;
1382 channel_data *chand = elem->channel_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001383 if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
1384 GRPC_TRACER_ON(grpc_trace_channel)) {
1385 grpc_call_log_op(GPR_INFO, elem, batch);
1386 }
Craig Tiller3be7dd02017-04-03 14:30:03 -07001387 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001388 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001389 batch);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001390 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001391 // Intercept on_complete for recv_trailing_metadata so that we can
1392 // check retry throttle status.
Mark D. Roth60751fe2017-07-07 12:50:33 -07001393 if (batch->recv_trailing_metadata) {
1394 GPR_ASSERT(batch->on_complete != NULL);
1395 calld->original_on_complete = batch->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001396 GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
1397 grpc_schedule_on_exec_ctx);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001398 batch->on_complete = &calld->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001399 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001400 /* try to (atomically) get the call */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001401 call_or_error coe = get_call_or_error(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001402 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001403 if (coe.error != GRPC_ERROR_NONE) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001404 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1405 gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
1406 chand, calld, grpc_error_string(coe.error));
1407 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001408 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth60751fe2017-07-07 12:50:33 -07001409 exec_ctx, batch, GRPC_ERROR_REF(coe.error));
1410 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001411 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001412 if (coe.subchannel_call != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001413 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1414 gpr_log(GPR_DEBUG,
1415 "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
1416 calld, coe.subchannel_call);
1417 }
1418 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
1419 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001420 }
1421  /* fast path not available: take the combiner and figure out what to do */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001422 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1423 gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
1424 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001425 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001426 batch->handler_private.extra_arg = elem;
ncteisen274bbbe2017-06-08 14:57:11 -07001427 GRPC_CLOSURE_SCHED(
Mark D. Roth60751fe2017-07-07 12:50:33 -07001428 exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1429 start_transport_stream_op_batch_locked, batch,
Craig Tilleree4b1452017-05-12 10:56:03 -07001430 grpc_combiner_scheduler(chand->combiner)),
Craig Tillerbefafe62017-02-09 11:30:54 -08001431 GRPC_ERROR_NONE);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001432done:
Craig Tillera0f3abd2017-03-31 15:42:16 -07001433 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001434}
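
/* Comment-only sketch of the double-checked locking described above
   cc_start_transport_stream_op_batch(): the call-or-error state is read
   atomically outside the combiner, and only a miss pays for combiner
   scheduling.

     call_or_error coe = get_call_or_error(calld);   // lock-free check
     if (coe.error != GRPC_ERROR_NONE) { fail the batch; return; }
     if (coe.subchannel_call != NULL)  { pass the batch through; return; }
     // slow path: hop into chand->combiner, where
     // start_transport_stream_op_batch_locked() re-checks the same state
*/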
1435
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001436/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001437static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1438 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001439 const grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001440 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001441 channel_data *chand = elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001442 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001443 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001444 calld->call_start_time = args->start_time;
Craig Tiller89c14282017-07-19 15:32:27 -07001445 calld->deadline = args->deadline;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001446 calld->owning_call = args->call_stack;
Craig Tillerd426cac2017-03-13 12:30:45 -07001447 calld->arena = args->arena;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001448 if (chand->deadline_checking_enabled) {
Craig Tiller71d6ce62017-04-06 09:10:09 -07001449 grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001450 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001451 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001452}
1453
1454/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001455static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1456 grpc_call_element *elem,
1457 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001458 grpc_closure *then_schedule_closure) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001459 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001460 channel_data *chand = elem->channel_data;
1461 if (chand->deadline_checking_enabled) {
1462 grpc_deadline_state_destroy(exec_ctx, elem);
1463 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001464 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001465 if (calld->method_params != NULL) {
1466 method_parameters_unref(calld->method_params);
1467 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001468 call_or_error coe = get_call_or_error(calld);
1469 GRPC_ERROR_UNREF(coe.error);
1470 if (coe.subchannel_call != NULL) {
1471 grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
1472 then_schedule_closure);
Craig Tillerd426cac2017-03-13 12:30:45 -07001473 then_schedule_closure = NULL;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001474 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
1475 "client_channel_destroy_call");
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001476 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001477 GPR_ASSERT(calld->lb_policy == NULL);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001478 GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001479 if (calld->connected_subchannel != NULL) {
1480 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1481 "picked");
1482 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001483 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1484 if (calld->subchannel_call_context[i].value != NULL) {
1485 calld->subchannel_call_context[i].destroy(
1486 calld->subchannel_call_context[i].value);
1487 }
1488 }
ncteisen274bbbe2017-06-08 14:57:11 -07001489 GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001490}
1491
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001492static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1493 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001494 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001495 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001496 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001497}
1498
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001499/*************************************************************************
1500 * EXPORTED SYMBOLS
1501 */
1502
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001503const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001504 cc_start_transport_stream_op_batch,
Craig Tillerf40df232016-03-25 13:38:14 -07001505 cc_start_transport_op,
1506 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001507 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001508 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001509 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001510 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001511 cc_init_channel_elem,
1512 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001513 cc_get_peer,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001514 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001515 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001516};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001517
Craig Tiller613dafa2017-02-09 12:00:43 -08001518static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1519 grpc_error *error_ignored) {
1520 channel_data *chand = arg;
1521 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001522 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001523 } else {
1524 chand->exit_idle_when_lb_policy_arrives = true;
1525 if (!chand->started_resolving && chand->resolver != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001526 start_resolving_locked(exec_ctx, chand);
Craig Tiller613dafa2017-02-09 12:00:43 -08001527 }
1528 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001529 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001530}
1531
Craig Tillera82950e2015-09-22 12:33:20 -07001532grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1533 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001534 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001535 grpc_connectivity_state out =
1536 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001537 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001538 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
ncteisen274bbbe2017-06-08 14:57:11 -07001539 GRPC_CLOSURE_SCHED(
1540 exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -07001541 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001542 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001543 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001544 return out;
1545}
1546
Alexander Polcync3b1f182017-04-18 13:51:36 -07001547typedef struct external_connectivity_watcher {
Craig Tiller86c99582015-11-25 15:22:26 -08001548 channel_data *chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001549 grpc_polling_entity pollent;
Craig Tiller86c99582015-11-25 15:22:26 -08001550 grpc_closure *on_complete;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001551 grpc_closure *watcher_timer_init;
Craig Tiller613dafa2017-02-09 12:00:43 -08001552 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001553 grpc_closure my_closure;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001554 struct external_connectivity_watcher *next;
Craig Tiller86c99582015-11-25 15:22:26 -08001555} external_connectivity_watcher;
1556
Alexander Polcync3b1f182017-04-18 13:51:36 -07001557static external_connectivity_watcher *lookup_external_connectivity_watcher(
1558 channel_data *chand, grpc_closure *on_complete) {
1559 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1560 external_connectivity_watcher *w =
1561 chand->external_connectivity_watcher_list_head;
1562 while (w != NULL && w->on_complete != on_complete) {
1563 w = w->next;
1564 }
1565 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1566 return w;
1567}
1568
1569static void external_connectivity_watcher_list_append(
1570 channel_data *chand, external_connectivity_watcher *w) {
1571 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
1572
1573 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
1574 GPR_ASSERT(!w->next);
1575 w->next = chand->external_connectivity_watcher_list_head;
1576 chand->external_connectivity_watcher_list_head = w;
1577 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
1578}
1579
1580static void external_connectivity_watcher_list_remove(
1581    channel_data *chand, external_connectivity_watcher *to_remove) {
1582  GPR_ASSERT(
1583      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
1584  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1585  if (to_remove == chand->external_connectivity_watcher_list_head) {
1586    chand->external_connectivity_watcher_list_head = to_remove->next;
1587 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1588 return;
1589 }
1590 external_connectivity_watcher *w =
1591 chand->external_connectivity_watcher_list_head;
1592 while (w != NULL) {
1593    if (w->next == to_remove) {
1594 w->next = w->next->next;
1595 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1596 return;
1597 }
1598 w = w->next;
1599 }
1600 GPR_UNREACHABLE_CODE(return );
1601}
1602
1603int grpc_client_channel_num_external_connectivity_watchers(
1604 grpc_channel_element *elem) {
1605 channel_data *chand = elem->channel_data;
1606 int count = 0;
1607
1608 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1609 external_connectivity_watcher *w =
1610 chand->external_connectivity_watcher_list_head;
1611 while (w != NULL) {
1612 count++;
1613 w = w->next;
1614 }
1615 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1616
1617 return count;
1618}
1619
Craig Tiller1d881fb2015-12-01 07:39:04 -08001620static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001621 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001622 external_connectivity_watcher *w = arg;
1623 grpc_closure *follow_up = w->on_complete;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001624 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1625 w->chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001626 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1627 "external_connectivity_watcher");
Alexander Polcync3b1f182017-04-18 13:51:36 -07001628 external_connectivity_watcher_list_remove(w->chand, w);
Craig Tiller86c99582015-11-25 15:22:26 -08001629 gpr_free(w);
ncteisen274bbbe2017-06-08 14:57:11 -07001630 GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
Craig Tiller613dafa2017-02-09 12:00:43 -08001631}
1632
Craig Tillera8610c02017-02-14 10:05:11 -08001633static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1634 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001635 external_connectivity_watcher *w = arg;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001636 external_connectivity_watcher *found = NULL;
1637 if (w->state != NULL) {
1638 external_connectivity_watcher_list_append(w->chand, w);
ncteisen274bbbe2017-06-08 14:57:11 -07001639 GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
1640 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
Alexander Polcync3b1f182017-04-18 13:51:36 -07001641 grpc_schedule_on_exec_ctx);
1642 grpc_connectivity_state_notify_on_state_change(
1643 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
1644 } else {
1645 GPR_ASSERT(w->watcher_timer_init == NULL);
1646 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
1647 if (found) {
1648 GPR_ASSERT(found->on_complete == w->on_complete);
1649 grpc_connectivity_state_notify_on_state_change(
1650 exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
1651 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001652 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1653 w->chand->interested_parties);
Alexander Polcync3b1f182017-04-18 13:51:36 -07001654 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1655 "external_connectivity_watcher");
1656 gpr_free(w);
1657 }
Craig Tiller86c99582015-11-25 15:22:26 -08001658}
1659
Craig Tillera82950e2015-09-22 12:33:20 -07001660void grpc_client_channel_watch_connectivity_state(
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001661 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
1662 grpc_polling_entity pollent, grpc_connectivity_state *state,
1663 grpc_closure *closure, grpc_closure *watcher_timer_init) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001664 channel_data *chand = elem->channel_data;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001665 external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
Craig Tiller86c99582015-11-25 15:22:26 -08001666 w->chand = chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001667 w->pollent = pollent;
Mark D. Roth92210832017-05-02 15:04:39 -07001668 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001669 w->state = state;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001670 w->watcher_timer_init = watcher_timer_init;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001671 grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
1672 chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001673 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1674 "external_connectivity_watcher");
ncteisen274bbbe2017-06-08 14:57:11 -07001675 GRPC_CLOSURE_SCHED(
Craig Tiller613dafa2017-02-09 12:00:43 -08001676 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -07001677 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -07001678 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001679 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001680}