blob: 70209a502099dc9dd57cd09460f001d610ea813d [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080016 *
17 */
18
Yash Tibrewal37fdb732017-09-25 16:45:02 -070019#include <grpc/support/port_platform.h>
20
Craig Tiller9eb0fde2017-03-31 16:59:30 -070021#include "src/core/ext/filters/client_channel/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080022
Yash Tibrewalfcd26bc2017-09-25 15:08:28 -070023#include <inttypes.h>
Mark D. Roth4c0fe492016-08-31 13:51:55 -070024#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080025#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070026#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080027
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080028#include <grpc/support/alloc.h>
29#include <grpc/support/log.h>
Mark D. Rothb2d24882016-10-27 15:44:07 -070030#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080031#include <grpc/support/sync.h>
32#include <grpc/support/useful.h>
33
Craig Tiller9eb0fde2017-03-31 16:59:30 -070034#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
35#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
36#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
37#include "src/core/ext/filters/client_channel/resolver_registry.h"
38#include "src/core/ext/filters/client_channel/retry_throttle.h"
39#include "src/core/ext/filters/client_channel/subchannel.h"
Craig Tiller3be7dd02017-04-03 14:30:03 -070040#include "src/core/ext/filters/deadline/deadline_filter.h"
Craig Tiller9533d042016-03-25 17:11:06 -070041#include "src/core/lib/channel/channel_args.h"
42#include "src/core/lib/channel/connected_channel.h"
Craig Tillerbefafe62017-02-09 11:30:54 -080043#include "src/core/lib/iomgr/combiner.h"
Craig Tiller9533d042016-03-25 17:11:06 -070044#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070045#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070046#include "src/core/lib/profiling/timers.h"
Craig Tiller7c70b6c2017-01-23 07:48:42 -080047#include "src/core/lib/slice/slice_internal.h"
Craig Tiller9533d042016-03-25 17:11:06 -070048#include "src/core/lib/support/string.h"
49#include "src/core/lib/surface/channel.h"
50#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070051#include "src/core/lib/transport/metadata.h"
52#include "src/core/lib/transport/metadata_batch.h"
Mark D. Rothea846a02016-11-03 11:32:54 -070053#include "src/core/lib/transport/service_config.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070054#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070055
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080056/* Client channel implementation */
57
Craig Tiller694580f2017-10-18 14:48:14 -070058grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
Mark D. Roth60751fe2017-07-07 12:50:33 -070059
Mark D. Roth26b7be42016-10-24 10:08:07 -070060/*************************************************************************
61 * METHOD-CONFIG TABLE
62 */
63
Mark D. Roth9d480942016-10-19 14:18:05 -070064typedef enum {
Craig Tiller7acc37e2017-02-28 10:01:37 -080065 /* zero so it can be default initialized */
66 WAIT_FOR_READY_UNSET = 0,
Mark D. Roth9d480942016-10-19 14:18:05 -070067 WAIT_FOR_READY_FALSE,
68 WAIT_FOR_READY_TRUE
69} wait_for_ready_value;
70
Mark D. Roth95b627b2017-02-24 11:02:58 -080071typedef struct {
72 gpr_refcount refs;
Craig Tiller89c14282017-07-19 15:32:27 -070073 grpc_millis timeout;
Mark D. Roth9d480942016-10-19 14:18:05 -070074 wait_for_ready_value wait_for_ready;
75} method_parameters;
76
Mark D. Roth722de8d2017-02-27 10:50:44 -080077static method_parameters *method_parameters_ref(
Mark D. Roth95b627b2017-02-24 11:02:58 -080078 method_parameters *method_params) {
79 gpr_ref(&method_params->refs);
80 return method_params;
Mark D. Roth9d480942016-10-19 14:18:05 -070081}
82
Mark D. Roth95b627b2017-02-24 11:02:58 -080083static void method_parameters_unref(method_parameters *method_params) {
84 if (gpr_unref(&method_params->refs)) {
85 gpr_free(method_params);
86 }
87}
88
Mark D. Roth95b627b2017-02-24 11:02:58 -080089static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
Yash Tibrewalbc130da2017-09-12 22:44:08 -070090 method_parameters_unref((method_parameters *)value);
Craig Tiller87a7e1f2016-11-09 09:42:19 -080091}
92
Mark D. Roth95b627b2017-02-24 11:02:58 -080093static bool parse_wait_for_ready(grpc_json *field,
94 wait_for_ready_value *wait_for_ready) {
95 if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
96 return false;
97 }
98 *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
99 : WAIT_FOR_READY_FALSE;
100 return true;
101}
102
Craig Tiller89c14282017-07-19 15:32:27 -0700103static bool parse_timeout(grpc_json *field, grpc_millis *timeout) {
Mark D. Roth95b627b2017-02-24 11:02:58 -0800104 if (field->type != GRPC_JSON_STRING) return false;
105 size_t len = strlen(field->value);
106 if (field->value[len - 1] != 's') return false;
107 char *buf = gpr_strdup(field->value);
108 buf[len - 1] = '\0'; // Remove trailing 's'.
109 char *decimal_point = strchr(buf, '.');
Craig Tiller89c14282017-07-19 15:32:27 -0700110 int nanos = 0;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800111 if (decimal_point != NULL) {
112 *decimal_point = '\0';
Craig Tiller89c14282017-07-19 15:32:27 -0700113 nanos = gpr_parse_nonnegative_int(decimal_point + 1);
114 if (nanos == -1) {
Mark D. Roth95b627b2017-02-24 11:02:58 -0800115 gpr_free(buf);
116 return false;
117 }
118 // There should always be exactly 3, 6, or 9 fractional digits.
119 int multiplier = 1;
120 switch (strlen(decimal_point + 1)) {
121 case 9:
122 break;
123 case 6:
124 multiplier *= 1000;
125 break;
126 case 3:
127 multiplier *= 1000000;
128 break;
129 default: // Unsupported number of digits.
130 gpr_free(buf);
131 return false;
132 }
Craig Tiller89c14282017-07-19 15:32:27 -0700133 nanos *= multiplier;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800134 }
Craig Tiller89c14282017-07-19 15:32:27 -0700135 int seconds = gpr_parse_nonnegative_int(buf);
Mark D. Roth95b627b2017-02-24 11:02:58 -0800136 gpr_free(buf);
Craig Tiller89c14282017-07-19 15:32:27 -0700137 if (seconds == -1) return false;
138 *timeout = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800139 return true;
140}
141
Mark D. Rothe30baeb2016-11-03 08:16:19 -0700142static void *method_parameters_create_from_json(const grpc_json *json) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700143 wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
Craig Tiller89c14282017-07-19 15:32:27 -0700144 grpc_millis timeout = 0;
Mark D. Roth47f10842016-11-03 08:45:27 -0700145 for (grpc_json *field = json->child; field != NULL; field = field->next) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700146 if (field->key == NULL) continue;
Mark D. Roth84c8a022016-11-10 09:39:34 -0800147 if (strcmp(field->key, "waitForReady") == 0) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700148 if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800149 if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700150 } else if (strcmp(field->key, "timeout") == 0) {
Craig Tiller89c14282017-07-19 15:32:27 -0700151 if (timeout > 0) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800152 if (!parse_timeout(field, &timeout)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700153 }
154 }
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700155 method_parameters *value =
156 (method_parameters *)gpr_malloc(sizeof(method_parameters));
Mark D. Roth95b627b2017-02-24 11:02:58 -0800157 gpr_ref_init(&value->refs, 1);
Mark D. Rothc968e602016-11-02 14:07:36 -0700158 value->timeout = timeout;
159 value->wait_for_ready = wait_for_ready;
Mark D. Roth9d480942016-10-19 14:18:05 -0700160 return value;
161}
162
Alexander Polcync3b1f182017-04-18 13:51:36 -0700163struct external_connectivity_watcher;
164
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700165/*************************************************************************
166 * CHANNEL-WIDE FUNCTIONS
167 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800168
Craig Tiller800dacb2015-10-06 09:10:26 -0700169typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -0700170 /** resolver for this channel */
171 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -0700172 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700173 bool started_resolving;
Craig Tiller3be7dd02017-04-03 14:30:03 -0700174 /** is deadline checking enabled? */
175 bool deadline_checking_enabled;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700176 /** client channel factory */
177 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -0700178
Craig Tillerbefafe62017-02-09 11:30:54 -0800179 /** combiner protecting all variables below in this data structure */
180 grpc_combiner *combiner;
Mark D. Roth046cf762016-09-26 11:13:51 -0700181 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -0700182 grpc_lb_policy *lb_policy;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800183 /** retry throttle data */
184 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth9d480942016-10-19 14:18:05 -0700185 /** maps method names to method_parameters structs */
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800186 grpc_slice_hash_table *method_params_table;
Mark D. Roth046cf762016-09-26 11:13:51 -0700187 /** incoming resolver result - set by resolver.next() */
Mark D. Rothaf842452016-10-21 15:05:15 -0700188 grpc_channel_args *resolver_result;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700189 /** a list of closures that are all waiting for resolver result to come in */
190 grpc_closure_list waiting_for_resolver_result_closures;
Craig Tiller3f475422015-06-25 10:43:05 -0700191 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -0700192 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -0700193 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -0700194 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700195 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700196 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800197 /** owning stack */
198 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -0800199 /** interested parties (owned) */
200 grpc_pollset_set *interested_parties;
Craig Tiller613dafa2017-02-09 12:00:43 -0800201
Alexander Polcync3b1f182017-04-18 13:51:36 -0700202 /* external_connectivity_watcher_list head is guarded by its own mutex, since
203 * counts need to be grabbed immediately without polling on a cq */
204 gpr_mu external_connectivity_watcher_list_mu;
205 struct external_connectivity_watcher *external_connectivity_watcher_list_head;
206
Craig Tiller613dafa2017-02-09 12:00:43 -0800207 /* the following properties are guarded by a mutex since API's require them
Craig Tiller46dd7902017-02-23 09:42:16 -0800208 to be instantaneously available */
Craig Tiller613dafa2017-02-09 12:00:43 -0800209 gpr_mu info_mu;
210 char *info_lb_policy_name;
211 /** service config in JSON form */
212 char *info_service_config_json;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213} channel_data;
214
Craig Tillerd6c98df2015-08-18 09:33:44 -0700215/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700216 resolver, to watch for state changes from the lb_policy. When a state
217 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -0700218typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700219 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700220 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700221 grpc_connectivity_state state;
222 grpc_lb_policy *lb_policy;
223} lb_policy_connectivity_watcher;
224
Craig Tiller2400bf52017-02-09 16:25:19 -0800225static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
226 grpc_lb_policy *lb_policy,
227 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700228
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800229static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
230 channel_data *chand,
231 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700232 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800233 const char *reason) {
David Garcia Quintas37251282017-04-14 13:46:03 -0700234 /* TODO: Improve failure handling:
235 * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
236 * - Hand over pending picks from old policies during the switch that happens
237 * when resolver provides an update. */
David Garcia Quintas956f7002017-04-13 15:40:06 -0700238 if (chand->lb_policy != NULL) {
239 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
240 /* cancel picks with wait_for_ready=false */
241 grpc_lb_policy_cancel_picks_locked(
242 exec_ctx, chand->lb_policy,
243 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
244 /* check= */ 0, GRPC_ERROR_REF(error));
245 } else if (state == GRPC_CHANNEL_SHUTDOWN) {
246 /* cancel all picks */
247 grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
248 /* mask= */ 0, /* check= */ 0,
249 GRPC_ERROR_REF(error));
250 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800251 }
Craig Tiller6014e8a2017-10-16 13:50:29 -0700252 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700253 gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
254 grpc_connectivity_state_name(state));
255 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700256 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
257 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800258}
259
Craig Tiller804ff712016-05-05 16:25:40 -0700260static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
Craig Tillerbefafe62017-02-09 11:30:54 -0800261 void *arg, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700262 lb_policy_connectivity_watcher *w = (lb_policy_connectivity_watcher *)arg;
Craig Tillercb2609f2015-11-24 17:19:19 -0800263 grpc_connectivity_state publish_state = w->state;
Craig Tillerc5de8352017-02-09 14:08:05 -0800264 /* check if the notification is for the latest policy */
265 if (w->lb_policy == w->chand->lb_policy) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700266 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700267 gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
268 w->lb_policy, grpc_connectivity_state_name(w->state));
269 }
Craig Tillerc5de8352017-02-09 14:08:05 -0800270 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
271 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller972470b2017-02-09 15:05:36 -0800272 grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
Craig Tillerc5de8352017-02-09 14:08:05 -0800273 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
274 w->chand->lb_policy = NULL;
275 }
276 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
277 GRPC_ERROR_REF(error), "lb_changed");
278 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800279 watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
Craig Tillerc5de8352017-02-09 14:08:05 -0800280 }
Craig Tillera82950e2015-09-22 12:33:20 -0700281 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800282 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700283 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700284}
285
Craig Tiller2400bf52017-02-09 16:25:19 -0800286static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
287 grpc_lb_policy *lb_policy,
288 grpc_connectivity_state current_state) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700289 lb_policy_connectivity_watcher *w =
290 (lb_policy_connectivity_watcher *)gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800291 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700292 w->chand = chand;
ncteisen274bbbe2017-06-08 14:57:11 -0700293 GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -0700294 grpc_combiner_scheduler(chand->combiner));
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700295 w->state = current_state;
296 w->lb_policy = lb_policy;
Craig Tiller2400bf52017-02-09 16:25:19 -0800297 grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
298 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700299}
300
Mark D. Roth60751fe2017-07-07 12:50:33 -0700301static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
302 channel_data *chand) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700303 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700304 gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
305 }
306 GPR_ASSERT(!chand->started_resolving);
307 chand->started_resolving = true;
308 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
309 grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
310 &chand->on_resolver_result_changed);
311}
312
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800313typedef struct {
314 char *server_name;
315 grpc_server_retry_throttle_data *retry_throttle_data;
316} service_config_parsing_state;
317
318static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700319 service_config_parsing_state *parsing_state =
320 (service_config_parsing_state *)arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800321 if (strcmp(field->key, "retryThrottling") == 0) {
322 if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
323 if (field->type != GRPC_JSON_OBJECT) return;
324 int max_milli_tokens = 0;
325 int milli_token_ratio = 0;
326 for (grpc_json *sub_field = field->child; sub_field != NULL;
327 sub_field = sub_field->next) {
Mark D. Rothb3322562017-02-23 14:38:02 -0800328 if (sub_field->key == NULL) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800329 if (strcmp(sub_field->key, "maxTokens") == 0) {
330 if (max_milli_tokens != 0) return; // Duplicate.
331 if (sub_field->type != GRPC_JSON_NUMBER) return;
332 max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
333 if (max_milli_tokens == -1) return;
334 max_milli_tokens *= 1000;
335 } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
336 if (milli_token_ratio != 0) return; // Duplicate.
337 if (sub_field->type != GRPC_JSON_NUMBER) return;
338 // We support up to 3 decimal digits.
339 size_t whole_len = strlen(sub_field->value);
340 uint32_t multiplier = 1;
341 uint32_t decimal_value = 0;
342 const char *decimal_point = strchr(sub_field->value, '.');
343 if (decimal_point != NULL) {
344 whole_len = (size_t)(decimal_point - sub_field->value);
345 multiplier = 1000;
346 size_t decimal_len = strlen(decimal_point + 1);
347 if (decimal_len > 3) decimal_len = 3;
348 if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
349 &decimal_value)) {
350 return;
351 }
352 uint32_t decimal_multiplier = 1;
353 for (size_t i = 0; i < (3 - decimal_len); ++i) {
354 decimal_multiplier *= 10;
355 }
356 decimal_value *= decimal_multiplier;
357 }
358 uint32_t whole_value;
359 if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
360 &whole_value)) {
361 return;
362 }
363 milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
Mark D. Rothb3322562017-02-23 14:38:02 -0800364 if (milli_token_ratio <= 0) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800365 }
366 }
367 parsing_state->retry_throttle_data =
368 grpc_retry_throttle_map_get_data_for_server(
369 parsing_state->server_name, max_milli_tokens, milli_token_ratio);
370 }
371}
372
Craig Tillerbefafe62017-02-09 11:30:54 -0800373static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
374 void *arg, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700375 channel_data *chand = (channel_data *)arg;
Craig Tiller6014e8a2017-10-16 13:50:29 -0700376 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700377 gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
378 grpc_error_string(error));
379 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700380 // Extract the following fields from the resolver result, if non-NULL.
Mark D. Roth15494b52017-07-12 15:26:55 -0700381 bool lb_policy_updated = false;
Yash Tibrewal9eb86722017-09-17 23:43:30 -0700382 char *lb_policy_name_dup = NULL;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700383 bool lb_policy_name_changed = false;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700384 grpc_lb_policy *new_lb_policy = NULL;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800385 char *service_config_json = NULL;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700386 grpc_server_retry_throttle_data *retry_throttle_data = NULL;
387 grpc_slice_hash_table *method_params_table = NULL;
Mark D. Roth046cf762016-09-26 11:13:51 -0700388 if (chand->resolver_result != NULL) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700389 // Find LB policy name.
Yash Tibrewal9eb86722017-09-17 23:43:30 -0700390 const char *lb_policy_name = NULL;
Mark D. Rothaf842452016-10-21 15:05:15 -0700391 const grpc_arg *channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700392 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
Mark D. Rothaf842452016-10-21 15:05:15 -0700393 if (channel_arg != NULL) {
394 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
395 lb_policy_name = channel_arg->value.string;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700396 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700397 // Special case: If at least one balancer address is present, we use
398 // the grpclb policy, regardless of what the resolver actually specified.
Mark D. Rothaf842452016-10-21 15:05:15 -0700399 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700400 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
David Garcia Quintas143cb772017-03-31 13:39:27 -0700401 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700402 grpc_lb_addresses *addresses =
403 (grpc_lb_addresses *)channel_arg->value.pointer.p;
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700404 bool found_balancer_address = false;
Mark D. Rothaf842452016-10-21 15:05:15 -0700405 for (size_t i = 0; i < addresses->num_addresses; ++i) {
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700406 if (addresses->addresses[i].is_balancer) {
407 found_balancer_address = true;
Mark D. Rothaf842452016-10-21 15:05:15 -0700408 break;
409 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700410 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700411 if (found_balancer_address) {
Mark D. Rothaf842452016-10-21 15:05:15 -0700412 if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
413 gpr_log(GPR_INFO,
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700414 "resolver requested LB policy %s but provided at least one "
415 "balancer address -- forcing use of grpclb LB policy",
Mark D. Roth5f40e5d2016-10-24 13:09:05 -0700416 lb_policy_name);
Mark D. Rothaf842452016-10-21 15:05:15 -0700417 }
418 lb_policy_name = "grpclb";
Mark D. Roth88405f72016-10-03 08:24:52 -0700419 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700420 }
421 // Use pick_first if nothing was specified and we didn't select grpclb
422 // above.
423 if (lb_policy_name == NULL) lb_policy_name = "pick_first";
Mark D. Roth41124992016-11-03 11:22:20 -0700424 grpc_lb_policy_args lb_policy_args;
425 lb_policy_args.args = chand->resolver_result;
426 lb_policy_args.client_channel_factory = chand->client_channel_factory;
Craig Tiller2400bf52017-02-09 16:25:19 -0800427 lb_policy_args.combiner = chand->combiner;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700428 // Check to see if we're already using the right LB policy.
429 // Note: It's safe to use chand->info_lb_policy_name here without
430 // taking a lock on chand->info_mu, because this function is the
431 // only thing that modifies its value, and it can only be invoked
432 // once at any given time.
Mark D. Roth60751fe2017-07-07 12:50:33 -0700433 lb_policy_name_changed =
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700434 chand->info_lb_policy_name == NULL ||
435 strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700436 if (chand->lb_policy != NULL && !lb_policy_name_changed) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700437 // Continue using the same LB policy. Update with new addresses.
Mark D. Roth15494b52017-07-12 15:26:55 -0700438 lb_policy_updated = true;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700439 grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
440 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700441 // Instantiate new LB policy.
442 new_lb_policy =
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700443 grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700444 if (new_lb_policy == NULL) {
445 gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700446 }
Craig Tiller45724b32015-09-22 10:42:19 -0700447 }
Mark D. Roth41124992016-11-03 11:22:20 -0700448 // Find service config.
Mark D. Rothaf842452016-10-21 15:05:15 -0700449 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700450 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
Mark D. Roth046cf762016-09-26 11:13:51 -0700451 if (channel_arg != NULL) {
Mark D. Roth9ec28af2016-11-03 12:32:39 -0700452 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800453 service_config_json = gpr_strdup(channel_arg->value.string);
Mark D. Roth70a1abd2016-11-04 09:26:37 -0700454 grpc_service_config *service_config =
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800455 grpc_service_config_create(service_config_json);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700456 if (service_config != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800457 channel_arg =
458 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
459 GPR_ASSERT(channel_arg != NULL);
460 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700461 grpc_uri *uri =
462 grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800463 GPR_ASSERT(uri->path[0] != '\0');
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700464 service_config_parsing_state parsing_state;
465 memset(&parsing_state, 0, sizeof(parsing_state));
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800466 parsing_state.server_name =
467 uri->path[0] == '/' ? uri->path + 1 : uri->path;
468 grpc_service_config_parse_global_params(
469 service_config, parse_retry_throttle_params, &parsing_state);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800470 grpc_uri_destroy(uri);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700471 retry_throttle_data = parsing_state.retry_throttle_data;
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700472 method_params_table = grpc_service_config_create_method_config_table(
Craig Tillerb28c7e82016-11-18 10:29:04 -0800473 exec_ctx, service_config, method_parameters_create_from_json,
Mark D. Rothe3006702017-04-19 07:43:56 -0700474 method_parameters_free);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700475 grpc_service_config_destroy(service_config);
476 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700477 }
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700478 // Before we clean up, save a copy of lb_policy_name, since it might
479 // be pointing to data inside chand->resolver_result.
480 // The copy will be saved in chand->lb_policy_name below.
Yash Tibrewal9eb86722017-09-17 23:43:30 -0700481 lb_policy_name_dup = gpr_strdup(lb_policy_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800482 grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
Mark D. Roth046cf762016-09-26 11:13:51 -0700483 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700484 }
Craig Tiller6014e8a2017-10-16 13:50:29 -0700485 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700486 gpr_log(GPR_DEBUG,
487 "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
488 "service_config=\"%s\"",
Yash Tibrewal9eb86722017-09-17 23:43:30 -0700489 chand, lb_policy_name_dup,
490 lb_policy_name_changed ? " (changed)" : "", service_config_json);
Mark D. Roth60751fe2017-07-07 12:50:33 -0700491 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700492 // Now swap out fields in chand. Note that the new values may still
493 // be NULL if (e.g.) the resolver failed to return results or the
494 // results did not contain the necessary data.
495 //
496 // First, swap out the data used by cc_get_channel_info().
Craig Tiller613dafa2017-02-09 12:00:43 -0800497 gpr_mu_lock(&chand->info_mu);
Yash Tibrewal9eb86722017-09-17 23:43:30 -0700498 if (lb_policy_name_dup != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800499 gpr_free(chand->info_lb_policy_name);
Yash Tibrewal9eb86722017-09-17 23:43:30 -0700500 chand->info_lb_policy_name = lb_policy_name_dup;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700501 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800502 if (service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800503 gpr_free(chand->info_service_config_json);
504 chand->info_service_config_json = service_config_json;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800505 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800506 gpr_mu_unlock(&chand->info_mu);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700507 // Swap out the retry throttle data.
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800508 if (chand->retry_throttle_data != NULL) {
509 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
510 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700511 chand->retry_throttle_data = retry_throttle_data;
512 // Swap out the method params table.
Mark D. Roth9d480942016-10-19 14:18:05 -0700513 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800514 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth046cf762016-09-26 11:13:51 -0700515 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700516 chand->method_params_table = method_params_table;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700517 // If we have a new LB policy or are shutting down (in which case
518 // new_lb_policy will be NULL), swap out the LB policy, unreffing the
519 // old one and removing its fds from chand->interested_parties.
520 // Note that we do NOT do this if either (a) we updated the existing
521 // LB policy above or (b) we failed to create the new LB policy (in
522 // which case we want to continue using the most recent one we had).
523 if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
524 chand->resolver == NULL) {
525 if (chand->lb_policy != NULL) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700526 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700527 gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
528 chand->lb_policy);
529 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700530 grpc_pollset_set_del_pollset_set(exec_ctx,
531 chand->lb_policy->interested_parties,
532 chand->interested_parties);
533 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
Craig Tiller45724b32015-09-22 10:42:19 -0700534 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700535 chand->lb_policy = new_lb_policy;
536 }
537 // Now that we've swapped out the relevant fields of chand, check for
538 // error or shutdown.
539 if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700540 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700541 gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
542 }
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800543 if (chand->resolver != NULL) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700544 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700545 gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
546 }
Craig Tiller972470b2017-02-09 15:05:36 -0800547 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800548 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
549 chand->resolver = NULL;
550 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800551 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700552 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700553 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700554 "Got resolver result after disconnection", &error, 1),
Craig Tiller804ff712016-05-05 16:25:40 -0700555 "resolver_gone");
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700556 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
557 grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
558 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
559 "Channel disconnected", &error, 1));
560 GRPC_CLOSURE_LIST_SCHED(exec_ctx,
561 &chand->waiting_for_resolver_result_closures);
562 } else { // Not shutting down.
563 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
564 grpc_error *state_error =
565 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
566 if (new_lb_policy != NULL) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700567 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700568 gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
569 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700570 GRPC_ERROR_UNREF(state_error);
571 state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
572 &state_error);
573 grpc_pollset_set_add_pollset_set(exec_ctx,
574 new_lb_policy->interested_parties,
575 chand->interested_parties);
576 GRPC_CLOSURE_LIST_SCHED(exec_ctx,
577 &chand->waiting_for_resolver_result_closures);
578 if (chand->exit_idle_when_lb_policy_arrives) {
579 grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
580 chand->exit_idle_when_lb_policy_arrives = false;
581 }
582 watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
583 }
Mark D. Roth15494b52017-07-12 15:26:55 -0700584 if (!lb_policy_updated) {
585 set_channel_connectivity_state_locked(exec_ctx, chand, state,
586 GRPC_ERROR_REF(state_error),
587 "new_lb+resolver");
588 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700589 grpc_resolver_next_locked(exec_ctx, chand->resolver,
590 &chand->resolver_result,
591 &chand->on_resolver_result_changed);
592 GRPC_ERROR_UNREF(state_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700593 }
Craig Tiller3f475422015-06-25 10:43:05 -0700594}
595
Craig Tillera8610c02017-02-14 10:05:11 -0800596static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
597 grpc_error *error_ignored) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700598 grpc_transport_op *op = (grpc_transport_op *)arg;
599 grpc_channel_element *elem =
600 (grpc_channel_element *)op->handler_private.extra_arg;
601 channel_data *chand = (channel_data *)elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700602
Craig Tillera82950e2015-09-22 12:33:20 -0700603 if (op->on_connectivity_state_change != NULL) {
604 grpc_connectivity_state_notify_on_state_change(
605 exec_ctx, &chand->state_tracker, op->connectivity_state,
606 op->on_connectivity_state_change);
607 op->on_connectivity_state_change = NULL;
608 op->connectivity_state = NULL;
609 }
610
Craig Tiller26dab312015-12-07 14:43:47 -0800611 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800612 if (chand->lb_policy == NULL) {
ncteisen274bbbe2017-06-08 14:57:11 -0700613 GRPC_CLOSURE_SCHED(
ncteisen4b36a3d2017-03-13 19:08:06 -0700614 exec_ctx, op->send_ping,
615 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
Craig Tiller26dab312015-12-07 14:43:47 -0800616 } else {
Craig Tiller2400bf52017-02-09 16:25:19 -0800617 grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800618 op->bind_pollset = NULL;
619 }
620 op->send_ping = NULL;
621 }
622
Craig Tiller1c51edc2016-05-07 16:18:43 -0700623 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
624 if (chand->resolver != NULL) {
625 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700626 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700627 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
Craig Tiller972470b2017-02-09 15:05:36 -0800628 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700629 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
630 chand->resolver = NULL;
631 if (!chand->started_resolving) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700632 grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700633 GRPC_ERROR_REF(op->disconnect_with_error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700634 GRPC_CLOSURE_LIST_SCHED(exec_ctx,
635 &chand->waiting_for_resolver_result_closures);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700636 }
637 if (chand->lb_policy != NULL) {
638 grpc_pollset_set_del_pollset_set(exec_ctx,
639 chand->lb_policy->interested_parties,
640 chand->interested_parties);
641 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
642 chand->lb_policy = NULL;
643 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700644 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700645 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700646 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800647 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
648
ncteisen274bbbe2017-06-08 14:57:11 -0700649 GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
Craig Tillerbefafe62017-02-09 11:30:54 -0800650}
651
652static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
653 grpc_channel_element *elem,
654 grpc_transport_op *op) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700655 channel_data *chand = (channel_data *)elem->channel_data;
Craig Tillerbefafe62017-02-09 11:30:54 -0800656
Craig Tillerbefafe62017-02-09 11:30:54 -0800657 GPR_ASSERT(op->set_accept_stream == false);
658 if (op->bind_pollset != NULL) {
659 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
660 op->bind_pollset);
661 }
662
Craig Tillerc55c1022017-03-10 10:26:42 -0800663 op->handler_private.extra_arg = elem;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800664 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
ncteisen274bbbe2017-06-08 14:57:11 -0700665 GRPC_CLOSURE_SCHED(
Craig Tillerc55c1022017-03-10 10:26:42 -0800666 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -0700667 GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
Craig Tilleree4b1452017-05-12 10:56:03 -0700668 op, grpc_combiner_scheduler(chand->combiner)),
Craig Tillerbefafe62017-02-09 11:30:54 -0800669 GRPC_ERROR_NONE);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700670}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800671
Mark D. Rothb2d24882016-10-27 15:44:07 -0700672static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
673 grpc_channel_element *elem,
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700674 const grpc_channel_info *info) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700675 channel_data *chand = (channel_data *)elem->channel_data;
Craig Tiller613dafa2017-02-09 12:00:43 -0800676 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700677 if (info->lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800678 *info->lb_policy_name = chand->info_lb_policy_name == NULL
Mark D. Roth78afd772016-11-04 12:49:49 -0700679 ? NULL
Craig Tiller613dafa2017-02-09 12:00:43 -0800680 : gpr_strdup(chand->info_lb_policy_name);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700681 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800682 if (info->service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800683 *info->service_config_json =
684 chand->info_service_config_json == NULL
685 ? NULL
686 : gpr_strdup(chand->info_service_config_json);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800687 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800688 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700689}
690
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700691/* Constructor for channel_data */
Mark D. Rothc1087882016-11-18 10:54:45 -0800692static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800693 grpc_channel_element *elem,
694 grpc_channel_element_args *args) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700695 channel_data *chand = (channel_data *)elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700696 GPR_ASSERT(args->is_last);
697 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800698 // Initialize data members.
Craig Tilleree4b1452017-05-12 10:56:03 -0700699 chand->combiner = grpc_combiner_create();
Craig Tillerd85477512017-02-09 12:02:39 -0800700 gpr_mu_init(&chand->info_mu);
Alexander Polcync3b1f182017-04-18 13:51:36 -0700701 gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
702
703 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
704 chand->external_connectivity_watcher_list_head = NULL;
705 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
706
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800707 chand->owning_stack = args->channel_stack;
ncteisen274bbbe2017-06-08 14:57:11 -0700708 GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
Craig Tillerbefafe62017-02-09 11:30:54 -0800709 on_resolver_result_changed_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -0700710 grpc_combiner_scheduler(chand->combiner));
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800711 chand->interested_parties = grpc_pollset_set_create();
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700712 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
713 "client_channel");
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800714 // Record client channel factory.
715 const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
716 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700717 if (arg == NULL) {
718 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
719 "Missing client channel factory in args for client channel filter");
720 }
721 if (arg->type != GRPC_ARG_POINTER) {
722 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
723 "client channel factory arg must be a pointer");
724 }
Yash Tibrewalbc130da2017-09-12 22:44:08 -0700725 grpc_client_channel_factory_ref(
726 (grpc_client_channel_factory *)arg->value.pointer.p);
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700727 chand->client_channel_factory =
728 (grpc_client_channel_factory *)arg->value.pointer.p;
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800729 // Get server name to resolve, using proxy mapper if needed.
Mark D. Roth86e90592016-11-18 09:56:40 -0800730 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700731 if (arg == NULL) {
732 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
733 "Missing server uri in args for client channel filter");
734 }
735 if (arg->type != GRPC_ARG_STRING) {
736 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
737 "server uri arg must be a string");
738 }
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800739 char *proxy_name = NULL;
740 grpc_channel_args *new_args = NULL;
741 grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
742 &proxy_name, &new_args);
743 // Instantiate resolver.
Mark D. Roth45ccec52017-01-18 14:04:01 -0800744 chand->resolver = grpc_resolver_create(
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800745 exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
746 new_args != NULL ? new_args : args->channel_args,
Craig Tiller972470b2017-02-09 15:05:36 -0800747 chand->interested_parties, chand->combiner);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800748 if (proxy_name != NULL) gpr_free(proxy_name);
749 if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800750 if (chand->resolver == NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700751 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800752 }
Craig Tiller3be7dd02017-04-03 14:30:03 -0700753 chand->deadline_checking_enabled =
754 grpc_deadline_checking_enabled(args->channel_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800755 return GRPC_ERROR_NONE;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700756}
757
Craig Tiller972470b2017-02-09 15:05:36 -0800758static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
759 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700760 grpc_resolver *resolver = (grpc_resolver *)arg;
Craig Tiller972470b2017-02-09 15:05:36 -0800761 grpc_resolver_shutdown_locked(exec_ctx, resolver);
762 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
763}
764
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700765/* Destructor for channel_data */
766static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
767 grpc_channel_element *elem) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700768 channel_data *chand = (channel_data *)elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700769 if (chand->resolver != NULL) {
ncteisen274bbbe2017-06-08 14:57:11 -0700770 GRPC_CLOSURE_SCHED(
771 exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
Craig Tilleree4b1452017-05-12 10:56:03 -0700772 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller972470b2017-02-09 15:05:36 -0800773 GRPC_ERROR_NONE);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700774 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700775 if (chand->client_channel_factory != NULL) {
776 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
777 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700778 if (chand->lb_policy != NULL) {
779 grpc_pollset_set_del_pollset_set(exec_ctx,
780 chand->lb_policy->interested_parties,
781 chand->interested_parties);
782 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
783 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800784 gpr_free(chand->info_lb_policy_name);
785 gpr_free(chand->info_service_config_json);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800786 if (chand->retry_throttle_data != NULL) {
787 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
788 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700789 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800790 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700791 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700792 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
Craig Tiller9e5ac1b2017-02-14 22:25:50 -0800793 grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
Craig Tillerf1021672017-02-09 21:29:50 -0800794 GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
Craig Tillerd85477512017-02-09 12:02:39 -0800795 gpr_mu_destroy(&chand->info_mu);
Alexander Polcync3b1f182017-04-18 13:51:36 -0700796 gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700797}
798
799/*************************************************************************
800 * PER-CALL FUNCTIONS
801 */
802
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700803// Max number of batches that can be pending on a call at any given
804// time. This includes:
805// recv_initial_metadata
806// send_initial_metadata
807// recv_message
808// send_message
809// recv_trailing_metadata
810// send_trailing_metadata
Mark D. Roth76e264b2017-08-25 09:03:33 -0700811// We also add room for a single cancel_stream batch.
812#define MAX_WAITING_BATCHES 7
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700813
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700814/** Call data. Holds a pointer to grpc_subchannel_call and the
815 associated machinery to create such a pointer.
816 Handles queueing of stream ops until a call object is ready, waiting
817 for initial metadata before trying to create a call object,
818 and handling cancellation gracefully. */
819typedef struct client_channel_call_data {
Mark D. Roth72f6da82016-09-02 13:42:38 -0700820 // State for handling deadlines.
821 // The code in deadline_filter.c requires this to be the first field.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700822 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700823 // and this struct both independently store pointers to the call stack
824 // and call combiner. If/when we have time, find a way to avoid this
825 // without breaking the grpc_deadline_state abstraction.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700826 grpc_deadline_state deadline_state;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700827
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800828 grpc_slice path; // Request path.
Mark D. Rothe40dd292016-10-05 14:58:37 -0700829 gpr_timespec call_start_time;
Craig Tiller89c14282017-07-19 15:32:27 -0700830 grpc_millis deadline;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700831 gpr_arena *arena;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700832 grpc_call_stack *owning_call;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700833 grpc_call_combiner *call_combiner;
834
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700835 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800836 method_parameters *method_params;
Mark D. Rothaa850a72016-09-26 13:38:02 -0700837
Mark D. Roth76e264b2017-08-25 09:03:33 -0700838 grpc_subchannel_call *subchannel_call;
839 grpc_error *error;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700840
Mark D. Roth60751fe2017-07-07 12:50:33 -0700841 grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
842 grpc_closure lb_pick_closure;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700843 grpc_closure lb_pick_cancel_closure;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700844
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700845 grpc_connected_subchannel *connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700846 grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700847 grpc_polling_entity *pollent;
848
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700849 grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
850 size_t waiting_for_pick_batches_count;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700851 grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES];
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700852
Mark D. Roth76e264b2017-08-25 09:03:33 -0700853 grpc_transport_stream_op_batch *initial_metadata_batch;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200854
855 grpc_linked_mdelem lb_token_mdelem;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800856
857 grpc_closure on_complete;
858 grpc_closure *original_on_complete;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700859} call_data;
860
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800861grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
Mark D. Roth76e264b2017-08-25 09:03:33 -0700862 grpc_call_element *elem) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700863 call_data *calld = (call_data *)elem->call_data;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700864 return calld->subchannel_call;
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800865}
866
Mark D. Roth76e264b2017-08-25 09:03:33 -0700867// This is called via the call combiner, so access to calld is synchronized.
868static void waiting_for_pick_batches_add(
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700869 call_data *calld, grpc_transport_stream_op_batch *batch) {
Mark D. Roth76e264b2017-08-25 09:03:33 -0700870 if (batch->send_initial_metadata) {
871 GPR_ASSERT(calld->initial_metadata_batch == NULL);
872 calld->initial_metadata_batch = batch;
873 } else {
874 GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
875 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
876 batch;
877 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700878}
879
Mark D. Roth76e264b2017-08-25 09:03:33 -0700880// This is called via the call combiner, so access to calld is synchronized.
881static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
882 void *arg, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700883 call_data *calld = (call_data *)arg;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700884 if (calld->waiting_for_pick_batches_count > 0) {
885 --calld->waiting_for_pick_batches_count;
886 grpc_transport_stream_op_batch_finish_with_failure(
887 exec_ctx,
888 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count],
889 GRPC_ERROR_REF(error), calld->call_combiner);
890 }
891}
892
893// This is called via the call combiner, so access to calld is synchronized.
894static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
895 grpc_call_element *elem,
896 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700897 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -0700898 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700899 gpr_log(GPR_DEBUG,
900 "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
901 elem->channel_data, calld, calld->waiting_for_pick_batches_count,
902 grpc_error_string(error));
903 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700904 for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
Mark D. Roth76e264b2017-08-25 09:03:33 -0700905 GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
906 fail_pending_batch_in_call_combiner, calld,
907 grpc_schedule_on_exec_ctx);
908 GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
909 &calld->handle_pending_batch_in_call_combiner[i],
910 GRPC_ERROR_REF(error),
911 "waiting_for_pick_batches_fail");
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700912 }
Mark D. Roth76e264b2017-08-25 09:03:33 -0700913 if (calld->initial_metadata_batch != NULL) {
914 grpc_transport_stream_op_batch_finish_with_failure(
915 exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error),
916 calld->call_combiner);
917 } else {
918 GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
919 "waiting_for_pick_batches_fail");
920 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700921 GRPC_ERROR_UNREF(error);
922}
923
Mark D. Roth76e264b2017-08-25 09:03:33 -0700924// This is called via the call combiner, so access to calld is synchronized.
925static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
926 void *arg, grpc_error *ignored) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700927 call_data *calld = (call_data *)arg;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700928 if (calld->waiting_for_pick_batches_count > 0) {
929 --calld->waiting_for_pick_batches_count;
930 grpc_subchannel_call_process_op(
931 exec_ctx, calld->subchannel_call,
932 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]);
Craig Tiller57726ca2016-09-12 11:59:45 -0700933 }
Mark D. Roth76e264b2017-08-25 09:03:33 -0700934}
935
936// This is called via the call combiner, so access to calld is synchronized.
937static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
938 grpc_call_element *elem) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700939 channel_data *chand = (channel_data *)elem->channel_data;
940 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -0700941 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700942 gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
943 " pending batches to subchannel_call=%p",
Mark D. Roth76e264b2017-08-25 09:03:33 -0700944 chand, calld, calld->waiting_for_pick_batches_count,
945 calld->subchannel_call);
Mark D. Roth60751fe2017-07-07 12:50:33 -0700946 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700947 for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
Mark D. Roth76e264b2017-08-25 09:03:33 -0700948 GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
949 run_pending_batch_in_call_combiner, calld,
950 grpc_schedule_on_exec_ctx);
951 GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
952 &calld->handle_pending_batch_in_call_combiner[i],
953 GRPC_ERROR_NONE,
954 "waiting_for_pick_batches_resume");
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700955 }
Mark D. Roth76e264b2017-08-25 09:03:33 -0700956 GPR_ASSERT(calld->initial_metadata_batch != NULL);
957 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
958 calld->initial_metadata_batch);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700959}
960
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700961// Applies service config to the call. Must be invoked once we know
962// that the resolver has returned results to the channel.
963static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
964 grpc_call_element *elem) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700965 channel_data *chand = (channel_data *)elem->channel_data;
966 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -0700967 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -0700968 gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
969 chand, calld);
970 }
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700971 if (chand->retry_throttle_data != NULL) {
972 calld->retry_throttle_data =
973 grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
974 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700975 if (chand->method_params_table != NULL) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700976 calld->method_params = (method_parameters *)grpc_method_config_table_get(
Craig Tiller11c17d42017-03-13 13:36:34 -0700977 exec_ctx, chand->method_params_table, calld->path);
978 if (calld->method_params != NULL) {
979 method_parameters_ref(calld->method_params);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700980 // If the deadline from the service config is shorter than the one
981 // from the client API, reset the deadline timer.
982 if (chand->deadline_checking_enabled &&
Craig Tiller89c14282017-07-19 15:32:27 -0700983 calld->method_params->timeout != 0) {
984 const grpc_millis per_method_deadline =
Craig Tiller9a8c3f32017-07-21 13:14:14 -0700985 grpc_timespec_to_millis_round_up(calld->call_start_time) +
Craig Tiller89c14282017-07-19 15:32:27 -0700986 calld->method_params->timeout;
987 if (per_method_deadline < calld->deadline) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700988 calld->deadline = per_method_deadline;
989 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
990 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700991 }
992 }
993 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700994}
Craig Tillerea4a4f12017-03-13 13:36:52 -0700995
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700996static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
Mark D. Roth60751fe2017-07-07 12:50:33 -0700997 grpc_call_element *elem,
998 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700999 channel_data *chand = (channel_data *)elem->channel_data;
1000 call_data *calld = (call_data *)elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001001 const grpc_connected_subchannel_call_args call_args = {
Yash Tibrewald8b84a22017-09-25 13:38:03 -07001002 calld->pollent, // pollent
1003 calld->path, // path
1004 calld->call_start_time, // start_time
1005 calld->deadline, // deadline
1006 calld->arena, // arena
1007 calld->subchannel_call_context, // context
1008 calld->call_combiner // call_combiner
1009 };
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001010 grpc_error *new_error = grpc_connected_subchannel_create_call(
Mark D. Roth76e264b2017-08-25 09:03:33 -07001011 exec_ctx, calld->connected_subchannel, &call_args,
1012 &calld->subchannel_call);
Craig Tiller6014e8a2017-10-16 13:50:29 -07001013 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001014 gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
Mark D. Roth76e264b2017-08-25 09:03:33 -07001015 chand, calld, calld->subchannel_call, grpc_error_string(new_error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07001016 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001017 if (new_error != GRPC_ERROR_NONE) {
1018 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth76e264b2017-08-25 09:03:33 -07001019 waiting_for_pick_batches_fail(exec_ctx, elem, new_error);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001020 } else {
Mark D. Roth76e264b2017-08-25 09:03:33 -07001021 waiting_for_pick_batches_resume(exec_ctx, elem);
Craig Tiller11c17d42017-03-13 13:36:34 -07001022 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001023 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -07001024}
1025
Mark D. Rothb2929602017-09-11 09:31:11 -07001026// Invoked when a pick is completed, on both success or failure.
1027static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1028 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001029 call_data *calld = (call_data *)elem->call_data;
1030 channel_data *chand = (channel_data *)elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001031 if (calld->connected_subchannel == NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001032 // Failed to create subchannel.
Mark D. Roth76e264b2017-08-25 09:03:33 -07001033 GRPC_ERROR_UNREF(calld->error);
1034 calld->error = error == GRPC_ERROR_NONE
1035 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1036 "Call dropped by load balancing policy")
1037 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1038 "Failed to create subchannel", &error, 1);
Craig Tiller6014e8a2017-10-16 13:50:29 -07001039 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001040 gpr_log(GPR_DEBUG,
1041 "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
Mark D. Roth76e264b2017-08-25 09:03:33 -07001042 calld, grpc_error_string(calld->error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07001043 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001044 waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001045 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07001046 /* Create call on subchannel. */
Mark D. Roth60751fe2017-07-07 12:50:33 -07001047 create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001048 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001049 GRPC_ERROR_UNREF(error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001050}
1051
Mark D. Rothb2929602017-09-11 09:31:11 -07001052// A wrapper around pick_done_locked() that is used in cases where
1053// either (a) the pick was deferred pending a resolver result or (b) the
1054// pick was done asynchronously. Removes the call's polling entity from
1055// chand->interested_parties before invoking pick_done_locked().
1056static void async_pick_done_locked(grpc_exec_ctx *exec_ctx,
1057 grpc_call_element *elem, grpc_error *error) {
1058 channel_data *chand = (channel_data *)elem->channel_data;
1059 call_data *calld = (call_data *)elem->call_data;
1060 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
1061 chand->interested_parties);
1062 pick_done_locked(exec_ctx, elem, error);
1063}
1064
1065// Note: This runs under the client_channel combiner, but will NOT be
1066// holding the call combiner.
1067static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
1068 grpc_error *error) {
1069 grpc_call_element *elem = (grpc_call_element *)arg;
1070 channel_data *chand = (channel_data *)elem->channel_data;
1071 call_data *calld = (call_data *)elem->call_data;
1072 if (calld->lb_policy != NULL) {
Craig Tiller6014e8a2017-10-16 13:50:29 -07001073 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2929602017-09-11 09:31:11 -07001074 gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
1075 chand, calld, calld->lb_policy);
1076 }
1077 grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
1078 &calld->connected_subchannel,
1079 GRPC_ERROR_REF(error));
1080 }
1081 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_callback_cancel");
1082}
1083
1084// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
1085// Unrefs the LB policy and invokes async_pick_done_locked().
1086static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1087 grpc_error *error) {
1088 grpc_call_element *elem = (grpc_call_element *)arg;
1089 channel_data *chand = (channel_data *)elem->channel_data;
1090 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -07001091 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2929602017-09-11 09:31:11 -07001092 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
1093 chand, calld);
1094 }
1095 GPR_ASSERT(calld->lb_policy != NULL);
1096 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1097 calld->lb_policy = NULL;
1098 async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
1099}
1100
1101// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
1102// If the pick was completed synchronously, unrefs the LB policy and
1103// returns true.
1104static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
1105 grpc_call_element *elem) {
1106 channel_data *chand = (channel_data *)elem->channel_data;
1107 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -07001108 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2929602017-09-11 09:31:11 -07001109 gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
1110 chand, calld, chand->lb_policy);
1111 }
1112 apply_service_config_to_call_locked(exec_ctx, elem);
1113 // If the application explicitly set wait_for_ready, use that.
1114 // Otherwise, if the service config specified a value for this
1115 // method, use that.
1116 uint32_t initial_metadata_flags =
1117 calld->initial_metadata_batch->payload->send_initial_metadata
1118 .send_initial_metadata_flags;
1119 const bool wait_for_ready_set_from_api =
1120 initial_metadata_flags &
1121 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1122 const bool wait_for_ready_set_from_service_config =
1123 calld->method_params != NULL &&
1124 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
1125 if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) {
1126 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
1127 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1128 } else {
1129 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1130 }
1131 }
1132 const grpc_lb_policy_pick_args inputs = {
1133 calld->initial_metadata_batch->payload->send_initial_metadata
1134 .send_initial_metadata,
1135 initial_metadata_flags, &calld->lb_token_mdelem};
1136 // Keep a ref to the LB policy in calld while the pick is pending.
1137 GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
1138 calld->lb_policy = chand->lb_policy;
1139 GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
1140 grpc_combiner_scheduler(chand->combiner));
1141 const bool pick_done = grpc_lb_policy_pick_locked(
1142 exec_ctx, chand->lb_policy, &inputs, &calld->connected_subchannel,
1143 calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
1144 if (pick_done) {
1145 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
Craig Tiller6014e8a2017-10-16 13:50:29 -07001146 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2929602017-09-11 09:31:11 -07001147 gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
1148 chand, calld);
1149 }
1150 GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
1151 calld->lb_policy = NULL;
1152 } else {
1153 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
1154 grpc_call_combiner_set_notify_on_cancel(
1155 exec_ctx, calld->call_combiner,
1156 GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure,
1157 pick_callback_cancel_locked, elem,
1158 grpc_combiner_scheduler(chand->combiner)));
1159 }
1160 return pick_done;
1161}
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001162
Craig Tiller577c9b22015-11-02 14:11:15 -08001163typedef struct {
Craig Tiller577c9b22015-11-02 14:11:15 -08001164 grpc_call_element *elem;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001165 bool finished;
Craig Tiller577c9b22015-11-02 14:11:15 -08001166 grpc_closure closure;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001167 grpc_closure cancel_closure;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001168} pick_after_resolver_result_args;
Craig Tiller577c9b22015-11-02 14:11:15 -08001169
Mark D. Roth76e264b2017-08-25 09:03:33 -07001170// Note: This runs under the client_channel combiner, but will NOT be
1171// holding the call combiner.
1172static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
1173 void *arg,
1174 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001175 pick_after_resolver_result_args *args =
1176 (pick_after_resolver_result_args *)arg;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001177 if (args->finished) {
1178 gpr_free(args);
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001179 return;
Mark D. Roth764cf042017-09-01 09:00:06 -07001180 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001181 // If we don't yet have a resolver result, then a closure for
1182 // pick_after_resolver_result_done_locked() will have been added to
1183 // chand->waiting_for_resolver_result_closures, and it may not be invoked
1184 // until after this call has been destroyed. We mark the operation as
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001185 // finished, so that when pick_after_resolver_result_done_locked()
Mark D. Roth76e264b2017-08-25 09:03:33 -07001186 // is called, it will be a no-op. We also immediately invoke
Mark D. Rothb2929602017-09-11 09:31:11 -07001187 // async_pick_done_locked() to propagate the error back to the caller.
1188 args->finished = true;
1189 grpc_call_element *elem = args->elem;
1190 channel_data *chand = (channel_data *)elem->channel_data;
1191 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -07001192 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001193 gpr_log(GPR_DEBUG,
1194 "chand=%p calld=%p: cancelling pick waiting for resolver result",
1195 chand, calld);
Mark D. Roth76e264b2017-08-25 09:03:33 -07001196 }
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001197 // Note: Although we are not in the call combiner here, we are
1198 // basically stealing the call combiner from the pending pick, so
Mark D. Rothb2929602017-09-11 09:31:11 -07001199 // it's safe to call async_pick_done_locked() here -- we are
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001200 // essentially calling it here instead of calling it in
1201 // pick_after_resolver_result_done_locked().
Mark D. Rothb2929602017-09-11 09:31:11 -07001202 async_pick_done_locked(exec_ctx, elem,
1203 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1204 "Pick cancelled", &error, 1));
Mark D. Roth76e264b2017-08-25 09:03:33 -07001205}
1206
Mark D. Roth60751fe2017-07-07 12:50:33 -07001207static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
1208 void *arg,
1209 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001210 pick_after_resolver_result_args *args =
1211 (pick_after_resolver_result_args *)arg;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001212 if (args->finished) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001213 /* cancelled, do nothing */
Craig Tiller6014e8a2017-10-16 13:50:29 -07001214 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001215 gpr_log(GPR_DEBUG, "call cancelled before resolver result");
1216 }
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001217 gpr_free(args);
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001218 return;
1219 }
1220 args->finished = true;
1221 grpc_call_element *elem = args->elem;
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001222 channel_data *chand = (channel_data *)elem->channel_data;
1223 call_data *calld = (call_data *)elem->call_data;
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001224 if (error != GRPC_ERROR_NONE) {
Craig Tiller6014e8a2017-10-16 13:50:29 -07001225 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001226 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
1227 chand, calld);
1228 }
Mark D. Rothb2929602017-09-11 09:31:11 -07001229 async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001230 } else {
Craig Tiller6014e8a2017-10-16 13:50:29 -07001231 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07001232 gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
1233 chand, calld);
1234 }
Mark D. Rothb2929602017-09-11 09:31:11 -07001235 if (pick_callback_start_locked(exec_ctx, elem)) {
1236 // Even if the LB policy returns a result synchronously, we have
1237 // already added our polling entity to chand->interested_parties
1238 // in order to wait for the resolver result, so we need to
1239 // remove it here. Therefore, we call async_pick_done_locked()
1240 // instead of pick_done_locked().
1241 async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001242 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001243 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001244}
1245
Mark D. Roth60751fe2017-07-07 12:50:33 -07001246static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
1247 grpc_call_element *elem) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001248 channel_data *chand = (channel_data *)elem->channel_data;
1249 call_data *calld = (call_data *)elem->call_data;
Craig Tiller6014e8a2017-10-16 13:50:29 -07001250 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001251 gpr_log(GPR_DEBUG,
1252 "chand=%p calld=%p: deferring pick pending resolver result", chand,
1253 calld);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001254 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001255 pick_after_resolver_result_args *args =
1256 (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
1257 args->elem = elem;
1258 GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
1259 args, grpc_combiner_scheduler(chand->combiner));
1260 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
1261 &args->closure, GRPC_ERROR_NONE);
Mark D. Roth76e264b2017-08-25 09:03:33 -07001262 grpc_call_combiner_set_notify_on_cancel(
1263 exec_ctx, calld->call_combiner,
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001264 GRPC_CLOSURE_INIT(&args->cancel_closure,
1265 pick_after_resolver_result_cancel_locked, args,
Mark D. Roth76e264b2017-08-25 09:03:33 -07001266 grpc_combiner_scheduler(chand->combiner)));
Mark D. Roth60751fe2017-07-07 12:50:33 -07001267}
1268
Mark D. Roth76e264b2017-08-25 09:03:33 -07001269static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg,
Mark D. Rothb2929602017-09-11 09:31:11 -07001270 grpc_error *ignored) {
Mark D. Roth76e264b2017-08-25 09:03:33 -07001271 grpc_call_element *elem = (grpc_call_element *)arg;
1272 call_data *calld = (call_data *)elem->call_data;
1273 channel_data *chand = (channel_data *)elem->channel_data;
1274 GPR_ASSERT(calld->connected_subchannel == NULL);
Mark D. Rothb2929602017-09-11 09:31:11 -07001275 if (chand->lb_policy != NULL) {
1276 // We already have an LB policy, so ask it for a pick.
1277 if (pick_callback_start_locked(exec_ctx, elem)) {
1278 // Pick completed synchronously.
1279 pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE);
1280 return;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001281 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001282 } else {
Mark D. Rothb2929602017-09-11 09:31:11 -07001283 // We do not yet have an LB policy, so wait for a resolver result.
1284 if (chand->resolver == NULL) {
1285 pick_done_locked(exec_ctx, elem,
1286 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
1287 return;
1288 }
1289 if (!chand->started_resolving) {
1290 start_resolving_locked(exec_ctx, chand);
1291 }
1292 pick_after_resolver_result_start_locked(exec_ctx, elem);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001293 }
Mark D. Rothb2929602017-09-11 09:31:11 -07001294 // We need to wait for either a resolver result or for an async result
1295 // from the LB policy. Add the polling entity from call_data to the
1296 // channel_data's interested_parties, so that the I/O of the LB policy
1297 // and resolver can be done under it. The polling entity will be
1298 // removed in async_pick_done_locked().
1299 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1300 chand->interested_parties);
Craig Tillera11bfc82017-02-14 09:56:33 -08001301}
1302
Mark D. Rothde144102017-03-15 10:11:03 -07001303static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001304 grpc_call_element *elem = (grpc_call_element *)arg;
1305 call_data *calld = (call_data *)elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001306 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001307 if (error == GRPC_ERROR_NONE) {
1308 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001309 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001310 } else {
1311 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001312 // decide whether or not to retry. Note that we should only
1313 // record failures whose statuses match the configured retryable
1314 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001315 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001316 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001317 }
1318 }
ncteisen274bbbe2017-06-08 14:57:11 -07001319 GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
Mark D. Roth95039b52017-02-24 07:59:45 -08001320 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001321}
1322
Craig Tillere1b51da2017-03-31 15:44:33 -07001323static void cc_start_transport_stream_op_batch(
1324 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001325 grpc_transport_stream_op_batch *batch) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001326 call_data *calld = (call_data *)elem->call_data;
1327 channel_data *chand = (channel_data *)elem->channel_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001328 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001329 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
Mark D. Roth60751fe2017-07-07 12:50:33 -07001330 batch);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001331 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001332 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
1333 // If we've previously been cancelled, immediately fail any new batches.
1334 if (calld->error != GRPC_ERROR_NONE) {
Craig Tiller6014e8a2017-10-16 13:50:29 -07001335 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth76e264b2017-08-25 09:03:33 -07001336 gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
1337 chand, calld, grpc_error_string(calld->error));
1338 }
1339 grpc_transport_stream_op_batch_finish_with_failure(
1340 exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
1341 goto done;
1342 }
1343 if (batch->cancel_stream) {
1344 // Stash a copy of cancel_error in our call data, so that we can use
1345 // it for subsequent operations. This ensures that if the call is
1346 // cancelled before any batches are passed down (e.g., if the deadline
1347 // is in the past when the call starts), we can return the right
1348 // error to the caller when the first batch does get passed down.
1349 GRPC_ERROR_UNREF(calld->error);
1350 calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
Craig Tiller6014e8a2017-10-16 13:50:29 -07001351 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth76e264b2017-08-25 09:03:33 -07001352 gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
1353 calld, grpc_error_string(calld->error));
1354 }
1355 // If we have a subchannel call, send the cancellation batch down.
1356 // Otherwise, fail all pending batches.
1357 if (calld->subchannel_call != NULL) {
1358 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
1359 } else {
1360 waiting_for_pick_batches_add(calld, batch);
1361 waiting_for_pick_batches_fail(exec_ctx, elem,
1362 GRPC_ERROR_REF(calld->error));
1363 }
1364 goto done;
1365 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001366 // Intercept on_complete for recv_trailing_metadata so that we can
1367 // check retry throttle status.
Mark D. Roth60751fe2017-07-07 12:50:33 -07001368 if (batch->recv_trailing_metadata) {
1369 GPR_ASSERT(batch->on_complete != NULL);
1370 calld->original_on_complete = batch->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001371 GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
1372 grpc_schedule_on_exec_ctx);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001373 batch->on_complete = &calld->on_complete;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001374 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001375 // Check if we've already gotten a subchannel call.
1376 // Note that once we have completed the pick, we do not need to enter
1377 // the channel combiner, which is more efficient (especially for
1378 // streaming calls).
1379 if (calld->subchannel_call != NULL) {
Craig Tiller6014e8a2017-10-16 13:50:29 -07001380 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001381 gpr_log(GPR_DEBUG,
1382 "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
Mark D. Roth76e264b2017-08-25 09:03:33 -07001383 calld, calld->subchannel_call);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001384 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001385 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001386 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001387 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001388 // We do not yet have a subchannel call.
1389 // Add the batch to the waiting-for-pick list.
1390 waiting_for_pick_batches_add(calld, batch);
1391 // For batches containing a send_initial_metadata op, enter the channel
1392 // combiner to start a pick.
1393 if (batch->send_initial_metadata) {
Craig Tiller6014e8a2017-10-16 13:50:29 -07001394 if (grpc_client_channel_trace.enabled()) {
Mark D. Rothb2929602017-09-11 09:31:11 -07001395 gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner",
1396 chand, calld);
Mark D. Roth76e264b2017-08-25 09:03:33 -07001397 }
1398 GRPC_CLOSURE_SCHED(
1399 exec_ctx,
1400 GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
1401 elem, grpc_combiner_scheduler(chand->combiner)),
1402 GRPC_ERROR_NONE);
1403 } else {
1404 // For all other batches, release the call combiner.
Craig Tiller6014e8a2017-10-16 13:50:29 -07001405 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth76e264b2017-08-25 09:03:33 -07001406 gpr_log(GPR_DEBUG,
1407 "chand=%p calld=%p: saved batch, yeilding call combiner", chand,
1408 calld);
1409 }
1410 GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
1411 "batch does not include send_initial_metadata");
Mark D. Roth60751fe2017-07-07 12:50:33 -07001412 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001413done:
Craig Tillera0f3abd2017-03-31 15:42:16 -07001414 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001415}
1416
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001417/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001418static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1419 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001420 const grpc_call_element_args *args) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001421 call_data *calld = (call_data *)elem->call_data;
1422 channel_data *chand = (channel_data *)elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001423 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001424 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001425 calld->call_start_time = args->start_time;
Craig Tiller89c14282017-07-19 15:32:27 -07001426 calld->deadline = args->deadline;
Craig Tillerd426cac2017-03-13 12:30:45 -07001427 calld->arena = args->arena;
Mark D. Roth66f3d2b2017-09-01 09:02:17 -07001428 calld->owning_call = args->call_stack;
Mark D. Roth76e264b2017-08-25 09:03:33 -07001429 calld->call_combiner = args->call_combiner;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001430 if (chand->deadline_checking_enabled) {
Mark D. Roth76e264b2017-08-25 09:03:33 -07001431 grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
1432 args->call_combiner, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001433 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001434 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001435}
1436
1437/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001438static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1439 grpc_call_element *elem,
1440 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001441 grpc_closure *then_schedule_closure) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001442 call_data *calld = (call_data *)elem->call_data;
1443 channel_data *chand = (channel_data *)elem->channel_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001444 if (chand->deadline_checking_enabled) {
1445 grpc_deadline_state_destroy(exec_ctx, elem);
1446 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001447 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001448 if (calld->method_params != NULL) {
1449 method_parameters_unref(calld->method_params);
1450 }
Mark D. Roth76e264b2017-08-25 09:03:33 -07001451 GRPC_ERROR_UNREF(calld->error);
1452 if (calld->subchannel_call != NULL) {
1453 grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001454 then_schedule_closure);
Craig Tillerd426cac2017-03-13 12:30:45 -07001455 then_schedule_closure = NULL;
Mark D. Roth76e264b2017-08-25 09:03:33 -07001456 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call,
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001457 "client_channel_destroy_call");
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001458 }
Mark D. Roth60751fe2017-07-07 12:50:33 -07001459 GPR_ASSERT(calld->lb_policy == NULL);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001460 GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001461 if (calld->connected_subchannel != NULL) {
1462 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1463 "picked");
1464 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001465 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1466 if (calld->subchannel_call_context[i].value != NULL) {
1467 calld->subchannel_call_context[i].destroy(
1468 calld->subchannel_call_context[i].value);
1469 }
1470 }
ncteisen274bbbe2017-06-08 14:57:11 -07001471 GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001472}
1473
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001474static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1475 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001476 grpc_polling_entity *pollent) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001477 call_data *calld = (call_data *)elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001478 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001479}
1480
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001481/*************************************************************************
1482 * EXPORTED SYMBOLS
1483 */
1484
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001485const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001486 cc_start_transport_stream_op_batch,
Craig Tillerf40df232016-03-25 13:38:14 -07001487 cc_start_transport_op,
1488 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001489 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001490 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001491 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001492 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001493 cc_init_channel_elem,
1494 cc_destroy_channel_elem,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001495 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001496 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001497};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001498
Craig Tiller613dafa2017-02-09 12:00:43 -08001499static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1500 grpc_error *error_ignored) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001501 channel_data *chand = (channel_data *)arg;
Craig Tiller613dafa2017-02-09 12:00:43 -08001502 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001503 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001504 } else {
1505 chand->exit_idle_when_lb_policy_arrives = true;
1506 if (!chand->started_resolving && chand->resolver != NULL) {
Mark D. Roth60751fe2017-07-07 12:50:33 -07001507 start_resolving_locked(exec_ctx, chand);
Craig Tiller613dafa2017-02-09 12:00:43 -08001508 }
1509 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001510 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001511}
1512
Craig Tillera82950e2015-09-22 12:33:20 -07001513grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1514 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001515 channel_data *chand = (channel_data *)elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001516 grpc_connectivity_state out =
1517 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001518 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001519 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
ncteisen274bbbe2017-06-08 14:57:11 -07001520 GRPC_CLOSURE_SCHED(
1521 exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -07001522 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001523 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001524 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001525 return out;
1526}
1527
Alexander Polcync3b1f182017-04-18 13:51:36 -07001528typedef struct external_connectivity_watcher {
Craig Tiller86c99582015-11-25 15:22:26 -08001529 channel_data *chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001530 grpc_polling_entity pollent;
Craig Tiller86c99582015-11-25 15:22:26 -08001531 grpc_closure *on_complete;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001532 grpc_closure *watcher_timer_init;
Craig Tiller613dafa2017-02-09 12:00:43 -08001533 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001534 grpc_closure my_closure;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001535 struct external_connectivity_watcher *next;
Craig Tiller86c99582015-11-25 15:22:26 -08001536} external_connectivity_watcher;
1537
Alexander Polcync3b1f182017-04-18 13:51:36 -07001538static external_connectivity_watcher *lookup_external_connectivity_watcher(
1539 channel_data *chand, grpc_closure *on_complete) {
1540 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1541 external_connectivity_watcher *w =
1542 chand->external_connectivity_watcher_list_head;
1543 while (w != NULL && w->on_complete != on_complete) {
1544 w = w->next;
1545 }
1546 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1547 return w;
1548}
1549
1550static void external_connectivity_watcher_list_append(
1551 channel_data *chand, external_connectivity_watcher *w) {
1552 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
1553
1554 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
1555 GPR_ASSERT(!w->next);
1556 w->next = chand->external_connectivity_watcher_list_head;
1557 chand->external_connectivity_watcher_list_head = w;
1558 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
1559}
1560
1561static void external_connectivity_watcher_list_remove(
1562 channel_data *chand, external_connectivity_watcher *too_remove) {
1563 GPR_ASSERT(
1564 lookup_external_connectivity_watcher(chand, too_remove->on_complete));
1565 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1566 if (too_remove == chand->external_connectivity_watcher_list_head) {
1567 chand->external_connectivity_watcher_list_head = too_remove->next;
1568 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1569 return;
1570 }
1571 external_connectivity_watcher *w =
1572 chand->external_connectivity_watcher_list_head;
1573 while (w != NULL) {
1574 if (w->next == too_remove) {
1575 w->next = w->next->next;
1576 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1577 return;
1578 }
1579 w = w->next;
1580 }
1581 GPR_UNREACHABLE_CODE(return );
1582}
1583
1584int grpc_client_channel_num_external_connectivity_watchers(
1585 grpc_channel_element *elem) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001586 channel_data *chand = (channel_data *)elem->channel_data;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001587 int count = 0;
1588
1589 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1590 external_connectivity_watcher *w =
1591 chand->external_connectivity_watcher_list_head;
1592 while (w != NULL) {
1593 count++;
1594 w = w->next;
1595 }
1596 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1597
1598 return count;
1599}
1600
Craig Tiller1d881fb2015-12-01 07:39:04 -08001601static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001602 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001603 external_connectivity_watcher *w = (external_connectivity_watcher *)arg;
Craig Tiller86c99582015-11-25 15:22:26 -08001604 grpc_closure *follow_up = w->on_complete;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001605 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1606 w->chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001607 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1608 "external_connectivity_watcher");
Alexander Polcync3b1f182017-04-18 13:51:36 -07001609 external_connectivity_watcher_list_remove(w->chand, w);
Craig Tiller86c99582015-11-25 15:22:26 -08001610 gpr_free(w);
ncteisen274bbbe2017-06-08 14:57:11 -07001611 GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
Craig Tiller613dafa2017-02-09 12:00:43 -08001612}
1613
Craig Tillera8610c02017-02-14 10:05:11 -08001614static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1615 grpc_error *error_ignored) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001616 external_connectivity_watcher *w = (external_connectivity_watcher *)arg;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001617 external_connectivity_watcher *found = NULL;
1618 if (w->state != NULL) {
1619 external_connectivity_watcher_list_append(w->chand, w);
ncteisen274bbbe2017-06-08 14:57:11 -07001620 GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
1621 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
Alexander Polcync3b1f182017-04-18 13:51:36 -07001622 grpc_schedule_on_exec_ctx);
1623 grpc_connectivity_state_notify_on_state_change(
1624 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
1625 } else {
1626 GPR_ASSERT(w->watcher_timer_init == NULL);
1627 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
1628 if (found) {
1629 GPR_ASSERT(found->on_complete == w->on_complete);
1630 grpc_connectivity_state_notify_on_state_change(
1631 exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
1632 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001633 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1634 w->chand->interested_parties);
Alexander Polcync3b1f182017-04-18 13:51:36 -07001635 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1636 "external_connectivity_watcher");
1637 gpr_free(w);
1638 }
Craig Tiller86c99582015-11-25 15:22:26 -08001639}
1640
Craig Tillera82950e2015-09-22 12:33:20 -07001641void grpc_client_channel_watch_connectivity_state(
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001642 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
1643 grpc_polling_entity pollent, grpc_connectivity_state *state,
1644 grpc_closure *closure, grpc_closure *watcher_timer_init) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001645 channel_data *chand = (channel_data *)elem->channel_data;
1646 external_connectivity_watcher *w =
1647 (external_connectivity_watcher *)gpr_zalloc(sizeof(*w));
Craig Tiller86c99582015-11-25 15:22:26 -08001648 w->chand = chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001649 w->pollent = pollent;
Mark D. Roth92210832017-05-02 15:04:39 -07001650 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001651 w->state = state;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001652 w->watcher_timer_init = watcher_timer_init;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001653 grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
1654 chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001655 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1656 "external_connectivity_watcher");
ncteisen274bbbe2017-06-08 14:57:11 -07001657 GRPC_CLOSURE_SCHED(
Craig Tiller613dafa2017-02-09 12:00:43 -08001658 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -07001659 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -07001660 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001661 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001662}