blob: 8b0d2ee61f61ee333b5d101269c22f7dbc9cdcf8 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080016 *
17 */
18
Craig Tiller9eb0fde2017-03-31 16:59:30 -070019#include "src/core/ext/filters/client_channel/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080020
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080024
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080025#include <grpc/support/alloc.h>
26#include <grpc/support/log.h>
Mark D. Rothb2d24882016-10-27 15:44:07 -070027#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080028#include <grpc/support/sync.h>
29#include <grpc/support/useful.h>
30
Craig Tiller9eb0fde2017-03-31 16:59:30 -070031#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
32#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
33#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
34#include "src/core/ext/filters/client_channel/resolver_registry.h"
35#include "src/core/ext/filters/client_channel/retry_throttle.h"
36#include "src/core/ext/filters/client_channel/subchannel.h"
Craig Tiller3be7dd02017-04-03 14:30:03 -070037#include "src/core/ext/filters/deadline/deadline_filter.h"
Craig Tiller9533d042016-03-25 17:11:06 -070038#include "src/core/lib/channel/channel_args.h"
39#include "src/core/lib/channel/connected_channel.h"
Craig Tillerbefafe62017-02-09 11:30:54 -080040#include "src/core/lib/iomgr/combiner.h"
Craig Tiller9533d042016-03-25 17:11:06 -070041#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070042#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070043#include "src/core/lib/profiling/timers.h"
Craig Tiller7c70b6c2017-01-23 07:48:42 -080044#include "src/core/lib/slice/slice_internal.h"
Craig Tiller9533d042016-03-25 17:11:06 -070045#include "src/core/lib/support/string.h"
46#include "src/core/lib/surface/channel.h"
47#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070048#include "src/core/lib/transport/metadata.h"
49#include "src/core/lib/transport/metadata_batch.h"
Mark D. Rothea846a02016-11-03 11:32:54 -070050#include "src/core/lib/transport/service_config.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070051#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070052
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053/* Client channel implementation */
54
ncteisen7712c7c2017-07-12 23:11:27 -070055grpc_tracer_flag grpc_client_channel_trace =
56 GRPC_TRACER_INITIALIZER(false, "client_channel");
Mark D. Roth60751fe2017-07-07 12:50:33 -070057
Mark D. Roth26b7be42016-10-24 10:08:07 -070058/*************************************************************************
59 * METHOD-CONFIG TABLE
60 */
61
/* Tri-state describing a method's "waitForReady" setting. */
typedef enum {
  /* Kept at zero so that default-initialized storage reads as "unset". */
  WAIT_FOR_READY_UNSET = 0,
  WAIT_FOR_READY_FALSE = 1,
  WAIT_FOR_READY_TRUE = 2
} wait_for_ready_value;
68
Mark D. Roth95b627b2017-02-24 11:02:58 -080069typedef struct {
70 gpr_refcount refs;
Mark D. Roth9d480942016-10-19 14:18:05 -070071 gpr_timespec timeout;
72 wait_for_ready_value wait_for_ready;
73} method_parameters;
74
Mark D. Roth722de8d2017-02-27 10:50:44 -080075static method_parameters *method_parameters_ref(
Mark D. Roth95b627b2017-02-24 11:02:58 -080076 method_parameters *method_params) {
77 gpr_ref(&method_params->refs);
78 return method_params;
Mark D. Roth9d480942016-10-19 14:18:05 -070079}
80
Mark D. Roth95b627b2017-02-24 11:02:58 -080081static void method_parameters_unref(method_parameters *method_params) {
82 if (gpr_unref(&method_params->refs)) {
83 gpr_free(method_params);
84 }
85}
86
Mark D. Roth95b627b2017-02-24 11:02:58 -080087static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
88 method_parameters_unref(value);
Craig Tiller87a7e1f2016-11-09 09:42:19 -080089}
90
Mark D. Roth95b627b2017-02-24 11:02:58 -080091static bool parse_wait_for_ready(grpc_json *field,
92 wait_for_ready_value *wait_for_ready) {
93 if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
94 return false;
95 }
96 *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
97 : WAIT_FOR_READY_FALSE;
98 return true;
99}
100
Mark D. Roth722de8d2017-02-27 10:50:44 -0800101static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
Mark D. Roth95b627b2017-02-24 11:02:58 -0800102 if (field->type != GRPC_JSON_STRING) return false;
103 size_t len = strlen(field->value);
104 if (field->value[len - 1] != 's') return false;
105 char *buf = gpr_strdup(field->value);
106 buf[len - 1] = '\0'; // Remove trailing 's'.
107 char *decimal_point = strchr(buf, '.');
108 if (decimal_point != NULL) {
109 *decimal_point = '\0';
110 timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
111 if (timeout->tv_nsec == -1) {
112 gpr_free(buf);
113 return false;
114 }
115 // There should always be exactly 3, 6, or 9 fractional digits.
116 int multiplier = 1;
117 switch (strlen(decimal_point + 1)) {
118 case 9:
119 break;
120 case 6:
121 multiplier *= 1000;
122 break;
123 case 3:
124 multiplier *= 1000000;
125 break;
126 default: // Unsupported number of digits.
127 gpr_free(buf);
128 return false;
129 }
130 timeout->tv_nsec *= multiplier;
131 }
132 timeout->tv_sec = gpr_parse_nonnegative_int(buf);
133 gpr_free(buf);
134 if (timeout->tv_sec == -1) return false;
135 return true;
136}
137
Mark D. Rothe30baeb2016-11-03 08:16:19 -0700138static void *method_parameters_create_from_json(const grpc_json *json) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700139 wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
Mark D. Roth47f10842016-11-03 08:45:27 -0700140 gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
141 for (grpc_json *field = json->child; field != NULL; field = field->next) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700142 if (field->key == NULL) continue;
Mark D. Roth84c8a022016-11-10 09:39:34 -0800143 if (strcmp(field->key, "waitForReady") == 0) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700144 if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800145 if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700146 } else if (strcmp(field->key, "timeout") == 0) {
147 if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800148 if (!parse_timeout(field, &timeout)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700149 }
150 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700151 method_parameters *value = gpr_malloc(sizeof(method_parameters));
Mark D. Roth95b627b2017-02-24 11:02:58 -0800152 gpr_ref_init(&value->refs, 1);
Mark D. Rothc968e602016-11-02 14:07:36 -0700153 value->timeout = timeout;
154 value->wait_for_ready = wait_for_ready;
Mark D. Roth9d480942016-10-19 14:18:05 -0700155 return value;
156}
157
/* Forward declaration: channel_data only stores a pointer to the head of a
   list of these, so the full definition is not needed at this point. */
