/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"

/* Client channel implementation */

/*************************************************************************
 * METHOD-CONFIG TABLE
 */

typedef enum {
  /* zero so it can be default initialized */
  WAIT_FOR_READY_UNSET = 0,
  WAIT_FOR_READY_FALSE,
  WAIT_FOR_READY_TRUE
} wait_for_ready_value;

typedef struct {
  gpr_refcount refs;
  gpr_timespec timeout;
  wait_for_ready_value wait_for_ready;
} method_parameters;

static method_parameters *method_parameters_ref(
    method_parameters *method_params) {
  gpr_ref(&method_params->refs);
  return method_params;
}

static void method_parameters_unref(method_parameters *method_params) {
  if (gpr_unref(&method_params->refs)) {
    gpr_free(method_params);
  }
}

static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
  method_parameters_unref(value);
}

static bool parse_wait_for_ready(grpc_json *field,
                                 wait_for_ready_value *wait_for_ready) {
  if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
    return false;
  }
  *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
                                                  : WAIT_FOR_READY_FALSE;
  return true;
}

static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
  if (field->type != GRPC_JSON_STRING) return false;
  size_t len = strlen(field->value);
  if (field->value[len - 1] != 's') return false;
  char *buf = gpr_strdup(field->value);
  buf[len - 1] = '\0';  // Remove trailing 's'.
  char *decimal_point = strchr(buf, '.');
  if (decimal_point != NULL) {
    *decimal_point = '\0';
    timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
    if (timeout->tv_nsec == -1) {
      gpr_free(buf);
      return false;
    }
    // There should always be exactly 3, 6, or 9 fractional digits.
    int multiplier = 1;
    switch (strlen(decimal_point + 1)) {
      case 9:
        break;
      case 6:
        multiplier *= 1000;
        break;
      case 3:
        multiplier *= 1000000;
        break;
      default:  // Unsupported number of digits.
        gpr_free(buf);
        return false;
    }
    timeout->tv_nsec *= multiplier;
  }
  timeout->tv_sec = gpr_parse_nonnegative_int(buf);
  gpr_free(buf);
  if (timeout->tv_sec == -1) return false;
  return true;
}

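/* Illustrative example for parse_timeout() above: the service config encodes
   timeouts as proto3 JSON durations such as "1s", "0.250s" or "30.000000000s".
   For "0.250s" the code yields tv_sec = 0 and
   tv_nsec = 250 * 1000000 = 250000000; a value without the trailing 's', or
   with a fractional part that is not exactly 3, 6 or 9 digits long, is
   rejected. */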
static void *method_parameters_create_from_json(const grpc_json *json) {
  wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
  gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
  for (grpc_json *field = json->child; field != NULL; field = field->next) {
    if (field->key == NULL) continue;
    if (strcmp(field->key, "waitForReady") == 0) {
      if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL;  // Duplicate.
      if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
    } else if (strcmp(field->key, "timeout") == 0) {
      if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL;  // Duplicate.
      if (!parse_timeout(field, &timeout)) return NULL;
    }
  }
  method_parameters *value = gpr_malloc(sizeof(method_parameters));
  gpr_ref_init(&value->refs, 1);
  value->timeout = timeout;
  value->wait_for_ready = wait_for_ready;
  return value;
}

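/* Example of the per-method JSON object that
   method_parameters_create_from_json() receives, as it appears inside a
   service config's "methodConfig" list (illustrative):
     { "name": [ { "service": "Foo", "method": "Bar" } ],
       "waitForReady": true,
       "timeout": "1.000s" }
   Only "waitForReady" and "timeout" are consumed here; the "name" entries are
   handled by the generic service-config table code that calls this factory. */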
/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

typedef struct client_channel_channel_data {
  /** resolver for this channel */
  grpc_resolver *resolver;
  /** have we started resolving this channel */
  bool started_resolving;
  /** is deadline checking enabled? */
  bool deadline_checking_enabled;
  /** client channel factory */
  grpc_client_channel_factory *client_channel_factory;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner *combiner;
  /** currently active load balancer */
  grpc_lb_policy *lb_policy;
  /** retry throttle data */
  grpc_server_retry_throttle_data *retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_slice_hash_table *method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args *resolver_result;
  /** a list of closures that are all waiting for config to come in */
  grpc_closure_list waiting_for_config_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack *owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set *interested_parties;

  /* the following properties are guarded by a mutex since API's require them
     to be instantaneously available */
  gpr_mu info_mu;
  char *info_lb_policy_name;
  /** service config in JSON form */
  char *info_service_config_json;
} channel_data;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data *chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
                                                  channel_data *chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error *error,
                                                  const char *reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != NULL) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      grpc_lb_policy_cancel_picks_locked(
          exec_ctx, chand->lb_policy,
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
                                         /* mask= */ 0, /* check= */ 0,
                                         GRPC_ERROR_REF(error));
    }
  }
  grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
                              reason);
}

static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  lb_policy_connectivity_watcher *w = arg;
  grpc_connectivity_state publish_state = w->state;
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy) {
    if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
      publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
      grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
      GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
      w->chand->lb_policy = NULL;
    }
    set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
    }
  }

  GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");

  w->chand = chand;
  grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner, false));
  w->state = current_state;
  w->lb_policy = lb_policy;
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
                                               &w->on_changed);
}

typedef struct {
  char *server_name;
  grpc_server_retry_throttle_data *retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
  service_config_parsing_state *parsing_state = arg;
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != NULL) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json *sub_field = field->child; sub_field != NULL;
         sub_field = sub_field->next) {
      if (sub_field->key == NULL) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char *decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != NULL) {
          whole_len = (size_t)(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_retry_throttle_map_get_data_for_server(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

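/* Example "retryThrottling" entry handled by parse_retry_throttle_params()
   (illustrative):
     "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.5 }
   Both values are kept in milli-units, so this parses to
   max_milli_tokens = 10000 and milli_token_ratio = 500 before being passed to
   grpc_retry_throttle_map_get_data_for_server(). */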
// Wrap a closure associated with \a lb_policy. The associated callback (\a
// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
// scheduling \a wrapped_closure.
typedef struct wrapped_on_pick_closure_arg {
  /* the closure instance using this struct as argument */
  grpc_closure wrapper_closure;

  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
   * calls against the internal RR instance, respectively. */
  grpc_closure *wrapped_closure;

  /* The policy instance related to the closure */
  grpc_lb_policy *lb_policy;
} wrapped_on_pick_closure_arg;

// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
                                       grpc_error *error) {
  wrapped_on_pick_closure_arg *wc_arg = arg;
  GPR_ASSERT(wc_arg != NULL);
  GPR_ASSERT(wc_arg->wrapped_closure != NULL);
  GPR_ASSERT(wc_arg->lb_policy != NULL);
  grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
  GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
  gpr_free(wc_arg);
}

static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  channel_data *chand = arg;
  char *lb_policy_name = NULL;
  grpc_lb_policy *lb_policy = NULL;
  grpc_lb_policy *old_lb_policy;
  grpc_slice_hash_table *method_params_table = NULL;
  grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  bool exit_idle = false;
  grpc_error *state_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  char *service_config_json = NULL;
  service_config_parsing_state parsing_state;
  memset(&parsing_state, 0, sizeof(parsing_state));

  if (chand->resolver_result != NULL) {
    // Find LB policy name.
    const grpc_arg *channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      lb_policy_name = channel_arg->value.string;
    }
    // Special case: If at least one balancer address is present, we use
    // the grpclb policy, regardless of what the resolver actually specified.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
    if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
      grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
      bool found_balancer_address = false;
      for (size_t i = 0; i < addresses->num_addresses; ++i) {
        if (addresses->addresses[i].is_balancer) {
          found_balancer_address = true;
          break;
        }
      }
      if (found_balancer_address) {
        if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
          gpr_log(GPR_INFO,
                  "resolver requested LB policy %s but provided at least one "
                  "balancer address -- forcing use of grpclb LB policy",
                  lb_policy_name);
        }
        lb_policy_name = "grpclb";
      }
    }
    // Use pick_first if nothing was specified and we didn't select grpclb
    // above.
    if (lb_policy_name == NULL) lb_policy_name = "pick_first";
    // Instantiate LB policy.
    grpc_lb_policy_args lb_policy_args;
    lb_policy_args.args = chand->resolver_result;
    lb_policy_args.client_channel_factory = chand->client_channel_factory;
    lb_policy_args.combiner = chand->combiner;
    lb_policy =
        grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
    if (lb_policy != NULL) {
      GRPC_LB_POLICY_REF(lb_policy, "config_change");
      GRPC_ERROR_UNREF(state_error);
      state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
                                                       &state_error);
    }
    // Find service config.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      service_config_json = gpr_strdup(channel_arg->value.string);
      grpc_service_config *service_config =
          grpc_service_config_create(service_config_json);
      if (service_config != NULL) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        GPR_ASSERT(channel_arg != NULL);
        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
        grpc_uri *uri =
            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
        GPR_ASSERT(uri->path[0] != '\0');
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        grpc_service_config_parse_global_params(
            service_config, parse_retry_throttle_params, &parsing_state);
        parsing_state.server_name = NULL;
        grpc_uri_destroy(uri);
        method_params_table = grpc_service_config_create_method_config_table(
            exec_ctx, service_config, method_parameters_create_from_json,
            method_parameters_free);
        grpc_service_config_destroy(service_config);
      }
    }
    // Before we clean up, save a copy of lb_policy_name, since it might
    // be pointing to data inside chand->resolver_result.
    // The copy will be saved in chand->lb_policy_name below.
    lb_policy_name = gpr_strdup(lb_policy_name);
    grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
    chand->resolver_result = NULL;
  }

  if (lb_policy != NULL) {
    grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
                                     chand->interested_parties);
  }

  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name != NULL) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name;
  }
  old_lb_policy = chand->lb_policy;
  chand->lb_policy = lb_policy;
  if (service_config_json != NULL) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);

  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = parsing_state.retry_throttle_data;
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  chand->method_params_table = method_params_table;
  if (lb_policy != NULL) {
    grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
  } else if (chand->resolver == NULL /* disconnected */) {
    grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
  }
  if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
    GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
    exit_idle = true;
    chand->exit_idle_when_lb_policy_arrives = false;
  }

  if (error == GRPC_ERROR_NONE && chand->resolver) {
    set_channel_connectivity_state_locked(
        exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    if (lb_policy != NULL) {
      watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
    }
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
  } else {
    if (chand->resolver != NULL) {
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
    }
    grpc_error *refs[] = {error, state_error};
    set_channel_connectivity_state_locked(
        exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)),
        "resolver_gone");
  }

  if (exit_idle) {
    grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
  }

  if (old_lb_policy != NULL) {
    grpc_pollset_set_del_pollset_set(
        exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
    GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
  }

  if (lb_policy != NULL) {
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
  }

  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
  GRPC_ERROR_UNREF(state_error);
}

static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error_ignored) {
  grpc_transport_op *op = arg;
  grpc_channel_element *elem = op->handler_private.extra_arg;
  channel_data *chand = elem->channel_data;

  if (op->on_connectivity_state_change != NULL) {
    grpc_connectivity_state_notify_on_state_change(
        exec_ctx, &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = NULL;
    op->connectivity_state = NULL;
  }

  if (op->send_ping != NULL) {
    if (chand->lb_policy == NULL) {
      grpc_closure_sched(
          exec_ctx, op->send_ping,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
      op->bind_pollset = NULL;
    }
    op->send_ping = NULL;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != NULL) {
      set_channel_connectivity_state_locked(
          exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
      }
      if (chand->lb_policy != NULL) {
        grpc_pollset_set_del_pollset_set(exec_ctx,
                                         chand->lb_policy->interested_parties,
                                         chand->interested_parties);
        GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
        chand->lb_policy = NULL;
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");

  grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem,
                                  grpc_transport_op *op) {
  channel_data *chand = elem->channel_data;

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != NULL) {
    grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
                                 op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  grpc_closure_sched(
      exec_ctx,
      grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner, false)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
                                grpc_channel_element *elem,
                                const grpc_channel_info *info) {
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != NULL) {
    *info->lb_policy_name = chand->info_lb_policy_name == NULL
                                ? NULL
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != NULL) {
    *info->service_config_json =
        chand->info_service_config_json == NULL
            ? NULL
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
                                        grpc_channel_element *elem,
                                        grpc_channel_element_args *args) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create(NULL);
  gpr_mu_init(&chand->info_mu);
  chand->owning_stack = args->channel_stack;
  grpc_closure_init(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner, false));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  // Record client channel factory.
  const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
                                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(arg->value.pointer.p);
  chand->client_channel_factory = arg->value.pointer.p;
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char *proxy_name = NULL;
  grpc_channel_args *new_args = NULL;
  grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_resolver_create(
      exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
      new_args != NULL ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != NULL) gpr_free(proxy_name);
  if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
  if (chand->resolver == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

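/* Illustrative sketch of the two channel args that cc_init_channel_elem()
   requires (normally the channel-creation code supplies them; the names below
   are hypothetical and shown only to make the expected types concrete):
     grpc_arg factory_arg = {.type = GRPC_ARG_POINTER,
                             .key = GRPC_ARG_CLIENT_CHANNEL_FACTORY,
                             .value.pointer = {.p = factory, .vtable = &vt}};
     grpc_arg uri_arg = {.type = GRPC_ARG_STRING,
                         .key = GRPC_ARG_SERVER_URI,
                         .value.string = "dns:///localhost:50051"};
   A missing or mistyped arg makes the constructor fail channel-stack
   initialization with the errors returned above. */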
static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                     grpc_error *error) {
  grpc_resolver *resolver = arg;
  grpc_resolver_shutdown_locked(exec_ctx, resolver);
  GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
                                    grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->resolver != NULL) {
    grpc_closure_sched(
        exec_ctx,
        grpc_closure_create(shutdown_resolver_locked, chand->resolver,
                            grpc_combiner_scheduler(chand->combiner, false)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != NULL) {
    grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
  }
  if (chand->lb_policy != NULL) {
    grpc_pollset_set_del_pollset_set(exec_ctx,
                                     chand->lb_policy->interested_parties,
                                     chand->interested_parties);
    GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
  grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
  GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

#define GET_CALL(call_data) \
  ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))

#define CANCELLED_CALL ((grpc_subchannel_call *)1)

typedef enum {
  /* zero so that it can be default-initialized */
  GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0,
  GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} subchannel_creation_phase;

/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store a pointer to the call
  // stack and each has its own mutex. If/when we have time, find a way
  // to avoid this without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  gpr_timespec deadline;
  grpc_server_retry_throttle_data *retry_throttle_data;
  method_parameters *method_params;

  grpc_error *cancel_error;

  /** either 0 for no call, 1 for cancelled, or a pointer to a
      grpc_subchannel_call */
  gpr_atm subchannel_call;
  gpr_arena *arena;

  subchannel_creation_phase creation_phase;
  grpc_connected_subchannel *connected_subchannel;
  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
  grpc_polling_entity *pollent;

  grpc_transport_stream_op_batch **waiting_ops;
  size_t waiting_ops_count;
  size_t waiting_ops_capacity;

  grpc_closure next_step;

  grpc_call_stack *owning_call;

  grpc_linked_mdelem lb_token_mdelem;

  grpc_closure on_complete;
  grpc_closure *original_on_complete;
} call_data;

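/* Note on calld->subchannel_call: GET_CALL() loads it with acquire semantics.
   It holds 0 while no subchannel call exists yet, CANCELLED_CALL (the literal
   pointer value 1) once the call has been cancelled, and otherwise a real
   grpc_subchannel_call pointer that is published with a release store after
   the subchannel call has been created. */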
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
    grpc_call_element *call_elem) {
  grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
  return scc == CANCELLED_CALL ? NULL : scc;
}

static void add_waiting_locked(call_data *calld,
                               grpc_transport_stream_op_batch *op) {
  GPR_TIMER_BEGIN("add_waiting_locked", 0);
  if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
    calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
    calld->waiting_ops =
        gpr_realloc(calld->waiting_ops,
                    calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
  }
  calld->waiting_ops[calld->waiting_ops_count++] = op;
  GPR_TIMER_END("add_waiting_locked", 0);
}

static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
                        grpc_error *error) {
  size_t i;
  for (i = 0; i < calld->waiting_ops_count; i++) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
  }
  calld->waiting_ops_count = 0;
  GRPC_ERROR_UNREF(error);
}

static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
  if (calld->waiting_ops_count == 0) {
    return;
  }

  grpc_subchannel_call *call = GET_CALL(calld);
  grpc_transport_stream_op_batch **ops = calld->waiting_ops;
  size_t nops = calld->waiting_ops_count;
  if (call == CANCELLED_CALL) {
    fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
    return;
  }
  calld->waiting_ops = NULL;
  calld->waiting_ops_count = 0;
  calld->waiting_ops_capacity = 0;
  for (size_t i = 0; i < nops; i++) {
    grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
  }
  gpr_free(ops);
}

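/* Note: while the subchannel pick is still in flight, stream op batches are
   parked in calld->waiting_ops by add_waiting_locked(); once the subchannel
   call exists, retry_waiting_locked() replays them in order, and fail_locked()
   fails them in bulk if the pick is cancelled or fails. */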
// Sets calld->method_params and calld->retry_throttle_data.
// If the method params specify a timeout, populates
// *per_method_deadline and returns true.
static bool set_call_method_params_from_service_config_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    gpr_timespec *per_method_deadline) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  if (chand->retry_throttle_data != NULL) {
    calld->retry_throttle_data =
        grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != NULL) {
    calld->method_params = grpc_method_config_table_get(
        exec_ctx, chand->method_params_table, calld->path);
    if (calld->method_params != NULL) {
      method_parameters_ref(calld->method_params);
      if (gpr_time_cmp(calld->method_params->timeout,
                       gpr_time_0(GPR_TIMESPAN)) != 0) {
        *per_method_deadline =
            gpr_time_add(calld->call_start_time, calld->method_params->timeout);
        return true;
      }
    }
  }
  return false;
}

static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
                                             grpc_call_element *elem) {
  /* apply service-config level configuration to the call (now that we're
   * certain it exists) */
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_timespec per_method_deadline;
  if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
                                                        &per_method_deadline)) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand->deadline_checking_enabled &&
        gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
      calld->deadline = per_method_deadline;
      grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
    }
  }
}

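/* Worked example for apply_final_configuration_locked() (illustrative): if the
   application started the call with a 30s deadline but the matched method
   config specifies "timeout": "5.000s", then per_method_deadline =
   call_start_time + 5s is the earlier of the two, so calld->deadline is
   tightened to it and the deadline timer is reset; a service-config timeout
   longer than the application's deadline is simply ignored. */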
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                    grpc_error *error) {
  grpc_call_element *elem = arg;
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(calld->creation_phase ==
             GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
  grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
                                           chand->interested_parties);
  calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
  if (calld->connected_subchannel == NULL) {
    gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
    fail_locked(exec_ctx, calld,
                GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                    "Failed to create subchannel", &error, 1));
  } else if (GET_CALL(calld) == CANCELLED_CALL) {
    /* already cancelled before subchannel became ready */
    grpc_error *cancellation_error =
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Cancelled before creating subchannel", &error, 1);
    /* if due to deadline, attach the deadline exceeded status to the error */
    if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
      cancellation_error =
          grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
                             GRPC_STATUS_DEADLINE_EXCEEDED);
    }
    fail_locked(exec_ctx, calld, cancellation_error);
  } else {
    /* Create call on subchannel. */
    grpc_subchannel_call *subchannel_call = NULL;
    const grpc_connected_subchannel_call_args call_args = {
        .pollent = calld->pollent,
        .path = calld->path,
        .start_time = calld->call_start_time,
        .deadline = calld->deadline,
        .arena = calld->arena,
        .context = calld->subchannel_call_context};
    grpc_error *new_error = grpc_connected_subchannel_create_call(
        exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
    gpr_atm_rel_store(&calld->subchannel_call,
                      (gpr_atm)(uintptr_t)subchannel_call);
    if (new_error != GRPC_ERROR_NONE) {
      new_error = grpc_error_add_child(new_error, error);
      fail_locked(exec_ctx, calld, new_error);
    } else {
      retry_waiting_locked(exec_ctx, calld);
    }
  }
  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}

static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  grpc_subchannel_call *subchannel_call = GET_CALL(calld);
  if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
    return NULL;
  } else {
    return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
  }
}

typedef struct {
  grpc_metadata_batch *initial_metadata;
  uint32_t initial_metadata_flags;
  grpc_connected_subchannel **connected_subchannel;
  grpc_call_context_element *subchannel_call_context;
  grpc_closure *on_ready;
  grpc_call_element *elem;
  grpc_closure closure;
} continue_picking_args;

/** Return true if subchannel is available immediately (in which case on_ready
    should not be called), or false otherwise (in which case on_ready should be
    called when the subchannel is available). */
static bool pick_subchannel_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
    grpc_connected_subchannel **connected_subchannel,
    grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready,
    grpc_error *error);

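/* Resumes a queued pick.  A NULL connected_subchannel target means the pick
   was cancelled while queued; an error is forwarded straight to on_ready;
   otherwise the pick is retried, and on_ready is scheduled immediately if
   the retried pick completes synchronously. */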
static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                    grpc_error *error) {
  continue_picking_args *cpa = arg;
  if (cpa->connected_subchannel == NULL) {
    /* cancelled, do nothing */
  } else if (error != GRPC_ERROR_NONE) {
    grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
  } else {
    if (pick_subchannel_locked(
            exec_ctx, cpa->elem, cpa->initial_metadata,
            cpa->initial_metadata_flags, cpa->connected_subchannel,
            cpa->subchannel_call_context, cpa->on_ready, GRPC_ERROR_NONE)) {
      grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
    }
  }
  gpr_free(cpa);
}

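/* Implementation notes: a NULL initial_metadata turns this into a
   cancellation -- any in-flight LB pick for connected_subchannel is
   cancelled and matching queued picks are failed.  With an LB policy
   present, per-method wait_for_ready settings are applied and the pick is
   delegated to the policy; otherwise the pick is queued until resolver
   results arrive (or fails with "Disconnected" if the resolver has been
   shut down). */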
static bool pick_subchannel_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
    grpc_connected_subchannel **connected_subchannel,
    grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready,
    grpc_error *error) {
  GPR_TIMER_BEGIN("pick_subchannel", 0);

  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  continue_picking_args *cpa;
  grpc_closure *closure;

  GPR_ASSERT(connected_subchannel);

  if (initial_metadata == NULL) {
    if (chand->lb_policy != NULL) {
      grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
                                        connected_subchannel,
                                        GRPC_ERROR_REF(error));
    }
    for (closure = chand->waiting_for_config_closures.head; closure != NULL;
         closure = closure->next_data.next) {
      cpa = closure->cb_arg;
      if (cpa->connected_subchannel == connected_subchannel) {
        cpa->connected_subchannel = NULL;
        grpc_closure_sched(exec_ctx, cpa->on_ready,
                           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                               "Pick cancelled", &error, 1));
      }
    }
    GPR_TIMER_END("pick_subchannel", 0);
    GRPC_ERROR_UNREF(error);
    return true;
  }
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  if (chand->lb_policy != NULL) {
    apply_final_configuration_locked(exec_ctx, elem);
    grpc_lb_policy *lb_policy = chand->lb_policy;
    GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
    // If the application explicitly set wait_for_ready, use that.
    // Otherwise, if the service config specified a value for this
    // method, use that.
    const bool wait_for_ready_set_from_api =
        initial_metadata_flags &
        GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
    const bool wait_for_ready_set_from_service_config =
        calld->method_params != NULL &&
        calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
    if (!wait_for_ready_set_from_api &&
        wait_for_ready_set_from_service_config) {
      if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
        initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      } else {
        initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      }
    }
    const grpc_lb_policy_pick_args inputs = {
        initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
        gpr_inf_future(GPR_CLOCK_MONOTONIC)};

    // Wrap the user-provided callback in order to hold a strong reference to
    // the LB policy for the duration of the pick.
    wrapped_on_pick_closure_arg *w_on_pick_arg =
        gpr_zalloc(sizeof(*w_on_pick_arg));
    grpc_closure_init(&w_on_pick_arg->wrapper_closure,
                      wrapped_on_pick_closure_cb, w_on_pick_arg,
                      grpc_schedule_on_exec_ctx);
    w_on_pick_arg->wrapped_closure = on_ready;
    GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
    w_on_pick_arg->lb_policy = lb_policy;
    const bool pick_done = grpc_lb_policy_pick_locked(
        exec_ctx, lb_policy, &inputs, connected_subchannel,
        subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
    if (pick_done) {
      /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
      GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
                           "pick_subchannel_wrapping");
      gpr_free(w_on_pick_arg);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
    GPR_TIMER_END("pick_subchannel", 0);
    return pick_done;
  }
  if (chand->resolver != NULL && !chand->started_resolving) {
    chand->started_resolving = true;
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
  }
  if (chand->resolver != NULL) {
    cpa = gpr_malloc(sizeof(*cpa));
    cpa->initial_metadata = initial_metadata;
    cpa->initial_metadata_flags = initial_metadata_flags;
    cpa->connected_subchannel = connected_subchannel;
    cpa->subchannel_call_context = subchannel_call_context;
    cpa->on_ready = on_ready;
    cpa->elem = elem;
    grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
                      grpc_combiner_scheduler(chand->combiner, true));
    grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
                             GRPC_ERROR_NONE);
  } else {
    grpc_closure_sched(exec_ctx, on_ready,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  }

  GPR_TIMER_END("pick_subchannel", 0);
  return false;
}

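/* Core of the combiner-protected op-batch path.  In order: re-check whether
   a subchannel call (or the cancelled sentinel) has appeared and hand the
   batch straight to it; turn a cancel_stream batch into the CANCELLED_CALL
   sentinel and fail any pending work; start a subchannel pick on the first
   send_initial_metadata batch; once a connected subchannel is available,
   create the subchannel call and replay waiting batches; otherwise queue the
   batch until one of the above can happen. */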
static void start_transport_stream_op_batch_locked_inner(
    grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
    grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_subchannel_call *call;

  /* need to recheck that another thread hasn't set the call */
  call = GET_CALL(calld);
  if (call == CANCELLED_CALL) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
    /* early out */
    return;
  }
  if (call != NULL) {
    grpc_subchannel_call_process_op(exec_ctx, call, op);
    /* early out */
    return;
  }
  /* if this is a cancellation, then we can raise our cancelled flag */
  if (op->cancel_stream) {
    if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
                         (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
      /* recurse to retry */
      start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
      /* early out */
      return;
    } else {
      /* Stash a copy of cancel_error in our call data, so that we can use
         it for subsequent operations.  This ensures that if the call is
         cancelled before any ops are passed down (e.g., if the deadline
         is in the past when the call starts), we can return the right
         error to the caller when the first op does get passed down. */
      calld->cancel_error =
          GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
      switch (calld->creation_phase) {
        case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
          fail_locked(exec_ctx, calld,
                      GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
          break;
        case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
          pick_subchannel_locked(
              exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, NULL,
              GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
          break;
      }
      grpc_transport_stream_op_batch_finish_with_failure(
          exec_ctx, op,
          GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
      /* early out */
      return;
    }
  }
  /* if we don't have a subchannel, try to get one */
  if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
      calld->connected_subchannel == NULL && op->send_initial_metadata) {
    calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
    grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
                      grpc_combiner_scheduler(chand->combiner, true));
    GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
    /* If a subchannel is not available immediately, the polling entity from
       call_data should be provided to channel_data's interested_parties, so
       that IO of the lb_policy and resolver could be done under it. */
    if (pick_subchannel_locked(
            exec_ctx, elem,
            op->payload->send_initial_metadata.send_initial_metadata,
            op->payload->send_initial_metadata.send_initial_metadata_flags,
            &calld->connected_subchannel, calld->subchannel_call_context,
            &calld->next_step, GRPC_ERROR_NONE)) {
      calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
      GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
    } else {
      grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
                                             chand->interested_parties);
    }
  }
  /* if we've got a subchannel, then let's ask it to create a call */
  if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
      calld->connected_subchannel != NULL) {
    grpc_subchannel_call *subchannel_call = NULL;
    const grpc_connected_subchannel_call_args call_args = {
        .pollent = calld->pollent,
        .path = calld->path,
        .start_time = calld->call_start_time,
        .deadline = calld->deadline,
        .arena = calld->arena,
        .context = calld->subchannel_call_context};
    grpc_error *error = grpc_connected_subchannel_create_call(
        exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
    gpr_atm_rel_store(&calld->subchannel_call,
                      (gpr_atm)(uintptr_t)subchannel_call);
    if (error != GRPC_ERROR_NONE) {
      fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
      grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
    } else {
      retry_waiting_locked(exec_ctx, calld);
      /* recurse to retry */
      start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
    }
    /* early out */
    return;
  }
  /* nothing to be done but wait */
  add_waiting_locked(calld, op);
}

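/* Intercepts the on_complete callback of batches containing
   recv_trailing_metadata so that the call's outcome can be recorded against
   the server's retry throttle before the original closure runs. */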
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  grpc_call_element *elem = arg;
  call_data *calld = elem->call_data;
  if (calld->retry_throttle_data != NULL) {
    if (error == GRPC_ERROR_NONE) {
      grpc_server_retry_throttle_data_record_success(
          calld->retry_throttle_data);
    } else {
      // TODO(roth): In a subsequent PR, check the return value here and
      // decide whether or not to retry.  Note that we should only
      // record failures whose statuses match the configured retryable
      // or non-fatal status codes.
      grpc_server_retry_throttle_data_record_failure(
          calld->retry_throttle_data);
    }
  }
  grpc_closure_run(exec_ctx, calld->original_on_complete,
                   GRPC_ERROR_REF(error));
}

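/* Combiner callback for the slow path: installs the retry-throttle
   on_complete wrapper when needed, runs the inner logic above, and releases
   the call stack reference taken when the batch was scheduled onto the
   combiner. */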
static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
                                                   void *arg,
                                                   grpc_error *error_ignored) {
  GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);

  grpc_transport_stream_op_batch *op = arg;
  grpc_call_element *elem = op->handler_private.extra_arg;
  call_data *calld = elem->call_data;

  if (op->recv_trailing_metadata) {
    GPR_ASSERT(op->on_complete != NULL);
    calld->original_on_complete = op->on_complete;
    grpc_closure_init(&calld->on_complete, on_complete, elem,
                      grpc_schedule_on_exec_ctx);
    op->on_complete = &calld->on_complete;
  }

  start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);

  GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
                        "start_transport_stream_op_batch");
  GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}

/* The logic here is fairly complicated, due to (a) the fact that we
   need to handle the case where we receive the send op before the
   initial metadata op, and (b) the need for efficiency, especially in
   the streaming case.

   We use double-checked locking to initially see if initialization has been
   performed. If it has not, we acquire the combiner and perform initialization.
   If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_transport_stream_op_batch *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
                                                               op);
  }
  /* try to (atomically) get the call */
  grpc_subchannel_call *call = GET_CALL(calld);
  GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
  if (call == CANCELLED_CALL) {
    grpc_transport_stream_op_batch_finish_with_failure(
        exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
    GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
    /* early out */
    return;
  }
  if (call != NULL) {
    grpc_subchannel_call_process_op(exec_ctx, call, op);
    GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
    /* early out */
    return;
  }
  /* we failed; lock and figure out what to do */
  GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
  op->handler_private.extra_arg = elem;
  grpc_closure_sched(
      exec_ctx,
      grpc_closure_init(&op->handler_private.closure,
                        start_transport_stream_op_batch_locked, op,
                        grpc_combiner_scheduler(chand->combiner, false)),
      GRPC_ERROR_NONE);
  GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}

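/* The fast-path/slow-path split above is a form of double-checked locking
   over the atomic calld->subchannel_call slot.  A minimal sketch of the
   pattern, using hypothetical names rather than the real helpers:

     gpr_atm v = gpr_atm_acq_load(&slot);        // check without the combiner
     if (v != 0) { use((thing *)v); return; }    // fast path
     schedule_on_combiner(do_init_and_retry);    // slow path, serialized

   The acquire load pairs with the release store performed once the
   subchannel call is created, so readers on the fast path observe a fully
   initialized object. */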
/* Constructor for call_data */
static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
                                     grpc_call_element *elem,
                                     const grpc_call_element_args *args) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
  calld->owning_call = args->call_stack;
  calld->arena = args->arena;
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
  }
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
                                 grpc_call_element *elem,
                                 const grpc_call_final_info *final_info,
                                 grpc_closure *then_schedule_closure) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_destroy(exec_ctx, elem);
  }
  grpc_slice_unref_internal(exec_ctx, calld->path);
  if (calld->method_params != NULL) {
    method_parameters_unref(calld->method_params);
  }
  GRPC_ERROR_UNREF(calld->cancel_error);
  grpc_subchannel_call *call = GET_CALL(calld);
  if (call != NULL && call != CANCELLED_CALL) {
    grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
    then_schedule_closure = NULL;
    GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
  }
  GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
  GPR_ASSERT(calld->waiting_ops_count == 0);
  if (calld->connected_subchannel != NULL) {
    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
                                    "picked");
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->subchannel_call_context[i].value != NULL) {
      calld->subchannel_call_context[i].destroy(
          calld->subchannel_call_context[i].value);
    }
  }
  gpr_free(calld->waiting_ops);
  grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
                                          grpc_call_element *elem,
                                          grpc_polling_entity *pollent) {
  call_data *calld = elem->call_data;
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_peer,
    cc_get_channel_info,
    "client-channel",
};

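/* This vtable is what the channel stack machinery invokes; the client_channel
   plugin registers it with the channel-init machinery so that it sits at the
   end of client channel stacks, which is how the entry points above get
   wired into a grpc_channel. */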
static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                  grpc_error *error_ignored) {
  channel_data *chand = arg;
  if (chand->lb_policy != NULL) {
    grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != NULL) {
      GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
      chand->started_resolving = true;
      grpc_resolver_next_locked(exec_ctx, chand->resolver,
                                &chand->resolver_result,
                                &chand->on_resolver_result_changed);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
}

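/* Exported connectivity API for client channels: reads the current state
   from the channel's state tracker and, when the channel is IDLE and
   try_to_connect is set, schedules try_to_connect_locked on the combiner to
   kick the LB policy (or start resolving). */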
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
  channel_data *chand = elem->channel_data;
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    grpc_closure_sched(
        exec_ctx,
        grpc_closure_create(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner, false)),
        GRPC_ERROR_NONE);
  }
  return out;
}

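/* External connectivity watching: the watcher's pollset is added to the
   channel's interested_parties, the watch is registered with the state
   tracker under the combiner, and on completion the pollset is removed and
   the caller's closure is run with the observed state written to *state. */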
typedef struct {
  channel_data *chand;
  grpc_pollset *pollset;
  grpc_closure *on_complete;
  grpc_connectivity_state *state;
  grpc_closure my_closure;
} external_connectivity_watcher;

static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
                                       grpc_error *error) {
  external_connectivity_watcher *w = arg;
  grpc_closure *follow_up = w->on_complete;
  grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
                               w->pollset);
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
                           "external_connectivity_watcher");
  gpr_free(w);
  grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}

static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                            grpc_error *error_ignored) {
  external_connectivity_watcher *w = arg;
  grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
                    grpc_schedule_on_exec_ctx);
  grpc_connectivity_state_notify_on_state_change(
      exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
}

void grpc_client_channel_watch_connectivity_state(
    grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
    grpc_connectivity_state *state, grpc_closure *on_complete) {
  channel_data *chand = elem->channel_data;
  external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
  w->chand = chand;
  w->pollset = pollset;
  w->on_complete = on_complete;
  w->state = state;
  grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  grpc_closure_sched(
      exec_ctx,
      grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner, true)),
      GRPC_ERROR_NONE);
}
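/* The two exported functions above back the public connectivity-state API
   for client channels.  A rough application-level usage sketch, assuming the
   usual grpc_channel and completion-queue setup exists elsewhere:

     grpc_connectivity_state st =
         grpc_channel_check_connectivity_state(channel, 1);  // try_to_connect
     grpc_channel_watch_connectivity_state(channel, st,
                                           gpr_inf_future(GPR_CLOCK_REALTIME),
                                           cq, tag);
     // A completion-queue event for `tag` fires once the state changes from
     // `st` (or the deadline expires); re-check and re-arm to keep following
     // the channel's state. */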