blob: 5f06a3467dfe2d0ce6c2ed05039984e8009f61f4 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller9eb0fde2017-03-31 16:59:30 -070034#include "src/core/ext/filters/client_channel/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080035
Mark D. Roth4c0fe492016-08-31 13:51:55 -070036#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070038#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
Mark D. Rothb2d24882016-10-27 15:44:07 -070042#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/sync.h>
44#include <grpc/support/useful.h>
45
Craig Tiller9eb0fde2017-03-31 16:59:30 -070046#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
47#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
48#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
49#include "src/core/ext/filters/client_channel/resolver_registry.h"
50#include "src/core/ext/filters/client_channel/retry_throttle.h"
51#include "src/core/ext/filters/client_channel/subchannel.h"
Craig Tiller3be7dd02017-04-03 14:30:03 -070052#include "src/core/ext/filters/deadline/deadline_filter.h"
Craig Tiller9533d042016-03-25 17:11:06 -070053#include "src/core/lib/channel/channel_args.h"
54#include "src/core/lib/channel/connected_channel.h"
Craig Tillerbefafe62017-02-09 11:30:54 -080055#include "src/core/lib/iomgr/combiner.h"
Craig Tiller9533d042016-03-25 17:11:06 -070056#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070057#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070058#include "src/core/lib/profiling/timers.h"
Craig Tiller7c70b6c2017-01-23 07:48:42 -080059#include "src/core/lib/slice/slice_internal.h"
Craig Tiller9533d042016-03-25 17:11:06 -070060#include "src/core/lib/support/string.h"
61#include "src/core/lib/surface/channel.h"
62#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070063#include "src/core/lib/transport/metadata.h"
64#include "src/core/lib/transport/metadata_batch.h"
Mark D. Rothea846a02016-11-03 11:32:54 -070065#include "src/core/lib/transport/service_config.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070066#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070067
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080068/* Client channel implementation */
69
Mark D. Roth26b7be42016-10-24 10:08:07 -070070/*************************************************************************
71 * METHOD-CONFIG TABLE
72 */
73
Mark D. Roth9d480942016-10-19 14:18:05 -070074typedef enum {
Craig Tiller7acc37e2017-02-28 10:01:37 -080075 /* zero so it can be default initialized */
76 WAIT_FOR_READY_UNSET = 0,
Mark D. Roth9d480942016-10-19 14:18:05 -070077 WAIT_FOR_READY_FALSE,
78 WAIT_FOR_READY_TRUE
79} wait_for_ready_value;
80
Mark D. Roth95b627b2017-02-24 11:02:58 -080081typedef struct {
82 gpr_refcount refs;
Mark D. Roth9d480942016-10-19 14:18:05 -070083 gpr_timespec timeout;
84 wait_for_ready_value wait_for_ready;
85} method_parameters;
86
Mark D. Roth722de8d2017-02-27 10:50:44 -080087static method_parameters *method_parameters_ref(
Mark D. Roth95b627b2017-02-24 11:02:58 -080088 method_parameters *method_params) {
89 gpr_ref(&method_params->refs);
90 return method_params;
Mark D. Roth9d480942016-10-19 14:18:05 -070091}
92
Mark D. Roth95b627b2017-02-24 11:02:58 -080093static void method_parameters_unref(method_parameters *method_params) {
94 if (gpr_unref(&method_params->refs)) {
95 gpr_free(method_params);
96 }
97}
98
Mark D. Roth95b627b2017-02-24 11:02:58 -080099static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
100 method_parameters_unref(value);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800101}
102
Mark D. Roth95b627b2017-02-24 11:02:58 -0800103static bool parse_wait_for_ready(grpc_json *field,
104 wait_for_ready_value *wait_for_ready) {
105 if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
106 return false;
107 }
108 *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
109 : WAIT_FOR_READY_FALSE;
110 return true;
111}
112
Mark D. Roth722de8d2017-02-27 10:50:44 -0800113static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
Mark D. Roth95b627b2017-02-24 11:02:58 -0800114 if (field->type != GRPC_JSON_STRING) return false;
115 size_t len = strlen(field->value);
116 if (field->value[len - 1] != 's') return false;
117 char *buf = gpr_strdup(field->value);
118 buf[len - 1] = '\0'; // Remove trailing 's'.
119 char *decimal_point = strchr(buf, '.');
120 if (decimal_point != NULL) {
121 *decimal_point = '\0';
122 timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
123 if (timeout->tv_nsec == -1) {
124 gpr_free(buf);
125 return false;
126 }
127 // There should always be exactly 3, 6, or 9 fractional digits.
128 int multiplier = 1;
129 switch (strlen(decimal_point + 1)) {
130 case 9:
131 break;
132 case 6:
133 multiplier *= 1000;
134 break;
135 case 3:
136 multiplier *= 1000000;
137 break;
138 default: // Unsupported number of digits.
139 gpr_free(buf);
140 return false;
141 }
142 timeout->tv_nsec *= multiplier;
143 }
144 timeout->tv_sec = gpr_parse_nonnegative_int(buf);
145 gpr_free(buf);
146 if (timeout->tv_sec == -1) return false;
147 return true;
148}
149
Mark D. Rothe30baeb2016-11-03 08:16:19 -0700150static void *method_parameters_create_from_json(const grpc_json *json) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700151 wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
Mark D. Roth47f10842016-11-03 08:45:27 -0700152 gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
153 for (grpc_json *field = json->child; field != NULL; field = field->next) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700154 if (field->key == NULL) continue;
Mark D. Roth84c8a022016-11-10 09:39:34 -0800155 if (strcmp(field->key, "waitForReady") == 0) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700156 if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800157 if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700158 } else if (strcmp(field->key, "timeout") == 0) {
159 if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800160 if (!parse_timeout(field, &timeout)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700161 }
162 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700163 method_parameters *value = gpr_malloc(sizeof(method_parameters));
Mark D. Roth95b627b2017-02-24 11:02:58 -0800164 gpr_ref_init(&value->refs, 1);
Mark D. Rothc968e602016-11-02 14:07:36 -0700165 value->timeout = timeout;
166 value->wait_for_ready = wait_for_ready;
Mark D. Roth9d480942016-10-19 14:18:05 -0700167 return value;
168}
169
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700170/*************************************************************************
171 * CHANNEL-WIDE FUNCTIONS
172 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800173
Craig Tiller800dacb2015-10-06 09:10:26 -0700174typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -0700175 /** resolver for this channel */
176 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -0700177 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700178 bool started_resolving;
Craig Tiller3be7dd02017-04-03 14:30:03 -0700179 /** is deadline checking enabled? */
180 bool deadline_checking_enabled;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700181 /** client channel factory */
182 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -0700183
Craig Tillerbefafe62017-02-09 11:30:54 -0800184 /** combiner protecting all variables below in this data structure */
185 grpc_combiner *combiner;
Mark D. Roth046cf762016-09-26 11:13:51 -0700186 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -0700187 grpc_lb_policy *lb_policy;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800188 /** retry throttle data */
189 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth9d480942016-10-19 14:18:05 -0700190 /** maps method names to method_parameters structs */
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800191 grpc_slice_hash_table *method_params_table;
Mark D. Roth046cf762016-09-26 11:13:51 -0700192 /** incoming resolver result - set by resolver.next() */
Mark D. Rothaf842452016-10-21 15:05:15 -0700193 grpc_channel_args *resolver_result;
Craig Tiller3f475422015-06-25 10:43:05 -0700194 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -0700195 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -0700196 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -0700197 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -0700198 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -0700199 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700200 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700201 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800202 /** owning stack */
203 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -0800204 /** interested parties (owned) */
205 grpc_pollset_set *interested_parties;
Craig Tiller613dafa2017-02-09 12:00:43 -0800206
207 /* the following properties are guarded by a mutex since API's require them
Craig Tiller46dd7902017-02-23 09:42:16 -0800208 to be instantaneously available */
Craig Tiller613dafa2017-02-09 12:00:43 -0800209 gpr_mu info_mu;
210 char *info_lb_policy_name;
211 /** service config in JSON form */
212 char *info_service_config_json;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213} channel_data;
214
Craig Tillerd6c98df2015-08-18 09:33:44 -0700215/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700216 resolver, to watch for state changes from the lb_policy. When a state
217 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -0700218typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700219 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700220 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700221 grpc_connectivity_state state;
222 grpc_lb_policy *lb_policy;
223} lb_policy_connectivity_watcher;
224
Craig Tiller2400bf52017-02-09 16:25:19 -0800225static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
226 grpc_lb_policy *lb_policy,
227 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700228
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800229static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
230 channel_data *chand,
231 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700232 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800233 const char *reason) {
David Garcia Quintas37251282017-04-14 13:46:03 -0700234 /* TODO: Improve failure handling:
235 * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
236 * - Hand over pending picks from old policies during the switch that happens
237 * when resolver provides an update. */
David Garcia Quintas956f7002017-04-13 15:40:06 -0700238 if (chand->lb_policy != NULL) {
239 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
240 /* cancel picks with wait_for_ready=false */
241 grpc_lb_policy_cancel_picks_locked(
242 exec_ctx, chand->lb_policy,
243 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
244 /* check= */ 0, GRPC_ERROR_REF(error));
245 } else if (state == GRPC_CHANNEL_SHUTDOWN) {
246 /* cancel all picks */
247 grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
248 /* mask= */ 0, /* check= */ 0,
249 GRPC_ERROR_REF(error));
250 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800251 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700252 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
253 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800254}
255
Craig Tiller804ff712016-05-05 16:25:40 -0700256static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
Craig Tillerbefafe62017-02-09 11:30:54 -0800257 void *arg, grpc_error *error) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700258 lb_policy_connectivity_watcher *w = arg;
Craig Tiller5d44c062015-07-01 08:55:28 -0700259 grpc_connectivity_state publish_state = w->state;
Craig Tillerc5de8352017-02-09 14:08:05 -0800260 /* check if the notification is for the latest policy */
261 if (w->lb_policy == w->chand->lb_policy) {
262 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
263 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller972470b2017-02-09 15:05:36 -0800264 grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
Craig Tillerc5de8352017-02-09 14:08:05 -0800265 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
266 w->chand->lb_policy = NULL;
267 }
268 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
269 GRPC_ERROR_REF(error), "lb_changed");
270 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800271 watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
Craig Tillerc5de8352017-02-09 14:08:05 -0800272 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800273 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700274
Craig Tiller906e3bc2015-11-24 07:31:31 -0800275 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700276 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700277}
278
Craig Tiller2400bf52017-02-09 16:25:19 -0800279static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
280 grpc_lb_policy *lb_policy,
281 grpc_connectivity_state current_state) {
Craig Tillera82950e2015-09-22 12:33:20 -0700282 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800283 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700284
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700285 w->chand = chand;
Craig Tillerbefafe62017-02-09 11:30:54 -0800286 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
287 grpc_combiner_scheduler(chand->combiner, false));
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700288 w->state = current_state;
289 w->lb_policy = lb_policy;
Craig Tiller2400bf52017-02-09 16:25:19 -0800290 grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
291 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700292}
293
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800294typedef struct {
295 char *server_name;
296 grpc_server_retry_throttle_data *retry_throttle_data;
297} service_config_parsing_state;
298
299static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
300 service_config_parsing_state *parsing_state = arg;
301 if (strcmp(field->key, "retryThrottling") == 0) {
302 if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
303 if (field->type != GRPC_JSON_OBJECT) return;
304 int max_milli_tokens = 0;
305 int milli_token_ratio = 0;
306 for (grpc_json *sub_field = field->child; sub_field != NULL;
307 sub_field = sub_field->next) {
Mark D. Rothb3322562017-02-23 14:38:02 -0800308 if (sub_field->key == NULL) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800309 if (strcmp(sub_field->key, "maxTokens") == 0) {
310 if (max_milli_tokens != 0) return; // Duplicate.
311 if (sub_field->type != GRPC_JSON_NUMBER) return;
312 max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
313 if (max_milli_tokens == -1) return;
314 max_milli_tokens *= 1000;
315 } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
316 if (milli_token_ratio != 0) return; // Duplicate.
317 if (sub_field->type != GRPC_JSON_NUMBER) return;
318 // We support up to 3 decimal digits.
319 size_t whole_len = strlen(sub_field->value);
320 uint32_t multiplier = 1;
321 uint32_t decimal_value = 0;
322 const char *decimal_point = strchr(sub_field->value, '.');
323 if (decimal_point != NULL) {
324 whole_len = (size_t)(decimal_point - sub_field->value);
325 multiplier = 1000;
326 size_t decimal_len = strlen(decimal_point + 1);
327 if (decimal_len > 3) decimal_len = 3;
328 if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
329 &decimal_value)) {
330 return;
331 }
332 uint32_t decimal_multiplier = 1;
333 for (size_t i = 0; i < (3 - decimal_len); ++i) {
334 decimal_multiplier *= 10;
335 }
336 decimal_value *= decimal_multiplier;
337 }
338 uint32_t whole_value;
339 if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
340 &whole_value)) {
341 return;
342 }
343 milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
Mark D. Rothb3322562017-02-23 14:38:02 -0800344 if (milli_token_ratio <= 0) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800345 }
346 }
347 parsing_state->retry_throttle_data =
348 grpc_retry_throttle_map_get_data_for_server(
349 parsing_state->server_name, max_milli_tokens, milli_token_ratio);
350 }
351}
352
David Garcia Quintas956f7002017-04-13 15:40:06 -0700353// Wrap a closure associated with \a lb_policy. The associated callback (\a
354// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
355// scheduling \a wrapped_closure.
356typedef struct wrapped_on_pick_closure_arg {
357 /* the closure instance using this struct as argument */
358 grpc_closure wrapper_closure;
359
360 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
361 * calls against the internal RR instance, respectively. */
362 grpc_closure *wrapped_closure;
363
364 /* The policy instance related to the closure */
365 grpc_lb_policy *lb_policy;
David Garcia Quintas956f7002017-04-13 15:40:06 -0700366} wrapped_on_pick_closure_arg;
367
David Garcia Quintas37251282017-04-14 13:46:03 -0700368// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
David Garcia Quintas956f7002017-04-13 15:40:06 -0700369static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
370 grpc_error *error) {
371 wrapped_on_pick_closure_arg *wc_arg = arg;
David Garcia Quintas37251282017-04-14 13:46:03 -0700372 GPR_ASSERT(wc_arg != NULL);
David Garcia Quintas956f7002017-04-13 15:40:06 -0700373 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
374 GPR_ASSERT(wc_arg->lb_policy != NULL);
David Garcia Quintas37251282017-04-14 13:46:03 -0700375 grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
David Garcia Quintas956f7002017-04-13 15:40:06 -0700376 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
David Garcia Quintas37251282017-04-14 13:46:03 -0700377 gpr_free(wc_arg);
David Garcia Quintas956f7002017-04-13 15:40:06 -0700378}
379
Craig Tillerbefafe62017-02-09 11:30:54 -0800380static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
381 void *arg, grpc_error *error) {
Craig Tiller3f475422015-06-25 10:43:05 -0700382 channel_data *chand = arg;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700383 char *lb_policy_name = NULL;
Craig Tiller3f475422015-06-25 10:43:05 -0700384 grpc_lb_policy *lb_policy = NULL;
385 grpc_lb_policy *old_lb_policy;
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800386 grpc_slice_hash_table *method_params_table = NULL;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700387 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700388 bool exit_idle = false;
ncteisen4b36a3d2017-03-13 19:08:06 -0700389 grpc_error *state_error =
390 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800391 char *service_config_json = NULL;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800392 service_config_parsing_state parsing_state;
393 memset(&parsing_state, 0, sizeof(parsing_state));
Craig Tiller3f475422015-06-25 10:43:05 -0700394
Mark D. Roth046cf762016-09-26 11:13:51 -0700395 if (chand->resolver_result != NULL) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700396 // Find LB policy name.
Mark D. Rothaf842452016-10-21 15:05:15 -0700397 const grpc_arg *channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700398 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
Mark D. Rothaf842452016-10-21 15:05:15 -0700399 if (channel_arg != NULL) {
400 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
401 lb_policy_name = channel_arg->value.string;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700402 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700403 // Special case: If at least one balancer address is present, we use
404 // the grpclb policy, regardless of what the resolver actually specified.
Mark D. Rothaf842452016-10-21 15:05:15 -0700405 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700406 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
David Garcia Quintas143cb772017-03-31 13:39:27 -0700407 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
Mark D. Roth557c9902016-10-24 11:12:05 -0700408 grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700409 bool found_balancer_address = false;
Mark D. Rothaf842452016-10-21 15:05:15 -0700410 for (size_t i = 0; i < addresses->num_addresses; ++i) {
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700411 if (addresses->addresses[i].is_balancer) {
412 found_balancer_address = true;
Mark D. Rothaf842452016-10-21 15:05:15 -0700413 break;
414 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700415 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700416 if (found_balancer_address) {
Mark D. Rothaf842452016-10-21 15:05:15 -0700417 if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
418 gpr_log(GPR_INFO,
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700419 "resolver requested LB policy %s but provided at least one "
420 "balancer address -- forcing use of grpclb LB policy",
Mark D. Roth5f40e5d2016-10-24 13:09:05 -0700421 lb_policy_name);
Mark D. Rothaf842452016-10-21 15:05:15 -0700422 }
423 lb_policy_name = "grpclb";
Mark D. Roth88405f72016-10-03 08:24:52 -0700424 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700425 }
426 // Use pick_first if nothing was specified and we didn't select grpclb
427 // above.
428 if (lb_policy_name == NULL) lb_policy_name = "pick_first";
Mark D. Roth41124992016-11-03 11:22:20 -0700429 // Instantiate LB policy.
430 grpc_lb_policy_args lb_policy_args;
431 lb_policy_args.args = chand->resolver_result;
432 lb_policy_args.client_channel_factory = chand->client_channel_factory;
Craig Tiller2400bf52017-02-09 16:25:19 -0800433 lb_policy_args.combiner = chand->combiner;
Mark D. Roth88405f72016-10-03 08:24:52 -0700434 lb_policy =
435 grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
Craig Tillera82950e2015-09-22 12:33:20 -0700436 if (lb_policy != NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -0700437 GRPC_LB_POLICY_REF(lb_policy, "config_change");
Craig Tillerf707d622016-05-06 14:26:12 -0700438 GRPC_ERROR_UNREF(state_error);
Craig Tiller2400bf52017-02-09 16:25:19 -0800439 state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
440 &state_error);
Craig Tiller45724b32015-09-22 10:42:19 -0700441 }
Mark D. Roth41124992016-11-03 11:22:20 -0700442 // Find service config.
Mark D. Rothaf842452016-10-21 15:05:15 -0700443 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700444 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
Mark D. Roth046cf762016-09-26 11:13:51 -0700445 if (channel_arg != NULL) {
Mark D. Roth9ec28af2016-11-03 12:32:39 -0700446 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800447 service_config_json = gpr_strdup(channel_arg->value.string);
Mark D. Roth70a1abd2016-11-04 09:26:37 -0700448 grpc_service_config *service_config =
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800449 grpc_service_config_create(service_config_json);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700450 if (service_config != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800451 channel_arg =
452 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
453 GPR_ASSERT(channel_arg != NULL);
454 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700455 grpc_uri *uri =
456 grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800457 GPR_ASSERT(uri->path[0] != '\0');
458 parsing_state.server_name =
459 uri->path[0] == '/' ? uri->path + 1 : uri->path;
460 grpc_service_config_parse_global_params(
461 service_config, parse_retry_throttle_params, &parsing_state);
462 parsing_state.server_name = NULL;
463 grpc_uri_destroy(uri);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700464 method_params_table = grpc_service_config_create_method_config_table(
Craig Tillerb28c7e82016-11-18 10:29:04 -0800465 exec_ctx, service_config, method_parameters_create_from_json,
Mark D. Rothe3006702017-04-19 07:43:56 -0700466 method_parameters_free);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700467 grpc_service_config_destroy(service_config);
468 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700469 }
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700470 // Before we clean up, save a copy of lb_policy_name, since it might
471 // be pointing to data inside chand->resolver_result.
472 // The copy will be saved in chand->lb_policy_name below.
473 lb_policy_name = gpr_strdup(lb_policy_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800474 grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
Mark D. Roth046cf762016-09-26 11:13:51 -0700475 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700476 }
477
Craig Tiller86c99582015-11-25 15:22:26 -0800478 if (lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800479 grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
480 chand->interested_parties);
Craig Tiller86c99582015-11-25 15:22:26 -0800481 }
482
Craig Tiller613dafa2017-02-09 12:00:43 -0800483 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700484 if (lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800485 gpr_free(chand->info_lb_policy_name);
486 chand->info_lb_policy_name = lb_policy_name;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700487 }
Craig Tiller3f475422015-06-25 10:43:05 -0700488 old_lb_policy = chand->lb_policy;
489 chand->lb_policy = lb_policy;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800490 if (service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800491 gpr_free(chand->info_service_config_json);
492 chand->info_service_config_json = service_config_json;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800493 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800494 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800495
496 if (chand->retry_throttle_data != NULL) {
497 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
498 }
499 chand->retry_throttle_data = parsing_state.retry_throttle_data;
Mark D. Roth9d480942016-10-19 14:18:05 -0700500 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800501 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth046cf762016-09-26 11:13:51 -0700502 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700503 chand->method_params_table = method_params_table;
Craig Tiller0ede5452016-04-23 12:21:45 -0700504 if (lb_policy != NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800505 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller0ede5452016-04-23 12:21:45 -0700506 } else if (chand->resolver == NULL /* disconnected */) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700507 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
508 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
509 "Channel disconnected", &error, 1));
Craig Tiller91031da2016-12-28 15:44:25 -0800510 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tillera82950e2015-09-22 12:33:20 -0700511 }
512 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
513 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700514 exit_idle = true;
515 chand->exit_idle_when_lb_policy_arrives = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700516 }
Craig Tiller98465032015-06-29 14:36:42 -0700517
Craig Tiller804ff712016-05-05 16:25:40 -0700518 if (error == GRPC_ERROR_NONE && chand->resolver) {
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700519 set_channel_connectivity_state_locked(
520 exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
Craig Tillera82950e2015-09-22 12:33:20 -0700521 if (lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800522 watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
Craig Tiller45724b32015-09-22 10:42:19 -0700523 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800524 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Craig Tiller972470b2017-02-09 15:05:36 -0800525 grpc_resolver_next_locked(exec_ctx, chand->resolver,
526 &chand->resolver_result,
527 &chand->on_resolver_result_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700528 } else {
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800529 if (chand->resolver != NULL) {
Craig Tiller972470b2017-02-09 15:05:36 -0800530 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800531 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
532 chand->resolver = NULL;
533 }
Craig Tiller804ff712016-05-05 16:25:40 -0700534 grpc_error *refs[] = {error, state_error};
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800535 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700536 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700537 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
538 "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)),
Craig Tiller804ff712016-05-05 16:25:40 -0700539 "resolver_gone");
Craig Tillera82950e2015-09-22 12:33:20 -0700540 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700541
Craig Tillera82950e2015-09-22 12:33:20 -0700542 if (exit_idle) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800543 grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
Craig Tillera82950e2015-09-22 12:33:20 -0700544 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
545 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700546
Craig Tillera82950e2015-09-22 12:33:20 -0700547 if (old_lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800548 grpc_pollset_set_del_pollset_set(
549 exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
Craig Tillera82950e2015-09-22 12:33:20 -0700550 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
551 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700552
Craig Tillera82950e2015-09-22 12:33:20 -0700553 if (lb_policy != NULL) {
554 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
555 }
Craig Tiller45724b32015-09-22 10:42:19 -0700556
Craig Tiller906e3bc2015-11-24 07:31:31 -0800557 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700558 GRPC_ERROR_UNREF(state_error);
Craig Tiller3f475422015-06-25 10:43:05 -0700559}
560
Craig Tillera8610c02017-02-14 10:05:11 -0800561static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
562 grpc_error *error_ignored) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800563 grpc_transport_op *op = arg;
Craig Tillerc55c1022017-03-10 10:26:42 -0800564 grpc_channel_element *elem = op->handler_private.extra_arg;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700565 channel_data *chand = elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700566
Craig Tillera82950e2015-09-22 12:33:20 -0700567 if (op->on_connectivity_state_change != NULL) {
568 grpc_connectivity_state_notify_on_state_change(
569 exec_ctx, &chand->state_tracker, op->connectivity_state,
570 op->on_connectivity_state_change);
571 op->on_connectivity_state_change = NULL;
572 op->connectivity_state = NULL;
573 }
574
Craig Tiller26dab312015-12-07 14:43:47 -0800575 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800576 if (chand->lb_policy == NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700577 grpc_closure_sched(
578 exec_ctx, op->send_ping,
579 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
Craig Tiller26dab312015-12-07 14:43:47 -0800580 } else {
Craig Tiller2400bf52017-02-09 16:25:19 -0800581 grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800582 op->bind_pollset = NULL;
583 }
584 op->send_ping = NULL;
585 }
586
Craig Tiller1c51edc2016-05-07 16:18:43 -0700587 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
588 if (chand->resolver != NULL) {
589 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700590 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700591 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
Craig Tiller972470b2017-02-09 15:05:36 -0800592 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700593 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
594 chand->resolver = NULL;
595 if (!chand->started_resolving) {
596 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
597 GRPC_ERROR_REF(op->disconnect_with_error));
Craig Tiller91031da2016-12-28 15:44:25 -0800598 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700599 }
600 if (chand->lb_policy != NULL) {
601 grpc_pollset_set_del_pollset_set(exec_ctx,
602 chand->lb_policy->interested_parties,
603 chand->interested_parties);
604 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
605 chand->lb_policy = NULL;
606 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700607 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700608 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700609 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800610 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
611
612 grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
Craig Tillerbefafe62017-02-09 11:30:54 -0800613}
614
615static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
616 grpc_channel_element *elem,
617 grpc_transport_op *op) {
618 channel_data *chand = elem->channel_data;
619
Craig Tillerbefafe62017-02-09 11:30:54 -0800620 GPR_ASSERT(op->set_accept_stream == false);
621 if (op->bind_pollset != NULL) {
622 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
623 op->bind_pollset);
624 }
625
Craig Tillerc55c1022017-03-10 10:26:42 -0800626 op->handler_private.extra_arg = elem;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800627 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
Craig Tillerbefafe62017-02-09 11:30:54 -0800628 grpc_closure_sched(
Craig Tillerc55c1022017-03-10 10:26:42 -0800629 exec_ctx,
630 grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
631 op, grpc_combiner_scheduler(chand->combiner, false)),
Craig Tillerbefafe62017-02-09 11:30:54 -0800632 GRPC_ERROR_NONE);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700633}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800634
Mark D. Rothb2d24882016-10-27 15:44:07 -0700635static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
636 grpc_channel_element *elem,
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700637 const grpc_channel_info *info) {
Mark D. Rothb2d24882016-10-27 15:44:07 -0700638 channel_data *chand = elem->channel_data;
Craig Tiller613dafa2017-02-09 12:00:43 -0800639 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700640 if (info->lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800641 *info->lb_policy_name = chand->info_lb_policy_name == NULL
Mark D. Roth78afd772016-11-04 12:49:49 -0700642 ? NULL
Craig Tiller613dafa2017-02-09 12:00:43 -0800643 : gpr_strdup(chand->info_lb_policy_name);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700644 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800645 if (info->service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800646 *info->service_config_json =
647 chand->info_service_config_json == NULL
648 ? NULL
649 : gpr_strdup(chand->info_service_config_json);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800650 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800651 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700652}
653
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700654/* Constructor for channel_data */
Mark D. Rothc1087882016-11-18 10:54:45 -0800655static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800656 grpc_channel_element *elem,
657 grpc_channel_element_args *args) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700658 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700659 GPR_ASSERT(args->is_last);
660 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800661 // Initialize data members.
Craig Tillerbefafe62017-02-09 11:30:54 -0800662 chand->combiner = grpc_combiner_create(NULL);
Craig Tillerd85477512017-02-09 12:02:39 -0800663 gpr_mu_init(&chand->info_mu);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800664 chand->owning_stack = args->channel_stack;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700665 grpc_closure_init(&chand->on_resolver_result_changed,
Craig Tillerbefafe62017-02-09 11:30:54 -0800666 on_resolver_result_changed_locked, chand,
667 grpc_combiner_scheduler(chand->combiner, false));
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800668 chand->interested_parties = grpc_pollset_set_create();
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700669 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
670 "client_channel");
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800671 // Record client channel factory.
672 const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
673 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700674 if (arg == NULL) {
675 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
676 "Missing client channel factory in args for client channel filter");
677 }
678 if (arg->type != GRPC_ARG_POINTER) {
679 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
680 "client channel factory arg must be a pointer");
681 }
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800682 grpc_client_channel_factory_ref(arg->value.pointer.p);
683 chand->client_channel_factory = arg->value.pointer.p;
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800684 // Get server name to resolve, using proxy mapper if needed.
Mark D. Roth86e90592016-11-18 09:56:40 -0800685 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700686 if (arg == NULL) {
687 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
688 "Missing server uri in args for client channel filter");
689 }
690 if (arg->type != GRPC_ARG_STRING) {
691 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
692 "server uri arg must be a string");
693 }
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800694 char *proxy_name = NULL;
695 grpc_channel_args *new_args = NULL;
696 grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
697 &proxy_name, &new_args);
698 // Instantiate resolver.
Mark D. Roth45ccec52017-01-18 14:04:01 -0800699 chand->resolver = grpc_resolver_create(
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800700 exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
701 new_args != NULL ? new_args : args->channel_args,
Craig Tiller972470b2017-02-09 15:05:36 -0800702 chand->interested_parties, chand->combiner);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800703 if (proxy_name != NULL) gpr_free(proxy_name);
704 if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800705 if (chand->resolver == NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700706 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800707 }
Craig Tiller3be7dd02017-04-03 14:30:03 -0700708 chand->deadline_checking_enabled =
709 grpc_deadline_checking_enabled(args->channel_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800710 return GRPC_ERROR_NONE;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700711}
712
Craig Tiller972470b2017-02-09 15:05:36 -0800713static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
714 grpc_error *error) {
715 grpc_resolver *resolver = arg;
716 grpc_resolver_shutdown_locked(exec_ctx, resolver);
717 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
718}
719
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700720/* Destructor for channel_data */
721static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
722 grpc_channel_element *elem) {
723 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700724 if (chand->resolver != NULL) {
Craig Tiller972470b2017-02-09 15:05:36 -0800725 grpc_closure_sched(
726 exec_ctx,
727 grpc_closure_create(shutdown_resolver_locked, chand->resolver,
728 grpc_combiner_scheduler(chand->combiner, false)),
729 GRPC_ERROR_NONE);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700730 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700731 if (chand->client_channel_factory != NULL) {
732 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
733 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700734 if (chand->lb_policy != NULL) {
735 grpc_pollset_set_del_pollset_set(exec_ctx,
736 chand->lb_policy->interested_parties,
737 chand->interested_parties);
738 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
739 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800740 gpr_free(chand->info_lb_policy_name);
741 gpr_free(chand->info_service_config_json);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800742 if (chand->retry_throttle_data != NULL) {
743 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
744 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700745 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800746 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700747 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700748 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
Craig Tiller9e5ac1b2017-02-14 22:25:50 -0800749 grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
Craig Tillerf1021672017-02-09 21:29:50 -0800750 GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
Craig Tillerd85477512017-02-09 12:02:39 -0800751 gpr_mu_destroy(&chand->info_mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700752}
753
754/*************************************************************************
755 * PER-CALL FUNCTIONS
756 */
757
758#define GET_CALL(call_data) \
759 ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
760
761#define CANCELLED_CALL ((grpc_subchannel_call *)1)
762
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700763/** Call data. Holds a pointer to grpc_subchannel_call and the
764 associated machinery to create such a pointer.
765 Handles queueing of stream ops until a call object is ready, waiting
766 for initial metadata before trying to create a call object,
767 and handling cancellation gracefully. */
768typedef struct client_channel_call_data {
Mark D. Roth72f6da82016-09-02 13:42:38 -0700769 // State for handling deadlines.
770 // The code in deadline_filter.c requires this to be the first field.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700771 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
772 // and this struct both independently store a pointer to the call
773 // stack and each has its own mutex. If/when we have time, find a way
Mark D. Roth6ad99172016-09-09 07:52:48 -0700774 // to avoid this without breaking the grpc_deadline_state abstraction.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700775 grpc_deadline_state deadline_state;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700776
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800777 grpc_slice path; // Request path.
Mark D. Rothe40dd292016-10-05 14:58:37 -0700778 gpr_timespec call_start_time;
779 gpr_timespec deadline;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700780 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800781 method_parameters *method_params;
Mark D. Rothaa850a72016-09-26 13:38:02 -0700782
Mark D. Rothf28763c2016-09-14 15:18:40 -0700783 grpc_error *cancel_error;
Mark D. Roth72f6da82016-09-02 13:42:38 -0700784
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700785 /** either 0 for no call, 1 for cancelled, or a pointer to a
786 grpc_subchannel_call */
787 gpr_atm subchannel_call;
Craig Tillerd426cac2017-03-13 12:30:45 -0700788 gpr_arena *arena;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700789
Mark D. Roth64a317c2017-05-02 08:27:08 -0700790 bool pick_pending;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700791 grpc_connected_subchannel *connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700792 grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700793 grpc_polling_entity *pollent;
794
Craig Tillera0f3abd2017-03-31 15:42:16 -0700795 grpc_transport_stream_op_batch **waiting_ops;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700796 size_t waiting_ops_count;
797 size_t waiting_ops_capacity;
798
799 grpc_closure next_step;
800
801 grpc_call_stack *owning_call;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200802
803 grpc_linked_mdelem lb_token_mdelem;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800804
805 grpc_closure on_complete;
806 grpc_closure *original_on_complete;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700807} call_data;
808
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800809grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
810 grpc_call_element *call_elem) {
811 grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
812 return scc == CANCELLED_CALL ? NULL : scc;
813}
814
Craig Tillere1b51da2017-03-31 15:44:33 -0700815static void add_waiting_locked(call_data *calld,
816 grpc_transport_stream_op_batch *op) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700817 GPR_TIMER_BEGIN("add_waiting_locked", 0);
818 if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
819 calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
820 calld->waiting_ops =
821 gpr_realloc(calld->waiting_ops,
822 calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
823 }
Craig Tiller57726ca2016-09-12 11:59:45 -0700824 calld->waiting_ops[calld->waiting_ops_count++] = op;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700825 GPR_TIMER_END("add_waiting_locked", 0);
826}
827
828static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
829 grpc_error *error) {
830 size_t i;
831 for (i = 0; i < calld->waiting_ops_count; i++) {
Craig Tillera0f3abd2017-03-31 15:42:16 -0700832 grpc_transport_stream_op_batch_finish_with_failure(
Craig Tiller57726ca2016-09-12 11:59:45 -0700833 exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700834 }
835 calld->waiting_ops_count = 0;
836 GRPC_ERROR_UNREF(error);
837}
838
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700839static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
Craig Tiller57726ca2016-09-12 11:59:45 -0700840 if (calld->waiting_ops_count == 0) {
841 return;
842 }
843
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800844 grpc_subchannel_call *call = GET_CALL(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -0700845 grpc_transport_stream_op_batch **ops = calld->waiting_ops;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800846 size_t nops = calld->waiting_ops_count;
847 if (call == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700848 fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
849 return;
850 }
851 calld->waiting_ops = NULL;
852 calld->waiting_ops_count = 0;
853 calld->waiting_ops_capacity = 0;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800854 for (size_t i = 0; i < nops; i++) {
855 grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
856 }
Craig Tiller9efea882017-02-09 13:06:52 -0800857 gpr_free(ops);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700858}
859
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700860// Sets calld->method_params and calld->retry_throttle_data.
Craig Tiller11c17d42017-03-13 13:36:34 -0700861// If the method params specify a timeout, populates
862// *per_method_deadline and returns true.
863static bool set_call_method_params_from_service_config_locked(
864 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
865 gpr_timespec *per_method_deadline) {
866 channel_data *chand = elem->channel_data;
867 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700868 if (chand->retry_throttle_data != NULL) {
869 calld->retry_throttle_data =
870 grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
871 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700872 if (chand->method_params_table != NULL) {
873 calld->method_params = grpc_method_config_table_get(
874 exec_ctx, chand->method_params_table, calld->path);
875 if (calld->method_params != NULL) {
876 method_parameters_ref(calld->method_params);
877 if (gpr_time_cmp(calld->method_params->timeout,
878 gpr_time_0(GPR_TIMESPAN)) != 0) {
879 *per_method_deadline =
880 gpr_time_add(calld->call_start_time, calld->method_params->timeout);
881 return true;
882 }
883 }
884 }
885 return false;
886}
Craig Tillerea4a4f12017-03-13 13:36:52 -0700887
Craig Tiller11c17d42017-03-13 13:36:34 -0700888static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
889 grpc_call_element *elem) {
890 /* apply service-config level configuration to the call (now that we're
891 * certain it exists) */
892 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -0700893 channel_data *chand = elem->channel_data;
Craig Tiller11c17d42017-03-13 13:36:34 -0700894 gpr_timespec per_method_deadline;
895 if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
896 &per_method_deadline)) {
897 // If the deadline from the service config is shorter than the one
898 // from the client API, reset the deadline timer.
Craig Tiller3be7dd02017-04-03 14:30:03 -0700899 if (chand->deadline_checking_enabled &&
900 gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
Craig Tiller11c17d42017-03-13 13:36:34 -0700901 calld->deadline = per_method_deadline;
902 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
903 }
904 }
905}
906
Craig Tillerbefafe62017-02-09 11:30:54 -0800907static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
908 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -0700909 grpc_call_element *elem = arg;
910 call_data *calld = elem->call_data;
911 channel_data *chand = elem->channel_data;
Mark D. Roth64a317c2017-05-02 08:27:08 -0700912 GPR_ASSERT(calld->pick_pending);
913 calld->pick_pending = false;
Yuchen Zeng19656b12016-09-01 18:00:45 -0700914 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
915 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700916 if (calld->connected_subchannel == NULL) {
917 gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
ncteisen4b36a3d2017-03-13 19:08:06 -0700918 fail_locked(exec_ctx, calld,
919 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
920 "Failed to create subchannel", &error, 1));
Mark D. Roth72f6da82016-09-02 13:42:38 -0700921 } else if (GET_CALL(calld) == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700922 /* already cancelled before subchannel became ready */
ncteisen4b36a3d2017-03-13 19:08:06 -0700923 grpc_error *cancellation_error =
924 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
925 "Cancelled before creating subchannel", &error, 1);
David Garcia Quintas68a9e382016-12-13 10:50:40 -0800926 /* if due to deadline, attach the deadline exceeded status to the error */
927 if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
928 cancellation_error =
929 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
930 GRPC_STATUS_DEADLINE_EXCEEDED);
931 }
932 fail_locked(exec_ctx, calld, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700933 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700934 /* Create call on subchannel. */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700935 grpc_subchannel_call *subchannel_call = NULL;
Craig Tillerfb9d1122017-03-14 09:26:27 -0700936 const grpc_connected_subchannel_call_args call_args = {
Craig Tillerd426cac2017-03-13 12:30:45 -0700937 .pollent = calld->pollent,
938 .path = calld->path,
939 .start_time = calld->call_start_time,
940 .deadline = calld->deadline,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700941 .arena = calld->arena,
942 .context = calld->subchannel_call_context};
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700943 grpc_error *new_error = grpc_connected_subchannel_create_call(
Craig Tillerd426cac2017-03-13 12:30:45 -0700944 exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700945 gpr_atm_rel_store(&calld->subchannel_call,
946 (gpr_atm)(uintptr_t)subchannel_call);
Mark D. Roth61a63982017-04-05 09:56:12 -0700947 if (new_error != GRPC_ERROR_NONE) {
948 new_error = grpc_error_add_child(new_error, error);
949 fail_locked(exec_ctx, calld, new_error);
950 } else {
951 retry_waiting_locked(exec_ctx, calld);
952 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700953 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700954 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
955}
956
957static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
958 call_data *calld = elem->call_data;
959 grpc_subchannel_call *subchannel_call = GET_CALL(calld);
960 if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
961 return NULL;
962 } else {
963 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
964 }
965}
966
Craig Tiller577c9b22015-11-02 14:11:15 -0800967typedef struct {
968 grpc_metadata_batch *initial_metadata;
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800969 uint32_t initial_metadata_flags;
Craig Tillerb5585d42015-11-17 07:18:31 -0800970 grpc_connected_subchannel **connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700971 grpc_call_context_element *subchannel_call_context;
Craig Tiller577c9b22015-11-02 14:11:15 -0800972 grpc_closure *on_ready;
973 grpc_call_element *elem;
974 grpc_closure closure;
975} continue_picking_args;
976
Yuchen Zeng144ce652016-09-01 18:19:34 -0700977/** Return true if subchannel is available immediately (in which case on_ready
978 should not be called), or false otherwise (in which case on_ready should be
979 called when the subchannel is available). */
Craig Tillerbefafe62017-02-09 11:30:54 -0800980static bool pick_subchannel_locked(
981 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
982 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700983 grpc_connected_subchannel **connected_subchannel,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -0700984 grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready);
Craig Tiller577c9b22015-11-02 14:11:15 -0800985
Craig Tillerbefafe62017-02-09 11:30:54 -0800986static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
987 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800988 continue_picking_args *cpa = arg;
Craig Tiller0ede5452016-04-23 12:21:45 -0700989 if (cpa->connected_subchannel == NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800990 /* cancelled, do nothing */
Craig Tiller804ff712016-05-05 16:25:40 -0700991 } else if (error != GRPC_ERROR_NONE) {
Craig Tiller91031da2016-12-28 15:44:25 -0800992 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700993 } else {
Mark D. Roth09e458c2017-05-02 08:13:26 -0700994 if (pick_subchannel_locked(
995 exec_ctx, cpa->elem, cpa->initial_metadata,
996 cpa->initial_metadata_flags, cpa->connected_subchannel,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -0700997 cpa->subchannel_call_context, cpa->on_ready)) {
Craig Tiller91031da2016-12-28 15:44:25 -0800998 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700999 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001000 }
1001 gpr_free(cpa);
1002}
1003
Mark D. Roth64a317c2017-05-02 08:27:08 -07001004static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1005 grpc_error *error) {
1006 channel_data *chand = elem->channel_data;
1007 call_data *calld = elem->call_data;
1008 if (chand->lb_policy != NULL) {
1009 grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
1010 &calld->connected_subchannel,
1011 GRPC_ERROR_REF(error));
1012 }
1013 for (grpc_closure *closure = chand->waiting_for_config_closures.head;
1014 closure != NULL; closure = closure->next_data.next) {
1015 continue_picking_args *cpa = closure->cb_arg;
1016 if (cpa->connected_subchannel == &calld->connected_subchannel) {
1017 cpa->connected_subchannel = NULL;
1018 grpc_closure_sched(exec_ctx, cpa->on_ready,
1019 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1020 "Pick cancelled", &error, 1));
1021 }
1022 }
1023 GRPC_ERROR_UNREF(error);
1024}
1025
Craig Tillerbefafe62017-02-09 11:30:54 -08001026static bool pick_subchannel_locked(
1027 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1028 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001029 grpc_connected_subchannel **connected_subchannel,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -07001030 grpc_call_context_element *subchannel_call_context,
1031 grpc_closure *on_ready) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001032 GPR_TIMER_BEGIN("pick_subchannel", 0);
Craig Tillerbfc9adc2016-06-27 13:16:22 -07001033
Craig Tiller577c9b22015-11-02 14:11:15 -08001034 channel_data *chand = elem->channel_data;
1035 call_data *calld = elem->call_data;
Craig Tiller577c9b22015-11-02 14:11:15 -08001036
Craig Tillerb5585d42015-11-17 07:18:31 -08001037 GPR_ASSERT(connected_subchannel);
Craig Tiller577c9b22015-11-02 14:11:15 -08001038
Craig Tiller577c9b22015-11-02 14:11:15 -08001039 if (chand->lb_policy != NULL) {
Craig Tiller2e0788a2017-03-14 06:55:44 -07001040 apply_final_configuration_locked(exec_ctx, elem);
Craig Tiller86c0f8a2015-12-01 20:05:40 -08001041 grpc_lb_policy *lb_policy = chand->lb_policy;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001042 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
Mark D. Rothe40dd292016-10-05 14:58:37 -07001043 // If the application explicitly set wait_for_ready, use that.
1044 // Otherwise, if the service config specified a value for this
1045 // method, use that.
Mark D. Rothc1c38582016-10-11 11:03:27 -07001046 const bool wait_for_ready_set_from_api =
1047 initial_metadata_flags &
1048 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1049 const bool wait_for_ready_set_from_service_config =
Mark D. Roth95b627b2017-02-24 11:02:58 -08001050 calld->method_params != NULL &&
1051 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001052 if (!wait_for_ready_set_from_api &&
1053 wait_for_ready_set_from_service_config) {
Mark D. Roth95b627b2017-02-24 11:02:58 -08001054 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001055 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1056 } else {
1057 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1058 }
1059 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001060 const grpc_lb_policy_pick_args inputs = {
Yuchen Zengac8bc422016-10-05 14:00:02 -07001061 initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
1062 gpr_inf_future(GPR_CLOCK_MONOTONIC)};
David Garcia Quintas956f7002017-04-13 15:40:06 -07001063
1064 // Wrap the user-provided callback in order to hold a strong reference to
1065 // the LB policy for the duration of the pick.
1066 wrapped_on_pick_closure_arg *w_on_pick_arg =
1067 gpr_zalloc(sizeof(*w_on_pick_arg));
1068 grpc_closure_init(&w_on_pick_arg->wrapper_closure,
1069 wrapped_on_pick_closure_cb, w_on_pick_arg,
1070 grpc_schedule_on_exec_ctx);
1071 w_on_pick_arg->wrapped_closure = on_ready;
1072 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
1073 w_on_pick_arg->lb_policy = lb_policy;
David Garcia Quintas956f7002017-04-13 15:40:06 -07001074 const bool pick_done = grpc_lb_policy_pick_locked(
Mark D. Roth09e458c2017-05-02 08:13:26 -07001075 exec_ctx, lb_policy, &inputs, connected_subchannel,
1076 subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001077 if (pick_done) {
1078 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
1079 GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
1080 "pick_subchannel_wrapping");
David Garcia Quintas37251282017-04-14 13:46:03 -07001081 gpr_free(w_on_pick_arg);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001082 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001083 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
1084 GPR_TIMER_END("pick_subchannel", 0);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001085 return pick_done;
Craig Tiller577c9b22015-11-02 14:11:15 -08001086 }
1087 if (chand->resolver != NULL && !chand->started_resolving) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001088 chand->started_resolving = true;
Craig Tiller906e3bc2015-11-24 07:31:31 -08001089 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Craig Tiller972470b2017-02-09 15:05:36 -08001090 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1091 &chand->resolver_result,
1092 &chand->on_resolver_result_changed);
Craig Tiller577c9b22015-11-02 14:11:15 -08001093 }
Craig Tiller0eab6972016-04-23 12:59:57 -07001094 if (chand->resolver != NULL) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001095 continue_picking_args *cpa = gpr_malloc(sizeof(*cpa));
Craig Tiller0eab6972016-04-23 12:59:57 -07001096 cpa->initial_metadata = initial_metadata;
1097 cpa->initial_metadata_flags = initial_metadata_flags;
1098 cpa->connected_subchannel = connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001099 cpa->subchannel_call_context = subchannel_call_context;
Craig Tiller0eab6972016-04-23 12:59:57 -07001100 cpa->on_ready = on_ready;
1101 cpa->elem = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -08001102 grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
1103 grpc_combiner_scheduler(chand->combiner, true));
Craig Tiller804ff712016-05-05 16:25:40 -07001104 grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
1105 GRPC_ERROR_NONE);
Craig Tiller0eab6972016-04-23 12:59:57 -07001106 } else {
ncteisen4b36a3d2017-03-13 19:08:06 -07001107 grpc_closure_sched(exec_ctx, on_ready,
1108 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -07001109 }
Craig Tillerbfc9adc2016-06-27 13:16:22 -07001110
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001111 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001112 return false;
Craig Tiller577c9b22015-11-02 14:11:15 -08001113}
1114
Craig Tillere1b51da2017-03-31 15:44:33 -07001115static void start_transport_stream_op_batch_locked_inner(
1116 grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
1117 grpc_call_element *elem) {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001118 channel_data *chand = elem->channel_data;
Craig Tillera11bfc82017-02-14 09:56:33 -08001119 call_data *calld = elem->call_data;
Craig Tillerbefafe62017-02-09 11:30:54 -08001120 grpc_subchannel_call *call;
1121
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001122 /* need to recheck that another thread hasn't set the call */
1123 call = GET_CALL(calld);
1124 if (call == CANCELLED_CALL) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001125 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Rothf28763c2016-09-14 15:18:40 -07001126 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -08001127 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001128 return;
1129 }
1130 if (call != NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001131 grpc_subchannel_call_process_op(exec_ctx, call, op);
Craig Tillera11bfc82017-02-14 09:56:33 -08001132 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001133 return;
1134 }
1135 /* if this is a cancellation, then we can raise our cancelled flag */
Craig Tillerc55c1022017-03-10 10:26:42 -08001136 if (op->cancel_stream) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001137 if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
1138 (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
Craig Tillera11bfc82017-02-14 09:56:33 -08001139 /* recurse to retry */
Craig Tillera0f3abd2017-03-31 15:42:16 -07001140 start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -08001141 /* early out */
1142 return;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001143 } else {
Craig Tillerbe9691a2017-02-14 10:00:42 -08001144 /* Stash a copy of cancel_error in our call data, so that we can use
1145 it for subsequent operations. This ensures that if the call is
1146 cancelled before any ops are passed down (e.g., if the deadline
1147 is in the past when the call starts), we can return the right
1148 error to the caller when the first op does get passed down. */
Craig Tillerc55c1022017-03-10 10:26:42 -08001149 calld->cancel_error =
1150 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001151 if (calld->pick_pending) {
1152 cancel_pick_locked(
1153 exec_ctx, elem,
1154 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
1155 } else {
1156 fail_locked(exec_ctx, calld,
1157 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001158 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001159 grpc_transport_stream_op_batch_finish_with_failure(
Craig Tillerc55c1022017-03-10 10:26:42 -08001160 exec_ctx, op,
1161 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -08001162 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001163 return;
1164 }
1165 }
1166 /* if we don't have a subchannel, try to get one */
Mark D. Roth64a317c2017-05-02 08:27:08 -07001167 if (!calld->pick_pending && calld->connected_subchannel == NULL &&
1168 op->send_initial_metadata) {
1169 calld->pick_pending = true;
Craig Tillerbefafe62017-02-09 11:30:54 -08001170 grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
1171 grpc_combiner_scheduler(chand->combiner, true));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001172 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
Yuchen Zeng144ce652016-09-01 18:19:34 -07001173 /* If a subchannel is not available immediately, the polling entity from
1174 call_data should be provided to channel_data's interested_parties, so
1175 that IO of the lb_policy and resolver could be done under it. */
Craig Tillerc55c1022017-03-10 10:26:42 -08001176 if (pick_subchannel_locked(
1177 exec_ctx, elem,
1178 op->payload->send_initial_metadata.send_initial_metadata,
1179 op->payload->send_initial_metadata.send_initial_metadata_flags,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001180 &calld->connected_subchannel, calld->subchannel_call_context,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -07001181 &calld->next_step)) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001182 calld->pick_pending = false;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001183 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Yuchen Zeng19656b12016-09-01 18:00:45 -07001184 } else {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001185 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1186 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001187 }
1188 }
1189 /* if we've got a subchannel, then let's ask it to create a call */
Mark D. Roth64a317c2017-05-02 08:27:08 -07001190 if (!calld->pick_pending && calld->connected_subchannel != NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001191 grpc_subchannel_call *subchannel_call = NULL;
Craig Tillerfb9d1122017-03-14 09:26:27 -07001192 const grpc_connected_subchannel_call_args call_args = {
Craig Tillerd426cac2017-03-13 12:30:45 -07001193 .pollent = calld->pollent,
1194 .path = calld->path,
1195 .start_time = calld->call_start_time,
1196 .deadline = calld->deadline,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001197 .arena = calld->arena,
1198 .context = calld->subchannel_call_context};
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001199 grpc_error *error = grpc_connected_subchannel_create_call(
Craig Tillerd426cac2017-03-13 12:30:45 -07001200 exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001201 gpr_atm_rel_store(&calld->subchannel_call,
1202 (gpr_atm)(uintptr_t)subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001203 if (error != GRPC_ERROR_NONE) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001204 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
Craig Tillera0f3abd2017-03-31 15:42:16 -07001205 grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
Mark D. Rothca136032017-04-04 13:53:29 -07001206 } else {
Mark D. Rothca136032017-04-04 13:53:29 -07001207 retry_waiting_locked(exec_ctx, calld);
1208 /* recurse to retry */
1209 start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001210 }
Craig Tillera11bfc82017-02-14 09:56:33 -08001211 /* early out */
1212 return;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001213 }
1214 /* nothing to be done but wait */
1215 add_waiting_locked(calld, op);
Craig Tillera11bfc82017-02-14 09:56:33 -08001216}
1217
Mark D. Rothde144102017-03-15 10:11:03 -07001218static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001219 grpc_call_element *elem = arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001220 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001221 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001222 if (error == GRPC_ERROR_NONE) {
1223 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001224 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001225 } else {
1226 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001227 // decide whether or not to retry. Note that we should only
1228 // record failures whose statuses match the configured retryable
1229 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001230 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001231 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001232 }
1233 }
Mark D. Roth95039b52017-02-24 07:59:45 -08001234 grpc_closure_run(exec_ctx, calld->original_on_complete,
1235 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001236}
1237
Craig Tillere1b51da2017-03-31 15:44:33 -07001238static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
1239 void *arg,
1240 grpc_error *error_ignored) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001241 GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001242
Craig Tillera0f3abd2017-03-31 15:42:16 -07001243 grpc_transport_stream_op_batch *op = arg;
Craig Tillerc55c1022017-03-10 10:26:42 -08001244 grpc_call_element *elem = op->handler_private.extra_arg;
Craig Tillera11bfc82017-02-14 09:56:33 -08001245 call_data *calld = elem->call_data;
1246
Craig Tillerf6cb0c02017-03-28 14:11:49 -07001247 if (op->recv_trailing_metadata) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001248 GPR_ASSERT(op->on_complete != NULL);
1249 calld->original_on_complete = op->on_complete;
Mark D. Rothde144102017-03-15 10:11:03 -07001250 grpc_closure_init(&calld->on_complete, on_complete, elem,
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001251 grpc_schedule_on_exec_ctx);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001252 op->on_complete = &calld->on_complete;
1253 }
1254
Craig Tillera0f3abd2017-03-31 15:42:16 -07001255 start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -08001256
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001257 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
Craig Tillera0f3abd2017-03-31 15:42:16 -07001258 "start_transport_stream_op_batch");
1259 GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
Craig Tillerbefafe62017-02-09 11:30:54 -08001260}
1261
Craig Tillerbe9691a2017-02-14 10:00:42 -08001262/* The logic here is fairly complicated, due to (a) the fact that we
1263 need to handle the case where we receive the send op before the
1264 initial metadata op, and (b) the need for efficiency, especially in
1265 the streaming case.
1266
1267 We use double-checked locking to initially see if initialization has been
1268 performed. If it has not, we acquire the combiner and perform initialization.
1269 If it has, we proceed on the fast path. */
Craig Tillere1b51da2017-03-31 15:44:33 -07001270static void cc_start_transport_stream_op_batch(
1271 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1272 grpc_transport_stream_op_batch *op) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001273 call_data *calld = elem->call_data;
1274 channel_data *chand = elem->channel_data;
1275 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001276 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001277 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
1278 op);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001279 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001280 /* try to (atomically) get the call */
1281 grpc_subchannel_call *call = GET_CALL(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001282 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001283 if (call == CANCELLED_CALL) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001284 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001285 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Craig Tillera0f3abd2017-03-31 15:42:16 -07001286 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001287 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001288 return;
1289 }
1290 if (call != NULL) {
1291 grpc_subchannel_call_process_op(exec_ctx, call, op);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001292 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001293 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001294 return;
1295 }
1296 /* we failed; lock and figure out what to do */
Craig Tillera0f3abd2017-03-31 15:42:16 -07001297 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
Craig Tillerc55c1022017-03-10 10:26:42 -08001298 op->handler_private.extra_arg = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -08001299 grpc_closure_sched(
1300 exec_ctx,
Craig Tiller4a84bdd2017-02-14 09:48:41 -08001301 grpc_closure_init(&op->handler_private.closure,
Craig Tillera0f3abd2017-03-31 15:42:16 -07001302 start_transport_stream_op_batch_locked, op,
Craig Tillerbefafe62017-02-09 11:30:54 -08001303 grpc_combiner_scheduler(chand->combiner, false)),
1304 GRPC_ERROR_NONE);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001305 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001306}
1307
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001308/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001309static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1310 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001311 const grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001312 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001313 channel_data *chand = elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001314 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001315 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001316 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001317 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001318 calld->owning_call = args->call_stack;
Craig Tillerd426cac2017-03-13 12:30:45 -07001319 calld->arena = args->arena;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001320 if (chand->deadline_checking_enabled) {
Craig Tiller71d6ce62017-04-06 09:10:09 -07001321 grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001322 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001323 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001324}
1325
1326/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001327static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1328 grpc_call_element *elem,
1329 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001330 grpc_closure *then_schedule_closure) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001331 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001332 channel_data *chand = elem->channel_data;
1333 if (chand->deadline_checking_enabled) {
1334 grpc_deadline_state_destroy(exec_ctx, elem);
1335 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001336 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001337 if (calld->method_params != NULL) {
1338 method_parameters_unref(calld->method_params);
1339 }
Mark D. Rothf28763c2016-09-14 15:18:40 -07001340 GRPC_ERROR_UNREF(calld->cancel_error);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001341 grpc_subchannel_call *call = GET_CALL(calld);
1342 if (call != NULL && call != CANCELLED_CALL) {
Craig Tillerd426cac2017-03-13 12:30:45 -07001343 grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
1344 then_schedule_closure = NULL;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001345 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
1346 }
Mark D. Roth64a317c2017-05-02 08:27:08 -07001347 GPR_ASSERT(!calld->pick_pending);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001348 GPR_ASSERT(calld->waiting_ops_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001349 if (calld->connected_subchannel != NULL) {
1350 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1351 "picked");
1352 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001353 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1354 if (calld->subchannel_call_context[i].value != NULL) {
1355 calld->subchannel_call_context[i].destroy(
1356 calld->subchannel_call_context[i].value);
1357 }
1358 }
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001359 gpr_free(calld->waiting_ops);
Craig Tillerd426cac2017-03-13 12:30:45 -07001360 grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001361}
1362
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001363static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1364 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001365 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001366 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001367 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001368}
1369
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001370/*************************************************************************
1371 * EXPORTED SYMBOLS
1372 */
1373
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001374const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001375 cc_start_transport_stream_op_batch,
Craig Tillerf40df232016-03-25 13:38:14 -07001376 cc_start_transport_op,
1377 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001378 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001379 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001380 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001381 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001382 cc_init_channel_elem,
1383 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001384 cc_get_peer,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001385 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001386 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001387};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001388
Craig Tiller613dafa2017-02-09 12:00:43 -08001389static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1390 grpc_error *error_ignored) {
1391 channel_data *chand = arg;
1392 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001393 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001394 } else {
1395 chand->exit_idle_when_lb_policy_arrives = true;
1396 if (!chand->started_resolving && chand->resolver != NULL) {
1397 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
1398 chand->started_resolving = true;
Craig Tiller972470b2017-02-09 15:05:36 -08001399 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1400 &chand->resolver_result,
1401 &chand->on_resolver_result_changed);
Craig Tiller613dafa2017-02-09 12:00:43 -08001402 }
1403 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001404 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001405}
1406
Craig Tillera82950e2015-09-22 12:33:20 -07001407grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1408 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001409 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001410 grpc_connectivity_state out =
1411 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001412 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001413 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001414 grpc_closure_sched(
1415 exec_ctx,
1416 grpc_closure_create(try_to_connect_locked, chand,
1417 grpc_combiner_scheduler(chand->combiner, false)),
1418 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001419 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001420 return out;
1421}
1422
Craig Tiller86c99582015-11-25 15:22:26 -08001423typedef struct {
1424 channel_data *chand;
1425 grpc_pollset *pollset;
1426 grpc_closure *on_complete;
Craig Tiller613dafa2017-02-09 12:00:43 -08001427 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001428 grpc_closure my_closure;
1429} external_connectivity_watcher;
1430
Craig Tiller1d881fb2015-12-01 07:39:04 -08001431static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001432 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001433 external_connectivity_watcher *w = arg;
1434 grpc_closure *follow_up = w->on_complete;
Craig Tiller69b093b2016-02-25 19:04:07 -08001435 grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
Craig Tiller1d881fb2015-12-01 07:39:04 -08001436 w->pollset);
1437 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1438 "external_connectivity_watcher");
Craig Tiller86c99582015-11-25 15:22:26 -08001439 gpr_free(w);
Craig Tiller613dafa2017-02-09 12:00:43 -08001440 grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
1441}
1442
Craig Tillera8610c02017-02-14 10:05:11 -08001443static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1444 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001445 external_connectivity_watcher *w = arg;
1446 grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
1447 grpc_schedule_on_exec_ctx);
1448 grpc_connectivity_state_notify_on_state_change(
1449 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
Craig Tiller86c99582015-11-25 15:22:26 -08001450}
1451
Craig Tillera82950e2015-09-22 12:33:20 -07001452void grpc_client_channel_watch_connectivity_state(
Craig Tiller906e3bc2015-11-24 07:31:31 -08001453 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
Mark D. Roth92210832017-05-02 15:04:39 -07001454 grpc_connectivity_state *state, grpc_closure *closure) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001455 channel_data *chand = elem->channel_data;
Craig Tiller86c99582015-11-25 15:22:26 -08001456 external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
1457 w->chand = chand;
1458 w->pollset = pollset;
Mark D. Roth92210832017-05-02 15:04:39 -07001459 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001460 w->state = state;
Craig Tiller69b093b2016-02-25 19:04:07 -08001461 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001462 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1463 "external_connectivity_watcher");
Craig Tiller613dafa2017-02-09 12:00:43 -08001464 grpc_closure_sched(
1465 exec_ctx,
Craig Tillera8610c02017-02-14 10:05:11 -08001466 grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tiller613dafa2017-02-09 12:00:43 -08001467 grpc_combiner_scheduler(chand->combiner, true)),
1468 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001469}