struct external_connectivity_watcher;
159
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700160/*************************************************************************
161 * CHANNEL-WIDE FUNCTIONS
162 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800163
Craig Tiller800dacb2015-10-06 09:10:26 -0700164typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -0700165 /** resolver for this channel */
166 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -0700167 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700168 bool started_resolving;
Craig Tiller3be7dd02017-04-03 14:30:03 -0700169 /** is deadline checking enabled? */
170 bool deadline_checking_enabled;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700171 /** client channel factory */
172 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -0700173
Craig Tillerbefafe62017-02-09 11:30:54 -0800174 /** combiner protecting all variables below in this data structure */
175 grpc_combiner *combiner;
Mark D. Roth046cf762016-09-26 11:13:51 -0700176 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -0700177 grpc_lb_policy *lb_policy;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800178 /** retry throttle data */
179 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth9d480942016-10-19 14:18:05 -0700180 /** maps method names to method_parameters structs */
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800181 grpc_slice_hash_table *method_params_table;
Mark D. Roth046cf762016-09-26 11:13:51 -0700182 /** incoming resolver result - set by resolver.next() */
Mark D. Rothaf842452016-10-21 15:05:15 -0700183 grpc_channel_args *resolver_result;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700184 /** a list of closures that are all waiting for resolver result to come in */
185 grpc_closure_list waiting_for_resolver_result_closures;
Craig Tiller3f475422015-06-25 10:43:05 -0700186 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -0700187 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -0700188 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -0700189 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700190 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700191 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800192 /** owning stack */
193 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -0800194 /** interested parties (owned) */
195 grpc_pollset_set *interested_parties;
Craig Tiller613dafa2017-02-09 12:00:43 -0800196
Alexander Polcync3b1f182017-04-18 13:51:36 -0700197 /* external_connectivity_watcher_list head is guarded by its own mutex, since
198 * counts need to be grabbed immediately without polling on a cq */
199 gpr_mu external_connectivity_watcher_list_mu;
200 struct external_connectivity_watcher *external_connectivity_watcher_list_head;
201
Craig Tiller613dafa2017-02-09 12:00:43 -0800202 /* the following properties are guarded by a mutex since API's require them
Craig Tiller46dd7902017-02-23 09:42:16 -0800203 to be instantaneously available */
Craig Tiller613dafa2017-02-09 12:00:43 -0800204 gpr_mu info_mu;
205 char *info_lb_policy_name;
206 /** service config in JSON form */
207 char *info_service_config_json;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800208} channel_data;
209
Craig Tillerd6c98df2015-08-18 09:33:44 -0700210/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700211 resolver, to watch for state changes from the lb_policy. When a state
212 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -0700213typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700214 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700215 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700216 grpc_connectivity_state state;
217 grpc_lb_policy *lb_policy;
218} lb_policy_connectivity_watcher;
219
Craig Tiller2400bf52017-02-09 16:25:19 -0800220static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
221 grpc_lb_policy *lb_policy,
222 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700223
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800224static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
225 channel_data *chand,
226 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700227 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800228 const char *reason) {
David Garcia Quintas37251282017-04-14 13:46:03 -0700229 /* TODO: Improve failure handling:
230 * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
231 * - Hand over pending picks from old policies during the switch that happens
232 * when resolver provides an update. */
David Garcia Quintas956f7002017-04-13 15:40:06 -0700233 if (chand->lb_policy != NULL) {
234 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
235 /* cancel picks with wait_for_ready=false */
236 grpc_lb_policy_cancel_picks_locked(
237 exec_ctx, chand->lb_policy,
238 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
239 /* check= */ 0, GRPC_ERROR_REF(error));
240 } else if (state == GRPC_CHANNEL_SHUTDOWN) {
241 /* cancel all picks */
242 grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
243 /* mask= */ 0, /* check= */ 0,
244 GRPC_ERROR_REF(error));
245 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800246 }
Mark D. Roth60751fe2017-07-07 12:50:33 -0700247 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
248 gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
249 grpc_connectivity_state_name(state));
250 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700251 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
252 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800253}
254
Craig Tiller804ff712016-05-05 16:25:40 -0700255static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
Craig Tillerbefafe62017-02-09 11:30:54 -0800256 void *arg, grpc_error *error) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700257 lb_policy_connectivity_watcher *w = arg;
Craig Tiller5d44c062015-07-01 08:55:28 -0700258 grpc_connectivity_state publish_state = w->state;
Craig Tillerc5de8352017-02-09 14:08:05 -0800259 /* check if the notification is for the latest policy */
260 if (w->lb_policy == w->chand->lb_policy) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700261 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
262 gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
263 w->lb_policy, grpc_connectivity_state_name(w->state));
264 }
Craig Tillerc5de8352017-02-09 14:08:05 -0800265 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
266 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller972470b2017-02-09 15:05:36 -0800267 grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
Craig Tillerc5de8352017-02-09 14:08:05 -0800268 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
269 w->chand->lb_policy = NULL;
270 }
271 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
272 GRPC_ERROR_REF(error), "lb_changed");
273 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800274 watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
Craig Tillerc5de8352017-02-09 14:08:05 -0800275 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800276 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800277 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700278 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700279}
280
Craig Tiller2400bf52017-02-09 16:25:19 -0800281static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
282 grpc_lb_policy *lb_policy,
283 grpc_connectivity_state current_state) {
Craig Tillera82950e2015-09-22 12:33:20 -0700284 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800285 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700286 w->chand = chand;
ncteisen274bbbe2017-06-08 14:57:11 -0700287 GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -0700288 grpc_combiner_scheduler(chand->combiner));
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700289 w->state = current_state;
290 w->lb_policy = lb_policy;
Craig Tiller2400bf52017-02-09 16:25:19 -0800291 grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
292 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700293}
294
Mark D. Roth60751fe2017-07-07 12:50:33 -0700295static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
296 channel_data *chand) {
297 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
298 gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
299 }
300 GPR_ASSERT(!chand->started_resolving);
301 chand->started_resolving = true;
302 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
303 grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
304 &chand->on_resolver_result_changed);
305}
306
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800307typedef struct {
308 char *server_name;
309 grpc_server_retry_throttle_data *retry_throttle_data;
310} service_config_parsing_state;
311
312static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
313 service_config_parsing_state *parsing_state = arg;
314 if (strcmp(field->key, "retryThrottling") == 0) {
315 if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
316 if (field->type != GRPC_JSON_OBJECT) return;
317 int max_milli_tokens = 0;
318 int milli_token_ratio = 0;
319 for (grpc_json *sub_field = field->child; sub_field != NULL;
320 sub_field = sub_field->next) {
Mark D. Rothb3322562017-02-23 14:38:02 -0800321 if (sub_field->key == NULL) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800322 if (strcmp(sub_field->key, "maxTokens") == 0) {
323 if (max_milli_tokens != 0) return; // Duplicate.
324 if (sub_field->type != GRPC_JSON_NUMBER) return;
325 max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
326 if (max_milli_tokens == -1) return;
327 max_milli_tokens *= 1000;
328 } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
329 if (milli_token_ratio != 0) return; // Duplicate.
330 if (sub_field->type != GRPC_JSON_NUMBER) return;
331 // We support up to 3 decimal digits.
332 size_t whole_len = strlen(sub_field->value);
333 uint32_t multiplier = 1;
334 uint32_t decimal_value = 0;
335 const char *decimal_point = strchr(sub_field->value, '.');
336 if (decimal_point != NULL) {
337 whole_len = (size_t)(decimal_point - sub_field->value);
338 multiplier = 1000;
339 size_t decimal_len = strlen(decimal_point + 1);
340 if (decimal_len > 3) decimal_len = 3;
341 if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
342 &decimal_value)) {
343 return;
344 }
345 uint32_t decimal_multiplier = 1;
346 for (size_t i = 0; i < (3 - decimal_len); ++i) {
347 decimal_multiplier *= 10;
348 }
349 decimal_value *= decimal_multiplier;
350 }
351 uint32_t whole_value;
352 if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
353 &whole_value)) {
354 return;
355 }
356 milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
Mark D. Rothb3322562017-02-23 14:38:02 -0800357 if (milli_token_ratio <= 0) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800358 }
359 }
360 parsing_state->retry_throttle_data =
361 grpc_retry_throttle_map_get_data_for_server(
362 parsing_state->server_name, max_milli_tokens, milli_token_ratio);
363 }
364}
365
Craig Tillerbefafe62017-02-09 11:30:54 -0800366static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
367 void *arg, grpc_error *error) {
Craig Tiller3f475422015-06-25 10:43:05 -0700368 channel_data *chand = arg;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700369 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
370 gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
371 grpc_error_string(error));
372 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700373 // Extract the following fields from the resolver result, if non-NULL.
Mark D. Roth15494b52017-07-12 15:26:55 -0700374 bool lb_policy_updated = false;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700375 char *lb_policy_name = NULL;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700376 bool lb_policy_name_changed = false;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700377 grpc_lb_policy *new_lb_policy = NULL;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800378 char *service_config_json = NULL;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700379 grpc_server_retry_throttle_data *retry_throttle_data = NULL;
380 grpc_slice_hash_table *method_params_table = NULL;
Mark D. Roth046cf762016-09-26 11:13:51 -0700381 if (chand->resolver_result != NULL) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700382 // Find LB policy name.
Mark D. Rothaf842452016-10-21 15:05:15 -0700383 const grpc_arg *channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700384 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
Mark D. Rothaf842452016-10-21 15:05:15 -0700385 if (channel_arg != NULL) {
386 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
387 lb_policy_name = channel_arg->value.string;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700388 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700389 // Special case: If at least one balancer address is present, we use
390 // the grpclb policy, regardless of what the resolver actually specified.
Mark D. Rothaf842452016-10-21 15:05:15 -0700391 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700392 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
David Garcia Quintas143cb772017-03-31 13:39:27 -0700393 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
Mark D. Roth557c9902016-10-24 11:12:05 -0700394 grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700395 bool found_balancer_address = false;
Mark D. Rothaf842452016-10-21 15:05:15 -0700396 for (size_t i = 0; i < addresses->num_addresses; ++i) {
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700397 if (addresses->addresses[i].is_balancer) {
398 found_balancer_address = true;
Mark D. Rothaf842452016-10-21 15:05:15 -0700399 break;
400 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700401 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700402 if (found_balancer_address) {
Mark D. Rothaf842452016-10-21 15:05:15 -0700403 if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
404 gpr_log(GPR_INFO,
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700405 "resolver requested LB policy %s but provided at least one "
406 "balancer address -- forcing use of grpclb LB policy",
Mark D. Roth5f40e5d2016-10-24 13:09:05 -0700407 lb_policy_name);
Mark D. Rothaf842452016-10-21 15:05:15 -0700408 }
409 lb_policy_name = "grpclb";
Mark D. Roth88405f72016-10-03 08:24:52 -0700410 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700411 }
412 // Use pick_first if nothing was specified and we didn't select grpclb
413 // above.
414 if (lb_policy_name == NULL) lb_policy_name = "pick_first";
Mark D. Roth41124992016-11-03 11:22:20 -0700415 grpc_lb_policy_args lb_policy_args;
416 lb_policy_args.args = chand->resolver_result;
417 lb_policy_args.client_channel_factory = chand->client_channel_factory;
Craig Tiller2400bf52017-02-09 16:25:19 -0800418 lb_policy_args.combiner = chand->combiner;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700419 // Check to see if we're already using the right LB policy.
420 // Note: It's safe to use chand->info_lb_policy_name here without
421 // taking a lock on chand->info_mu, because this function is the
422 // only thing that modifies its value, and it can only be invoked
423 // once at any given time.
Mark D. Roth60751fe2017-07-07 12:50:33 -0700424 lb_policy_name_changed =
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700425 chand->info_lb_policy_name == NULL ||
426 strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700427 if (chand->lb_policy != NULL && !lb_policy_name_changed) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700428 // Continue using the same LB policy. Update with new addresses.
Mark D. Roth15494b52017-07-12 15:26:55 -0700429 lb_policy_updated = true;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700430 grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
431 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700432 // Instantiate new LB policy.
433 new_lb_policy =
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700434 grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700435 if (new_lb_policy == NULL) {
436 gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700437 }
Craig Tiller45724b32015-09-22 10:42:19 -0700438 }
Mark D. Roth41124992016-11-03 11:22:20 -0700439 // Find service config.
Mark D. Rothaf842452016-10-21 15:05:15 -0700440 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700441 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
Mark D. Roth046cf762016-09-26 11:13:51 -0700442 if (channel_arg != NULL) {
Mark D. Roth9ec28af2016-11-03 12:32:39 -0700443 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800444 service_config_json = gpr_strdup(channel_arg->value.string);
Mark D. Roth70a1abd2016-11-04 09:26:37 -0700445 grpc_service_config *service_config =
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800446 grpc_service_config_create(service_config_json);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700447 if (service_config != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800448 channel_arg =
449 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
450 GPR_ASSERT(channel_arg != NULL);
451 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700452 grpc_uri *uri =
453 grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800454 GPR_ASSERT(uri->path[0] != '\0');
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700455 service_config_parsing_state parsing_state;
456 memset(&parsing_state, 0, sizeof(parsing_state));
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800457 parsing_state.server_name =
458 uri->path[0] == '/' ? uri->path + 1 : uri->path;
459 grpc_service_config_parse_global_params(
460 service_config, parse_retry_throttle_params, &parsing_state);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800461 grpc_uri_destroy(uri);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700462 retry_throttle_data = parsing_state.retry_throttle_data;
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700463 method_params_table = grpc_service_config_create_method_config_table(
Craig Tillerb28c7e82016-11-18 10:29:04 -0800464 exec_ctx, service_config, method_parameters_create_from_json,
Mark D. Rothe3006702017-04-19 07:43:56 -0700465 method_parameters_free);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700466 grpc_service_config_destroy(service_config);
467 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700468 }
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700469 // Before we clean up, save a copy of lb_policy_name, since it might
470 // be pointing to data inside chand->resolver_result.
471 // The copy will be saved in chand->lb_policy_name below.
472 lb_policy_name = gpr_strdup(lb_policy_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800473 grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
Mark D. Roth046cf762016-09-26 11:13:51 -0700474 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700475 }
Mark D. Roth60751fe2017-07-07 12:50:33 -0700476 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
477 gpr_log(GPR_DEBUG,
478 "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
479 "service_config=\"%s\"",
480 chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "",
481 service_config_json);
482 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700483 // Now swap out fields in chand. Note that the new values may still
484 // be NULL if (e.g.) the resolver failed to return results or the
485 // results did not contain the necessary data.
486 //
487 // First, swap out the data used by cc_get_channel_info().
Craig Tiller613dafa2017-02-09 12:00:43 -0800488 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700489 if (lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800490 gpr_free(chand->info_lb_policy_name);
491 chand->info_lb_policy_name = lb_policy_name;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700492 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800493 if (service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800494 gpr_free(chand->info_service_config_json);
495 chand->info_service_config_json = service_config_json;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800496 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800497 gpr_mu_unlock(&chand->info_mu);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700498 // Swap out the retry throttle data.
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800499 if (chand->retry_throttle_data != NULL) {
500 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
501 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700502 chand->retry_throttle_data = retry_throttle_data;
503 // Swap out the method params table.
Mark D. Roth9d480942016-10-19 14:18:05 -0700504 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800505 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth046cf762016-09-26 11:13:51 -0700506 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700507 chand->method_params_table = method_params_table;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700508 // If we have a new LB policy or are shutting down (in which case
509 // new_lb_policy will be NULL), swap out the LB policy, unreffing the
510 // old one and removing its fds from chand->interested_parties.
511 // Note that we do NOT do this if either (a) we updated the existing
512 // LB policy above or (b) we failed to create the new LB policy (in
513 // which case we want to continue using the most recent one we had).
514 if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
515 chand->resolver == NULL) {
516 if (chand->lb_policy != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700517 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
518 gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
519 chand->lb_policy);
520 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700521 grpc_pollset_set_del_pollset_set(exec_ctx,
522 chand->lb_policy->interested_parties,
523 chand->interested_parties);
524 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
Craig Tiller45724b32015-09-22 10:42:19 -0700525 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700526 chand->lb_policy = new_lb_policy;
527 }
528 // Now that we've swapped out the relevant fields of chand, check for
529 // error or shutdown.
530 if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700531 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
532 gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
533 }
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800534 if (chand->resolver != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700535 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
536 gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
537 }
Craig Tiller972470b2017-02-09 15:05:36 -0800538 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800539 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
540 chand->resolver = NULL;
541 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800542 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700543 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700544 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700545 "Got resolver result after disconnection", &error, 1),
Craig Tiller804ff712016-05-05 16:25:40 -0700546 "resolver_gone");
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700547 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
548 grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
549 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
550 "Channel disconnected", &error, 1));
551 GRPC_CLOSURE_LIST_SCHED(exec_ctx,
552 &chand->waiting_for_resolver_result_closures);
553 } else { // Not shutting down.
554 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
555 grpc_error *state_error =
556 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
557 if (new_lb_policy != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700558 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
559 gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
560 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700561 GRPC_ERROR_UNREF(state_error);
562 state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
563 &state_error);
564 grpc_pollset_set_add_pollset_set(exec_ctx,
565 new_lb_policy->interested_parties,
566 chand->interested_parties);
567 GRPC_CLOSURE_LIST_SCHED(exec_ctx,
568 &chand->waiting_for_resolver_result_closures);
569 if (chand->exit_idle_when_lb_policy_arrives) {
570 grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
571 chand->exit_idle_when_lb_policy_arrives = false;
572 }
573 watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
574 }
Mark D. Roth15494b52017-07-12 15:26:55 -0700575 if (!lb_policy_updated) {
576 set_channel_connectivity_state_locked(exec_ctx, chand, state,
577 GRPC_ERROR_REF(state_error),
578 "new_lb+resolver");
579 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700580 grpc_resolver_next_locked(exec_ctx, chand->resolver,
581 &chand->resolver_result,
582 &chand->on_resolver_result_changed);
583 GRPC_ERROR_UNREF(state_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700584 }
Craig Tiller3f475422015-06-25 10:43:05 -0700585}
586
Craig Tillera8610c02017-02-14 10:05:11 -0800587static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
588 grpc_error *error_ignored) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800589 grpc_transport_op *op = arg;
Craig Tillerc55c1022017-03-10 10:26:42 -0800590 grpc_channel_element *elem = op->handler_private.extra_arg;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700591 channel_data *chand = elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700592
Craig Tillera82950e2015-09-22 12:33:20 -0700593 if (op->on_connectivity_state_change != NULL) {
594 grpc_connectivity_state_notify_on_state_change(
595 exec_ctx, &chand->state_tracker, op->connectivity_state,
596 op->on_connectivity_state_change);
597 op->on_connectivity_state_change = NULL;
598 op->connectivity_state = NULL;
599 }
600
Craig Tiller26dab312015-12-07 14:43:47 -0800601 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800602 if (chand->lb_policy == NULL) {
ncteisen274bbbe2017-06-08 14:57:11 -0700603 GRPC_CLOSURE_SCHED(
ncteisen4b36a3d2017-03-13 19:08:06 -0700604 exec_ctx, op->send_ping,
605 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
Craig Tiller26dab312015-12-07 14:43:47 -0800606 } else {
Craig Tiller2400bf52017-02-09 16:25:19 -0800607 grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800608 op->bind_pollset = NULL;
609 }
610 op->send_ping = NULL;
611 }
612
Craig Tiller1c51edc2016-05-07 16:18:43 -0700613 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
614 if (chand->resolver != NULL) {
615 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700616 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700617 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
Craig Tiller972470b2017-02-09 15:05:36 -0800618 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700619 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
620 chand->resolver = NULL;
621 if (!chand->started_resolving) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700622 grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700623 GRPC_ERROR_REF(op->disconnect_with_error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700624 GRPC_CLOSURE_LIST_SCHED(exec_ctx,
625 &chand->waiting_for_resolver_result_closures);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700626 }
627 if (chand->lb_policy != NULL) {
628 grpc_pollset_set_del_pollset_set(exec_ctx,
629 chand->lb_policy->interested_parties,
630 chand->interested_parties);
631 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
632 chand->lb_policy = NULL;
633 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700634 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700635 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700636 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800637 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
638
ncteisen274bbbe2017-06-08 14:57:11 -0700639 GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
Craig Tillerbefafe62017-02-09 11:30:54 -0800640}
641
642static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
643 grpc_channel_element *elem,
644 grpc_transport_op *op) {
645 channel_data *chand = elem->channel_data;
646
Craig Tillerbefafe62017-02-09 11:30:54 -0800647 GPR_ASSERT(op->set_accept_stream == false);
648 if (op->bind_pollset != NULL) {
649 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
650 op->bind_pollset);
651 }
652
Craig Tillerc55c1022017-03-10 10:26:42 -0800653 op->handler_private.extra_arg = elem;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800654 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
ncteisen274bbbe2017-06-08 14:57:11 -0700655 GRPC_CLOSURE_SCHED(
Craig Tillerc55c1022017-03-10 10:26:42 -0800656 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -0700657 GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
Craig Tilleree4b1452017-05-12 10:56:03 -0700658 op, grpc_combiner_scheduler(chand->combiner)),
Craig Tillerbefafe62017-02-09 11:30:54 -0800659 GRPC_ERROR_NONE);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700660}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800661
Mark D. Rothb2d24882016-10-27 15:44:07 -0700662static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
663 grpc_channel_element *elem,
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700664 const grpc_channel_info *info) {
Mark D. Rothb2d24882016-10-27 15:44:07 -0700665 channel_data *chand = elem->channel_data;
Craig Tiller613dafa2017-02-09 12:00:43 -0800666 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700667 if (info->lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800668 *info->lb_policy_name = chand->info_lb_policy_name == NULL
Mark D. Roth78afd772016-11-04 12:49:49 -0700669 ? NULL
Craig Tiller613dafa2017-02-09 12:00:43 -0800670 : gpr_strdup(chand->info_lb_policy_name);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700671 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800672 if (info->service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800673 *info->service_config_json =
674 chand->info_service_config_json == NULL
675 ? NULL
676 : gpr_strdup(chand->info_service_config_json);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800677 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800678 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700679}
680
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700681/* Constructor for channel_data */
Mark D. Rothc1087882016-11-18 10:54:45 -0800682static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800683 grpc_channel_element *elem,
684 grpc_channel_element_args *args) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700685 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700686 GPR_ASSERT(args->is_last);
687 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800688 // Initialize data members.
Craig Tilleree4b1452017-05-12 10:56:03 -0700689 chand->combiner = grpc_combiner_create();
Craig Tillerd85477512017-02-09 12:02:39 -0800690 gpr_mu_init(&chand->info_mu);
Alexander Polcync3b1f182017-04-18 13:51:36 -0700691 gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
692
693 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
694 chand->external_connectivity_watcher_list_head = NULL;
695 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
696
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800697 chand->owning_stack = args->channel_stack;
ncteisen274bbbe2017-06-08 14:57:11 -0700698 GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
Craig Tillerbefafe62017-02-09 11:30:54 -0800699 on_resolver_result_changed_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -0700700 grpc_combiner_scheduler(chand->combiner));
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800701 chand->interested_parties = grpc_pollset_set_create();
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700702 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
703 "client_channel");
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800704 // Record client channel factory.
705 const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
706 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700707 if (arg == NULL) {
708 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
709 "Missing client channel factory in args for client channel filter");
710 }
711 if (arg->type != GRPC_ARG_POINTER) {
712 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
713 "client channel factory arg must be a pointer");
714 }
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800715 grpc_client_channel_factory_ref(arg->value.pointer.p);
716 chand->client_channel_factory = arg->value.pointer.p;
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800717 // Get server name to resolve, using proxy mapper if needed.
Mark D. Roth86e90592016-11-18 09:56:40 -0800718 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700719 if (arg == NULL) {
720 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
721 "Missing server uri in args for client channel filter");
722 }
723 if (arg->type != GRPC_ARG_STRING) {
724 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
725 "server uri arg must be a string");
726 }
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800727 char *proxy_name = NULL;
728 grpc_channel_args *new_args = NULL;
729 grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
730 &proxy_name, &new_args);
731 // Instantiate resolver.
Mark D. Roth45ccec52017-01-18 14:04:01 -0800732 chand->resolver = grpc_resolver_create(
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800733 exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
734 new_args != NULL ? new_args : args->channel_args,
Craig Tiller972470b2017-02-09 15:05:36 -0800735 chand->interested_parties, chand->combiner);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800736 if (proxy_name != NULL) gpr_free(proxy_name);
737 if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800738 if (chand->resolver == NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700739 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800740 }
Craig Tiller3be7dd02017-04-03 14:30:03 -0700741 chand->deadline_checking_enabled =
742 grpc_deadline_checking_enabled(args->channel_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800743 return GRPC_ERROR_NONE;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700744}
745
Craig Tiller972470b2017-02-09 15:05:36 -0800746static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
747 grpc_error *error) {
748 grpc_resolver *resolver = arg;
749 grpc_resolver_shutdown_locked(exec_ctx, resolver);
750 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
751}
752
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700753/* Destructor for channel_data */
754static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
755 grpc_channel_element *elem) {
756 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700757 if (chand->resolver != NULL) {
ncteisen274bbbe2017-06-08 14:57:11 -0700758 GRPC_CLOSURE_SCHED(
759 exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
Craig Tilleree4b1452017-05-12 10:56:03 -0700760 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller972470b2017-02-09 15:05:36 -0800761 GRPC_ERROR_NONE);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700762 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700763 if (chand->client_channel_factory != NULL) {
764 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
765 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700766 if (chand->lb_policy != NULL) {
767 grpc_pollset_set_del_pollset_set(exec_ctx,
768 chand->lb_policy->interested_parties,
769 chand->interested_parties);
770 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
771 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800772 gpr_free(chand->info_lb_policy_name);
773 gpr_free(chand->info_service_config_json);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800774 if (chand->retry_throttle_data != NULL) {
775 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
776 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700777 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800778 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700779 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700780 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
Craig Tiller9e5ac1b2017-02-14 22:25:50 -0800781 grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
Craig Tillerf1021672017-02-09 21:29:50 -0800782 GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
Craig Tillerd85477512017-02-09 12:02:39 -0800783 gpr_mu_destroy(&chand->info_mu);
Alexander Polcync3b1f182017-04-18 13:51:36 -0700784 gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700785}
786
787/*************************************************************************
788 * PER-CALL FUNCTIONS
789 */
790
// Upper bound on the number of batches a call can have pending at once.
// There is one slot for each of the six batch kinds:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
// plus one extra slot for a single cancel_stream batch.
#define MAX_WAITING_BATCHES 7
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700801
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700802/** Call data. Holds a pointer to grpc_subchannel_call and the
803 associated machinery to create such a pointer.
804 Handles queueing of stream ops until a call object is ready, waiting
805 for initial metadata before trying to create a call object,
806 and handling cancellation gracefully. */
807typedef struct client_channel_call_data {
Mark D. Roth72f6da82016-09-02 13:42:38 -0700808 // State for handling deadlines.
809 // The code in deadline_filter.c requires this to be the first field.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700810 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700811 // and this struct both independently store pointers to the call stack
812 // and call combiner. If/when we have time, find a way to avoid this
813 // without breaking the grpc_deadline_state abstraction.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700814 grpc_deadline_state deadline_state;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700815
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800816 grpc_slice path; // Request path.
Mark D. Rothe40dd292016-10-05 14:58:37 -0700817 gpr_timespec call_start_time;
818 gpr_timespec deadline;
Mark D. Roth764cf042017-09-01 09:00:06 -0700819 gpr_arena *arena;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700820 grpc_call_stack *owning_call;
Mark D. Roth764cf042017-09-01 09:00:06 -0700821 grpc_call_combiner *call_combiner;
822
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700823 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800824 method_parameters *method_params;
Mark D. Rothaa850a72016-09-26 13:38:02 -0700825
Mark D. Roth764cf042017-09-01 09:00:06 -0700826 grpc_subchannel_call *subchannel_call;
827 grpc_error *error;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700828
Mark D. Roth60751fe2017-07-07 12:50:33 -0700829 grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
830 grpc_closure lb_pick_closure;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700831 grpc_closure lb_pick_cancel_closure;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700832
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700833 grpc_connected_subchannel *connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700834 grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700835 grpc_polling_entity *pollent;
836
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700837 grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
838 size_t waiting_for_pick_batches_count;
Mark D. Roth764cf042017-09-01 09:00:06 -0700839 grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES];
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700840
Mark D. Roth764cf042017-09-01 09:00:06 -0700841 grpc_transport_stream_op_batch *initial_metadata_batch;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200842
843 grpc_linked_mdelem lb_token_mdelem;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800844
845 grpc_closure on_complete;
846 grpc_closure *original_on_complete;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700847} call_data;
848
Mark D. Rothbf199612017-08-29 16:59:07 -0700849grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
Mark D. Roth764cf042017-09-01 09:00:06 -0700850 grpc_call_element *elem) {
851 call_data *calld = elem->call_data;
852 return calld->subchannel_call;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700853}
854
Mark D. Roth764cf042017-09-01 09:00:06 -0700855// This is called via the call combiner, so access to calld is synchronized.
856static void waiting_for_pick_batches_add(
Mark D. Rothbf199612017-08-29 16:59:07 -0700857 call_data *calld, grpc_transport_stream_op_batch *batch) {
Mark D. Roth764cf042017-09-01 09:00:06 -0700858 if (batch->send_initial_metadata) {
859 GPR_ASSERT(calld->initial_metadata_batch == NULL);
860 calld->initial_metadata_batch = batch;
861 } else {
862 GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
863 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
864 batch;
865 }
Mark D. Rothbf199612017-08-29 16:59:07 -0700866}
867
Mark D. Roth764cf042017-09-01 09:00:06 -0700868// This is called via the call combiner, so access to calld is synchronized.
869static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
870 void *arg, grpc_error *error) {
871 call_data *calld = arg;
872 if (calld->waiting_for_pick_batches_count > 0) {
873 --calld->waiting_for_pick_batches_count;
874 grpc_transport_stream_op_batch_finish_with_failure(
875 exec_ctx,
876 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count],
877 GRPC_ERROR_REF(error), calld->call_combiner);
878 }
879}
880
881// This is called via the call combiner, so access to calld is synchronized.
882static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
883 grpc_call_element *elem,
884 grpc_error *error) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700885 call_data *calld = elem->call_data;
886 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
887 gpr_log(GPR_DEBUG,
888 "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
889 elem->channel_data, calld, calld->waiting_for_pick_batches_count,
890 grpc_error_string(error));
891 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700892 for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
Mark D. Roth764cf042017-09-01 09:00:06 -0700893 GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
894 fail_pending_batch_in_call_combiner, calld,
895 grpc_schedule_on_exec_ctx);
896 GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
897 &calld->handle_pending_batch_in_call_combiner[i],
898 GRPC_ERROR_REF(error),
899 "waiting_for_pick_batches_fail");
Mark D. Roth76e264b2017-08-25 09:03:33 -0700900 }
Mark D. Roth764cf042017-09-01 09:00:06 -0700901 if (calld->initial_metadata_batch != NULL) {
902 grpc_transport_stream_op_batch_finish_with_failure(
903 exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error),
904 calld->call_combiner);
905 } else {
906 GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
907 "waiting_for_pick_batches_fail");
908 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700909 GRPC_ERROR_UNREF(error);
910}
911
Mark D. Roth764cf042017-09-01 09:00:06 -0700912// This is called via the call combiner, so access to calld is synchronized.
913static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
914 void *arg, grpc_error *ignored) {
915 call_data *calld = arg;
916 if (calld->waiting_for_pick_batches_count > 0) {
917 --calld->waiting_for_pick_batches_count;
918 grpc_subchannel_call_process_op(
919 exec_ctx, calld->subchannel_call,
920 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]);
Mark D. Rothbf199612017-08-29 16:59:07 -0700921 }
Mark D. Roth764cf042017-09-01 09:00:06 -0700922}
923
924// This is called via the call combiner, so access to calld is synchronized.
925static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
926 grpc_call_element *elem) {
927 channel_data *chand = elem->channel_data;
928 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700929 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
930 gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
931 " pending batches to subchannel_call=%p",
Mark D. Roth764cf042017-09-01 09:00:06 -0700932 chand, calld, calld->waiting_for_pick_batches_count,
933 calld->subchannel_call);
Mark D. Roth60751fe2017-07-07 12:50:33 -0700934 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700935 for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
Mark D. Roth764cf042017-09-01 09:00:06 -0700936 GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
937 run_pending_batch_in_call_combiner, calld,
938 grpc_schedule_on_exec_ctx);
939 GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
940 &calld->handle_pending_batch_in_call_combiner[i],
941 GRPC_ERROR_NONE,
942 "waiting_for_pick_batches_resume");
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700943 }
Mark D. Roth764cf042017-09-01 09:00:06 -0700944 GPR_ASSERT(calld->initial_metadata_batch != NULL);
945 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
946 calld->initial_metadata_batch);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700947}
948
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700949// Applies service config to the call. Must be invoked once we know
950// that the resolver has returned results to the channel.
951static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
952 grpc_call_element *elem) {
Craig Tiller11c17d42017-03-13 13:36:34 -0700953 channel_data *chand = elem->channel_data;
954 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700955 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
956 gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
957 chand, calld);
958 }
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700959 if (chand->retry_throttle_data != NULL) {
960 calld->retry_throttle_data =
961 grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
962 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700963 if (chand->method_params_table != NULL) {
964 calld->method_params = grpc_method_config_table_get(
965 exec_ctx, chand->method_params_table, calld->path);
966 if (calld->method_params != NULL) {
967 method_parameters_ref(calld->method_params);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700968 // If the deadline from the service config is shorter than the one
969 // from the client API, reset the deadline timer.
970 if (chand->deadline_checking_enabled &&
971 gpr_time_cmp(calld->method_params->timeout,
Craig Tiller11c17d42017-03-13 13:36:34 -0700972 gpr_time_0(GPR_TIMESPAN)) != 0) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700973 const gpr_timespec per_method_deadline =
Craig Tiller11c17d42017-03-13 13:36:34 -0700974 gpr_time_add(calld->call_start_time, calld->method_params->timeout);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700975 if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
976 calld->deadline = per_method_deadline;
977 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
978 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700979 }
980 }
981 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700982}
Craig Tillerea4a4f12017-03-13 13:36:52 -0700983
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700984static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
Mark D. Roth60751fe2017-07-07 12:50:33 -0700985 grpc_call_element *elem,
986 grpc_error *error) {
Mark D. Roth764cf042017-09-01 09:00:06 -0700987 channel_data *chand = elem->channel_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700988 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700989 const grpc_connected_subchannel_call_args call_args = {
990 .pollent = calld->pollent,
991 .path = calld->path,
992 .start_time = calld->call_start_time,
993 .deadline = calld->deadline,
994 .arena = calld->arena,
Mark D. Roth764cf042017-09-01 09:00:06 -0700995 .context = calld->subchannel_call_context,
996 .call_combiner = calld->call_combiner};
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700997 grpc_error *new_error = grpc_connected_subchannel_create_call(
Mark D. Roth764cf042017-09-01 09:00:06 -0700998 exec_ctx, calld->connected_subchannel, &call_args,
999 &calld->subchannel_call);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001000 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1001 gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
Mark D. Roth764cf042017-09-01 09:00:06 -07001002 chand, calld, calld->subchannel_call, grpc_error_string(new_error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07001003 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001004 if (new_error != GRPC_ERROR_NONE) {
1005 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth764cf042017-09-01 09:00:06 -07001006 waiting_for_pick_batches_fail(exec_ctx, elem, new_error);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001007 } else {
Mark D. Roth764cf042017-09-01 09:00:06 -07001008 waiting_for_pick_batches_resume(exec_ctx, elem);
Craig Tiller11c17d42017-03-13 13:36:34 -07001009 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001010 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -07001011}
1012
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001013static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
1014 grpc_call_element *elem,
Craig Tillerbefafe62017-02-09 11:30:54 -08001015 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001016 call_data *calld = elem->call_data;
1017 channel_data *chand = elem->channel_data;
Yuchen Zeng19656b12016-09-01 18:00:45 -07001018 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
1019 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001020 if (calld->connected_subchannel == NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001021 // Failed to create subchannel.
Mark D. Roth764cf042017-09-01 09:00:06 -07001022 GRPC_ERROR_UNREF(calld->error);
1023 calld->error = error == GRPC_ERROR_NONE
1024 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1025 "Call dropped by load balancing policy")
1026 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1027 "Failed to create subchannel", &error, 1);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001028 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1029 gpr_log(GPR_DEBUG,
1030 "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
Mark D. Roth764cf042017-09-01 09:00:06 -07001031 calld, grpc_error_string(calld->error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07001032 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001033 waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001034 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07001035 /* Create call on subchannel. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001036 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001037 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001038 GRPC_ERROR_UNREF(error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001039}
1040
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001041/** Return true if subchannel is available immediately (in which case
1042 subchannel_ready_locked() should not be called), or false otherwise (in
1043 which case subchannel_ready_locked() should be called when the subchannel
1044 is available). */
1045static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1046 grpc_call_element *elem);
1047
Craig Tiller577c9b22015-11-02 14:11:15 -08001048typedef struct {
Craig Tiller577c9b22015-11-02 14:11:15 -08001049 grpc_call_element *elem;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001050 bool finished;
Craig Tiller577c9b22015-11-02 14:11:15 -08001051 grpc_closure closure;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001052 grpc_closure cancel_closure;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001053} pick_after_resolver_result_args;
Craig Tiller577c9b22015-11-02 14:11:15 -08001054
Mark D. Roth764cf042017-09-01 09:00:06 -07001055// Note: This runs under the client_channel combiner, but will NOT be
1056// holding the call combiner.
1057static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
1058 void *arg,
1059 grpc_error *error) {
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001060 pick_after_resolver_result_args *args = arg;
1061 if (args->finished) {
1062 gpr_free(args);
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001063 return;
Mark D. Roth764cf042017-09-01 09:00:06 -07001064 }
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001065 args->finished = true;
1066 grpc_call_element *elem = args->elem;
1067 channel_data *chand = elem->channel_data;
1068 call_data *calld = elem->call_data;
1069 // If we don't yet have a resolver result, then a closure for
1070 // pick_after_resolver_result_done_locked() will have been added to
1071 // chand->waiting_for_resolver_result_closures, and it may not be invoked
1072 // until after this call has been destroyed. We mark the operation as
1073 // finished, so that when pick_after_resolver_result_done_locked()
1074 // is called, it will be a no-op. We also immediately invoke
1075 // subchannel_ready_locked() to propagate the error back to the caller.
1076 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1077 gpr_log(GPR_DEBUG,
1078 "chand=%p calld=%p: cancelling pick waiting for resolver result",
1079 chand, calld);
1080 }
1081 // Note: Although we are not in the call combiner here, we are
1082 // basically stealing the call combiner from the pending pick, so
1083 // it's safe to call subchannel_ready_locked() here -- we are
1084 // essentially calling it here instead of calling it in
1085 // pick_after_resolver_result_done_locked().
1086 subchannel_ready_locked(
1087 exec_ctx, elem,
1088 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Pick cancelled",
1089 &error, 1));
Mark D. Roth764cf042017-09-01 09:00:06 -07001090}
1091
Mark D. Roth60751fe2017-07-07 12:50:33 -07001092static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
1093 void *arg,
1094 grpc_error *error) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001095 pick_after_resolver_result_args *args = arg;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001096 if (args->finished) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001097 /* cancelled, do nothing */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001098 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1099 gpr_log(GPR_DEBUG, "call cancelled before resolver result");
1100 }
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001101 gpr_free(args);
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001102 return;
1103 }
1104 args->finished = true;
1105 grpc_call_element *elem = args->elem;
1106 channel_data *chand = elem->channel_data;
1107 call_data *calld = elem->call_data;
1108 grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
1109 NULL);
1110 if (error != GRPC_ERROR_NONE) {
1111 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1112 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
1113 chand, calld);
1114 }
1115 subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001116 } else {
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001117 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1118 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
1119 chand, calld);
1120 }
1121 if (pick_subchannel_locked(exec_ctx, elem)) {
1122 subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001123 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001124 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001125}
1126
Mark D. Roth60751fe2017-07-07 12:50:33 -07001127static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
1128 grpc_call_element *elem) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001129 channel_data *chand = elem->channel_data;
1130 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001131 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1132 gpr_log(GPR_DEBUG,
1133 "chand=%p calld=%p: deferring pick pending resolver result", chand,
1134 calld);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001135 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001136 pick_after_resolver_result_args *args =
1137 (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
1138 args->elem = elem;
1139 GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
1140 args, grpc_combiner_scheduler(chand->combiner));
1141 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
1142 &args->closure, GRPC_ERROR_NONE);
Mark D. Roth764cf042017-09-01 09:00:06 -07001143 grpc_call_combiner_set_notify_on_cancel(
1144 exec_ctx, calld->call_combiner,
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001145 GRPC_CLOSURE_INIT(&args->cancel_closure,
1146 pick_after_resolver_result_cancel_locked, args,
Mark D. Roth764cf042017-09-01 09:00:06 -07001147 grpc_combiner_scheduler(chand->combiner)));
Mark D. Roth60751fe2017-07-07 12:50:33 -07001148}
1149
Mark D. Roth764cf042017-09-01 09:00:06 -07001150// Note: This runs under the client_channel combiner, but will NOT be
1151// holding the call combiner.
1152static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
1153 grpc_error *error) {
1154 grpc_call_element *elem = arg;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001155 channel_data *chand = elem->channel_data;
1156 call_data *calld = elem->call_data;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001157 if (error != GRPC_ERROR_NONE && calld->lb_policy != NULL) {
Mark D. Roth764cf042017-09-01 09:00:06 -07001158 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1159 gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
1160 chand, calld, calld->lb_policy);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001161 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001162 grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
1163 &calld->connected_subchannel,
1164 GRPC_ERROR_REF(error));
Mark D. Roth64a317c2017-05-02 08:27:08 -07001165 }
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001166 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_callback_cancel");
Mark D. Roth64a317c2017-05-02 08:27:08 -07001167}
1168
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001169// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
Mark D. Roth764cf042017-09-01 09:00:06 -07001170// Unrefs the LB policy and invokes subchannel_ready_locked().
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001171static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1172 grpc_error *error) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001173 grpc_call_element *elem = arg;
1174 channel_data *chand = elem->channel_data;
1175 call_data *calld = elem->call_data;
1176 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1177 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
1178 chand, calld);
1179 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001180 grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001181 GPR_ASSERT(calld->lb_policy != NULL);
1182 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1183 calld->lb_policy = NULL;
1184 subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001185}
1186
1187// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
1188// If the pick was completed synchronously, unrefs the LB policy and
1189// returns true.
1190static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
1191 grpc_call_element *elem,
1192 const grpc_lb_policy_pick_args *inputs) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001193 channel_data *chand = elem->channel_data;
1194 call_data *calld = elem->call_data;
Mark D. Roth60751fe2017-07-07 12:50:33 -07001195 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1196 gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
1197 chand, calld, chand->lb_policy);
1198 }
1199 // Keep a ref to the LB policy in calld while the pick is pending.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001200 GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001201 calld->lb_policy = chand->lb_policy;
1202 GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001203 grpc_combiner_scheduler(chand->combiner));
1204 const bool pick_done = grpc_lb_policy_pick_locked(
1205 exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001206 calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001207 if (pick_done) {
1208 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001209 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1210 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
1211 chand, calld);
1212 }
1213 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1214 calld->lb_policy = NULL;
Mark D. Roth764cf042017-09-01 09:00:06 -07001215 } else {
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001216 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
Mark D. Roth764cf042017-09-01 09:00:06 -07001217 grpc_call_combiner_set_notify_on_cancel(
1218 exec_ctx, calld->call_combiner,
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001219 GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure,
1220 pick_callback_cancel_locked, elem,
1221 grpc_combiner_scheduler(chand->combiner)));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001222 }
1223 return pick_done;
1224}
Craig Tiller577c9b22015-11-02 14:11:15 -08001225
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001226static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1227 grpc_call_element *elem) {
1228 GPR_TIMER_BEGIN("pick_subchannel", 0);
1229 channel_data *chand = elem->channel_data;
1230 call_data *calld = elem->call_data;
1231 bool pick_done = false;
Craig Tiller577c9b22015-11-02 14:11:15 -08001232 if (chand->lb_policy != NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001233 apply_service_config_to_call_locked(exec_ctx, elem);
Mark D. Rothe40dd292016-10-05 14:58:37 -07001234 // If the application explicitly set wait_for_ready, use that.
1235 // Otherwise, if the service config specified a value for this
1236 // method, use that.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001237 uint32_t initial_metadata_flags =
Mark D. Roth764cf042017-09-01 09:00:06 -07001238 calld->initial_metadata_batch->payload->send_initial_metadata
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001239 .send_initial_metadata_flags;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001240 const bool wait_for_ready_set_from_api =
1241 initial_metadata_flags &
1242 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1243 const bool wait_for_ready_set_from_service_config =
Mark D. Roth95b627b2017-02-24 11:02:58 -08001244 calld->method_params != NULL &&
1245 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001246 if (!wait_for_ready_set_from_api &&
1247 wait_for_ready_set_from_service_config) {
Mark D. Roth95b627b2017-02-24 11:02:58 -08001248 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001249 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1250 } else {
1251 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1252 }
1253 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001254 const grpc_lb_policy_pick_args inputs = {
Mark D. Roth764cf042017-09-01 09:00:06 -07001255 calld->initial_metadata_batch->payload->send_initial_metadata
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001256 .send_initial_metadata,
1257 initial_metadata_flags, &calld->lb_token_mdelem};
1258 pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
1259 } else if (chand->resolver != NULL) {
1260 if (!chand->started_resolving) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001261 start_resolving_locked(exec_ctx, chand);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001262 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001263 pick_after_resolver_result_start_locked(exec_ctx, elem);
Craig Tiller0eab6972016-04-23 12:59:57 -07001264 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001265 subchannel_ready_locked(
1266 exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -07001267 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001268 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001269 return pick_done;
Craig Tiller577c9b22015-11-02 14:11:15 -08001270}
1271
Mark D. Roth764cf042017-09-01 09:00:06 -07001272static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg,
1273 grpc_error *error_ignored) {
1274 GPR_TIMER_BEGIN("start_pick_locked", 0);
1275 grpc_call_element *elem = (grpc_call_element *)arg;
1276 call_data *calld = (call_data *)elem->call_data;
1277 channel_data *chand = (channel_data *)elem->channel_data;
1278 GPR_ASSERT(calld->connected_subchannel == NULL);
1279 if (pick_subchannel_locked(exec_ctx, elem)) {
1280 // Pick was returned synchronously.
1281 if (calld->connected_subchannel == NULL) {
1282 GRPC_ERROR_UNREF(calld->error);
1283 calld->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1284 "Call dropped by load balancing policy");
1285 waiting_for_pick_batches_fail(exec_ctx, elem,
1286 GRPC_ERROR_REF(calld->error));
Mark D. Rothbf199612017-08-29 16:59:07 -07001287 } else {
Mark D. Roth764cf042017-09-01 09:00:06 -07001288 // Create subchannel call.
1289 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
Mark D. Rothbf199612017-08-29 16:59:07 -07001290 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001291 } else {
1292 // Pick will be done asynchronously. Add the call's polling entity to
1293 // the channel's interested_parties, so that I/O for the resolver
1294 // and LB policy can be done under it.
1295 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1296 chand->interested_parties);
Mark D. Rothbf199612017-08-29 16:59:07 -07001297 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001298 GPR_TIMER_END("start_pick_locked", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001299}
1300
Mark D. Rothde144102017-03-15 10:11:03 -07001301static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001302 grpc_call_element *elem = arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001303 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001304 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001305 if (error == GRPC_ERROR_NONE) {
1306 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001307 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001308 } else {
1309 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001310 // decide whether or not to retry. Note that we should only
1311 // record failures whose statuses match the configured retryable
1312 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001313 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001314 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001315 }
1316 }
ncteisen274bbbe2017-06-08 14:57:11 -07001317 GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
Mark D. Roth95039b52017-02-24 07:59:45 -08001318 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001319}
1320
Craig Tillere1b51da2017-03-31 15:44:33 -07001321static void cc_start_transport_stream_op_batch(
1322 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001323 grpc_transport_stream_op_batch *batch) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001324 call_data *calld = elem->call_data;
Yuchen Zeng19656b12016-09-01 18:00:45 -07001325 channel_data *chand = elem->channel_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001326 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001327 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001328 batch);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001329 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001330 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
1331 // If we've previously been cancelled, immediately fail any new batches.
1332 if (calld->error != GRPC_ERROR_NONE) {
1333 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1334 gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
1335 chand, calld, grpc_error_string(calld->error));
1336 }
1337 grpc_transport_stream_op_batch_finish_with_failure(
1338 exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
1339 goto done;
1340 }
1341 if (batch->cancel_stream) {
1342 // Stash a copy of cancel_error in our call data, so that we can use
1343 // it for subsequent operations. This ensures that if the call is
1344 // cancelled before any batches are passed down (e.g., if the deadline
1345 // is in the past when the call starts), we can return the right
1346 // error to the caller when the first batch does get passed down.
1347 GRPC_ERROR_UNREF(calld->error);
1348 calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
1349 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1350 gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
1351 calld, grpc_error_string(calld->error));
1352 }
1353 // If we have a subchannel call, send the cancellation batch down.
1354 // Otherwise, fail all pending batches.
1355 if (calld->subchannel_call != NULL) {
1356 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
1357 } else {
1358 waiting_for_pick_batches_add(calld, batch);
1359 waiting_for_pick_batches_fail(exec_ctx, elem,
1360 GRPC_ERROR_REF(calld->error));
1361 }
1362 goto done;
1363 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001364 // Intercept on_complete for recv_trailing_metadata so that we can
1365 // check retry throttle status.
Mark D. Roth60751fe2017-07-07 12:50:33 -07001366 if (batch->recv_trailing_metadata) {
1367 GPR_ASSERT(batch->on_complete != NULL);
1368 calld->original_on_complete = batch->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001369 GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
1370 grpc_schedule_on_exec_ctx);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001371 batch->on_complete = &calld->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001372 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001373 // Check if we've already gotten a subchannel call.
1374 // Note that once we have completed the pick, we do not need to enter
1375 // the channel combiner, which is more efficient (especially for
1376 // streaming calls).
1377 if (calld->subchannel_call != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001378 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1379 gpr_log(GPR_DEBUG,
1380 "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
Mark D. Roth764cf042017-09-01 09:00:06 -07001381 calld, calld->subchannel_call);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001382 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001383 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001384 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001385 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001386 // We do not yet have a subchannel call.
1387 // Add the batch to the waiting-for-pick list.
1388 waiting_for_pick_batches_add(calld, batch);
1389 // For batches containing a send_initial_metadata op, enter the channel
1390 // combiner to start a pick.
1391 if (batch->send_initial_metadata) {
1392 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1393 gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
1394 }
1395 GRPC_CLOSURE_SCHED(
1396 exec_ctx,
1397 GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
1398 elem, grpc_combiner_scheduler(chand->combiner)),
1399 GRPC_ERROR_NONE);
1400 } else {
1401 // For all other batches, release the call combiner.
1402 if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
1403 gpr_log(GPR_DEBUG,
1404 "chand=%p calld=%p: saved batch, yeilding call combiner", chand,
1405 calld);
1406 }
1407 GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
1408 "batch does not include send_initial_metadata");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001409 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001410done:
Craig Tillera0f3abd2017-03-31 15:42:16 -07001411 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001412}
1413
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001414/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001415static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1416 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001417 const grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001418 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001419 channel_data *chand = elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001420 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001421 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001422 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001423 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
Craig Tillerd426cac2017-03-13 12:30:45 -07001424 calld->arena = args->arena;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001425 calld->owning_call = args->call_stack;
Mark D. Roth764cf042017-09-01 09:00:06 -07001426 calld->call_combiner = args->call_combiner;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001427 if (chand->deadline_checking_enabled) {
Mark D. Roth764cf042017-09-01 09:00:06 -07001428 grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
1429 args->call_combiner, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001430 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001431 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001432}
1433
1434/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001435static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1436 grpc_call_element *elem,
1437 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001438 grpc_closure *then_schedule_closure) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001439 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001440 channel_data *chand = elem->channel_data;
1441 if (chand->deadline_checking_enabled) {
1442 grpc_deadline_state_destroy(exec_ctx, elem);
1443 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001444 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001445 if (calld->method_params != NULL) {
1446 method_parameters_unref(calld->method_params);
1447 }
Mark D. Roth764cf042017-09-01 09:00:06 -07001448 GRPC_ERROR_UNREF(calld->error);
1449 if (calld->subchannel_call != NULL) {
1450 grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001451 then_schedule_closure);
Craig Tillerd426cac2017-03-13 12:30:45 -07001452 then_schedule_closure = NULL;
Mark D. Roth764cf042017-09-01 09:00:06 -07001453 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call,
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001454 "client_channel_destroy_call");
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001455 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001456 GPR_ASSERT(calld->lb_policy == NULL);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001457 GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001458 if (calld->connected_subchannel != NULL) {
1459 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1460 "picked");
1461 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001462 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1463 if (calld->subchannel_call_context[i].value != NULL) {
1464 calld->subchannel_call_context[i].destroy(
1465 calld->subchannel_call_context[i].value);
1466 }
1467 }
ncteisen274bbbe2017-06-08 14:57:11 -07001468 GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001469}
1470
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001471static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1472 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001473 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001474 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001475 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001476}
1477
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001478/*************************************************************************
1479 * EXPORTED SYMBOLS
1480 */
1481
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001482const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001483 cc_start_transport_stream_op_batch,
Craig Tillerf40df232016-03-25 13:38:14 -07001484 cc_start_transport_op,
1485 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001486 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001487 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001488 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001489 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001490 cc_init_channel_elem,
1491 cc_destroy_channel_elem,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001492 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001493 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001494};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001495
Craig Tiller613dafa2017-02-09 12:00:43 -08001496static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1497 grpc_error *error_ignored) {
1498 channel_data *chand = arg;
1499 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001500 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001501 } else {
1502 chand->exit_idle_when_lb_policy_arrives = true;
1503 if (!chand->started_resolving && chand->resolver != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001504 start_resolving_locked(exec_ctx, chand);
Craig Tiller613dafa2017-02-09 12:00:43 -08001505 }
1506 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001507 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001508}
1509
Craig Tillera82950e2015-09-22 12:33:20 -07001510grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1511 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001512 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001513 grpc_connectivity_state out =
1514 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001515 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001516 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
ncteisen274bbbe2017-06-08 14:57:11 -07001517 GRPC_CLOSURE_SCHED(
1518 exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -07001519 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001520 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001521 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001522 return out;
1523}
1524
Alexander Polcync3b1f182017-04-18 13:51:36 -07001525typedef struct external_connectivity_watcher {
Craig Tiller86c99582015-11-25 15:22:26 -08001526 channel_data *chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001527 grpc_polling_entity pollent;
Craig Tiller86c99582015-11-25 15:22:26 -08001528 grpc_closure *on_complete;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001529 grpc_closure *watcher_timer_init;
Craig Tiller613dafa2017-02-09 12:00:43 -08001530 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001531 grpc_closure my_closure;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001532 struct external_connectivity_watcher *next;
Craig Tiller86c99582015-11-25 15:22:26 -08001533} external_connectivity_watcher;
1534
Alexander Polcync3b1f182017-04-18 13:51:36 -07001535static external_connectivity_watcher *lookup_external_connectivity_watcher(
1536 channel_data *chand, grpc_closure *on_complete) {
1537 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1538 external_connectivity_watcher *w =
1539 chand->external_connectivity_watcher_list_head;
1540 while (w != NULL && w->on_complete != on_complete) {
1541 w = w->next;
1542 }
1543 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1544 return w;
1545}
1546
1547static void external_connectivity_watcher_list_append(
1548 channel_data *chand, external_connectivity_watcher *w) {
1549 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
1550
1551 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
1552 GPR_ASSERT(!w->next);
1553 w->next = chand->external_connectivity_watcher_list_head;
1554 chand->external_connectivity_watcher_list_head = w;
1555 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
1556}
1557
1558static void external_connectivity_watcher_list_remove(
1559 channel_data *chand, external_connectivity_watcher *too_remove) {
1560 GPR_ASSERT(
1561 lookup_external_connectivity_watcher(chand, too_remove->on_complete));
1562 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1563 if (too_remove == chand->external_connectivity_watcher_list_head) {
1564 chand->external_connectivity_watcher_list_head = too_remove->next;
1565 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1566 return;
1567 }
1568 external_connectivity_watcher *w =
1569 chand->external_connectivity_watcher_list_head;
1570 while (w != NULL) {
1571 if (w->next == too_remove) {
1572 w->next = w->next->next;
1573 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1574 return;
1575 }
1576 w = w->next;
1577 }
1578 GPR_UNREACHABLE_CODE(return );
1579}
1580
1581int grpc_client_channel_num_external_connectivity_watchers(
1582 grpc_channel_element *elem) {
1583 channel_data *chand = elem->channel_data;
1584 int count = 0;
1585
1586 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1587 external_connectivity_watcher *w =
1588 chand->external_connectivity_watcher_list_head;
1589 while (w != NULL) {
1590 count++;
1591 w = w->next;
1592 }
1593 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1594
1595 return count;
1596}
1597
Craig Tiller1d881fb2015-12-01 07:39:04 -08001598static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001599 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001600 external_connectivity_watcher *w = arg;
1601 grpc_closure *follow_up = w->on_complete;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001602 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1603 w->chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001604 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1605 "external_connectivity_watcher");
Alexander Polcync3b1f182017-04-18 13:51:36 -07001606 external_connectivity_watcher_list_remove(w->chand, w);
Craig Tiller86c99582015-11-25 15:22:26 -08001607 gpr_free(w);
ncteisen274bbbe2017-06-08 14:57:11 -07001608 GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
Craig Tiller613dafa2017-02-09 12:00:43 -08001609}
1610
Craig Tillera8610c02017-02-14 10:05:11 -08001611static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1612 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001613 external_connectivity_watcher *w = arg;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001614 external_connectivity_watcher *found = NULL;
1615 if (w->state != NULL) {
1616 external_connectivity_watcher_list_append(w->chand, w);
ncteisen274bbbe2017-06-08 14:57:11 -07001617 GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
1618 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
Alexander Polcync3b1f182017-04-18 13:51:36 -07001619 grpc_schedule_on_exec_ctx);
1620 grpc_connectivity_state_notify_on_state_change(
1621 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
1622 } else {
1623 GPR_ASSERT(w->watcher_timer_init == NULL);
1624 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
1625 if (found) {
1626 GPR_ASSERT(found->on_complete == w->on_complete);
1627 grpc_connectivity_state_notify_on_state_change(
1628 exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
1629 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001630 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1631 w->chand->interested_parties);
Alexander Polcync3b1f182017-04-18 13:51:36 -07001632 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1633 "external_connectivity_watcher");
1634 gpr_free(w);
1635 }
Craig Tiller86c99582015-11-25 15:22:26 -08001636}
1637
Craig Tillera82950e2015-09-22 12:33:20 -07001638void grpc_client_channel_watch_connectivity_state(
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001639 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
1640 grpc_polling_entity pollent, grpc_connectivity_state *state,
1641 grpc_closure *closure, grpc_closure *watcher_timer_init) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001642 channel_data *chand = elem->channel_data;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001643 external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
Craig Tiller86c99582015-11-25 15:22:26 -08001644 w->chand = chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001645 w->pollent = pollent;
Mark D. Roth92210832017-05-02 15:04:39 -07001646 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001647 w->state = state;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001648 w->watcher_timer_init = watcher_timer_init;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001649 grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
1650 chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001651 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1652 "external_connectivity_watcher");
ncteisen274bbbe2017-06-08 14:57:11 -07001653 GRPC_CLOSURE_SCHED(
Craig Tiller613dafa2017-02-09 12:00:43 -08001654 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -07001655 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -07001656 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001657 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001658}