blob: ab71467d73f13178ff75f996b0f709f46a110851 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller9eb0fde2017-03-31 16:59:30 -070034#include "src/core/ext/filters/client_channel/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080035
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
Mark D. Rothb2d24882016-10-27 15:44:07 -070042#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/sync.h>
44#include <grpc/support/useful.h>
45
Craig Tiller9eb0fde2017-03-31 16:59:30 -070046#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
47#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
48#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
49#include "src/core/ext/filters/client_channel/resolver_registry.h"
50#include "src/core/ext/filters/client_channel/retry_throttle.h"
51#include "src/core/ext/filters/client_channel/subchannel.h"
Craig Tiller3be7dd02017-04-03 14:30:03 -070052#include "src/core/ext/filters/deadline/deadline_filter.h"
Craig Tiller9533d042016-03-25 17:11:06 -070053#include "src/core/lib/channel/channel_args.h"
54#include "src/core/lib/channel/connected_channel.h"
Craig Tillerbefafe62017-02-09 11:30:54 -080055#include "src/core/lib/iomgr/combiner.h"
Craig Tiller9533d042016-03-25 17:11:06 -070056#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070057#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070058#include "src/core/lib/profiling/timers.h"
Craig Tiller7c70b6c2017-01-23 07:48:42 -080059#include "src/core/lib/slice/slice_internal.h"
Craig Tiller9533d042016-03-25 17:11:06 -070060#include "src/core/lib/support/string.h"
61#include "src/core/lib/surface/channel.h"
62#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070063#include "src/core/lib/transport/metadata.h"
64#include "src/core/lib/transport/metadata_batch.h"
Mark D. Rothea846a02016-11-03 11:32:54 -070065#include "src/core/lib/transport/service_config.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070066#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070067
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080068/* Client channel implementation */
69
Mark D. Roth26b7be42016-10-24 10:08:07 -070070/*************************************************************************
71 * METHOD-CONFIG TABLE
72 */
73
/* Tri-state value for a method's "waitForReady" setting. */
typedef enum {
  /* Kept at zero so that zero-initialized storage reads as "unset". */
  WAIT_FOR_READY_UNSET = 0,
  WAIT_FOR_READY_FALSE = 1,
  WAIT_FOR_READY_TRUE = 2
} wait_for_ready_value;
80
Mark D. Roth95b627b2017-02-24 11:02:58 -080081typedef struct {
82 gpr_refcount refs;
Mark D. Roth9d480942016-10-19 14:18:05 -070083 gpr_timespec timeout;
84 wait_for_ready_value wait_for_ready;
85} method_parameters;
86
Mark D. Roth722de8d2017-02-27 10:50:44 -080087static method_parameters *method_parameters_ref(
Mark D. Roth95b627b2017-02-24 11:02:58 -080088 method_parameters *method_params) {
89 gpr_ref(&method_params->refs);
90 return method_params;
Mark D. Roth9d480942016-10-19 14:18:05 -070091}
92
Mark D. Roth95b627b2017-02-24 11:02:58 -080093static void method_parameters_unref(method_parameters *method_params) {
94 if (gpr_unref(&method_params->refs)) {
95 gpr_free(method_params);
96 }
97}
98
Mark D. Roth95b627b2017-02-24 11:02:58 -080099static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
100 method_parameters_unref(value);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800101}
102
Mark D. Roth95b627b2017-02-24 11:02:58 -0800103static bool parse_wait_for_ready(grpc_json *field,
104 wait_for_ready_value *wait_for_ready) {
105 if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
106 return false;
107 }
108 *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
109 : WAIT_FOR_READY_FALSE;
110 return true;
111}
112
Mark D. Roth722de8d2017-02-27 10:50:44 -0800113static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
Mark D. Roth95b627b2017-02-24 11:02:58 -0800114 if (field->type != GRPC_JSON_STRING) return false;
115 size_t len = strlen(field->value);
116 if (field->value[len - 1] != 's') return false;
117 char *buf = gpr_strdup(field->value);
118 buf[len - 1] = '\0'; // Remove trailing 's'.
119 char *decimal_point = strchr(buf, '.');
120 if (decimal_point != NULL) {
121 *decimal_point = '\0';
122 timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
123 if (timeout->tv_nsec == -1) {
124 gpr_free(buf);
125 return false;
126 }
127 // There should always be exactly 3, 6, or 9 fractional digits.
128 int multiplier = 1;
129 switch (strlen(decimal_point + 1)) {
130 case 9:
131 break;
132 case 6:
133 multiplier *= 1000;
134 break;
135 case 3:
136 multiplier *= 1000000;
137 break;
138 default: // Unsupported number of digits.
139 gpr_free(buf);
140 return false;
141 }
142 timeout->tv_nsec *= multiplier;
143 }
144 timeout->tv_sec = gpr_parse_nonnegative_int(buf);
145 gpr_free(buf);
146 if (timeout->tv_sec == -1) return false;
147 return true;
148}
149
Mark D. Rothe30baeb2016-11-03 08:16:19 -0700150static void *method_parameters_create_from_json(const grpc_json *json) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700151 wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
Mark D. Roth47f10842016-11-03 08:45:27 -0700152 gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
153 for (grpc_json *field = json->child; field != NULL; field = field->next) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700154 if (field->key == NULL) continue;
Mark D. Roth84c8a022016-11-10 09:39:34 -0800155 if (strcmp(field->key, "waitForReady") == 0) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700156 if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800157 if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700158 } else if (strcmp(field->key, "timeout") == 0) {
159 if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
Mark D. Roth95b627b2017-02-24 11:02:58 -0800160 if (!parse_timeout(field, &timeout)) return NULL;
Mark D. Rothc968e602016-11-02 14:07:36 -0700161 }
162 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700163 method_parameters *value = gpr_malloc(sizeof(method_parameters));
Mark D. Roth95b627b2017-02-24 11:02:58 -0800164 gpr_ref_init(&value->refs, 1);
Mark D. Rothc968e602016-11-02 14:07:36 -0700165 value->timeout = timeout;
166 value->wait_for_ready = wait_for_ready;
Mark D. Roth9d480942016-10-19 14:18:05 -0700167 return value;
168}
169
/* Forward declaration only: channel_data below stores a pointer to this type,
   so the full definition is not needed at this point. */
struct external_connectivity_watcher;
171
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700172/*************************************************************************
173 * CHANNEL-WIDE FUNCTIONS
174 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800175
Craig Tiller800dacb2015-10-06 09:10:26 -0700176typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -0700177 /** resolver for this channel */
178 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -0700179 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700180 bool started_resolving;
Craig Tiller3be7dd02017-04-03 14:30:03 -0700181 /** is deadline checking enabled? */
182 bool deadline_checking_enabled;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700183 /** client channel factory */
184 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -0700185
Craig Tillerbefafe62017-02-09 11:30:54 -0800186 /** combiner protecting all variables below in this data structure */
187 grpc_combiner *combiner;
Mark D. Roth046cf762016-09-26 11:13:51 -0700188 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -0700189 grpc_lb_policy *lb_policy;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800190 /** retry throttle data */
191 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth9d480942016-10-19 14:18:05 -0700192 /** maps method names to method_parameters structs */
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800193 grpc_slice_hash_table *method_params_table;
Mark D. Roth046cf762016-09-26 11:13:51 -0700194 /** incoming resolver result - set by resolver.next() */
Mark D. Rothaf842452016-10-21 15:05:15 -0700195 grpc_channel_args *resolver_result;
Craig Tiller3f475422015-06-25 10:43:05 -0700196 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -0700197 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -0700198 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -0700199 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -0700200 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -0700201 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700202 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700203 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800204 /** owning stack */
205 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -0800206 /** interested parties (owned) */
207 grpc_pollset_set *interested_parties;
Craig Tiller613dafa2017-02-09 12:00:43 -0800208
Alexander Polcync3b1f182017-04-18 13:51:36 -0700209 /* external_connectivity_watcher_list head is guarded by its own mutex, since
210 * counts need to be grabbed immediately without polling on a cq */
211 gpr_mu external_connectivity_watcher_list_mu;
212 struct external_connectivity_watcher *external_connectivity_watcher_list_head;
213
Craig Tiller613dafa2017-02-09 12:00:43 -0800214 /* the following properties are guarded by a mutex since API's require them
Craig Tiller46dd7902017-02-23 09:42:16 -0800215 to be instantaneously available */
Craig Tiller613dafa2017-02-09 12:00:43 -0800216 gpr_mu info_mu;
217 char *info_lb_policy_name;
218 /** service config in JSON form */
219 char *info_service_config_json;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800220} channel_data;
221
Craig Tillerd6c98df2015-08-18 09:33:44 -0700222/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700223 resolver, to watch for state changes from the lb_policy. When a state
224 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -0700225typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700226 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700227 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700228 grpc_connectivity_state state;
229 grpc_lb_policy *lb_policy;
230} lb_policy_connectivity_watcher;
231
Craig Tiller2400bf52017-02-09 16:25:19 -0800232static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
233 grpc_lb_policy *lb_policy,
234 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700235
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800236static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
237 channel_data *chand,
238 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700239 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800240 const char *reason) {
David Garcia Quintas37251282017-04-14 13:46:03 -0700241 /* TODO: Improve failure handling:
242 * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
243 * - Hand over pending picks from old policies during the switch that happens
244 * when resolver provides an update. */
David Garcia Quintas956f7002017-04-13 15:40:06 -0700245 if (chand->lb_policy != NULL) {
246 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
247 /* cancel picks with wait_for_ready=false */
248 grpc_lb_policy_cancel_picks_locked(
249 exec_ctx, chand->lb_policy,
250 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
251 /* check= */ 0, GRPC_ERROR_REF(error));
252 } else if (state == GRPC_CHANNEL_SHUTDOWN) {
253 /* cancel all picks */
254 grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
255 /* mask= */ 0, /* check= */ 0,
256 GRPC_ERROR_REF(error));
257 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800258 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700259 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
260 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800261}
262
Craig Tiller804ff712016-05-05 16:25:40 -0700263static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
Craig Tillerbefafe62017-02-09 11:30:54 -0800264 void *arg, grpc_error *error) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700265 lb_policy_connectivity_watcher *w = arg;
Craig Tiller5d44c062015-07-01 08:55:28 -0700266 grpc_connectivity_state publish_state = w->state;
Craig Tillerc5de8352017-02-09 14:08:05 -0800267 /* check if the notification is for the latest policy */
268 if (w->lb_policy == w->chand->lb_policy) {
269 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
270 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller972470b2017-02-09 15:05:36 -0800271 grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
Craig Tillerc5de8352017-02-09 14:08:05 -0800272 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
273 w->chand->lb_policy = NULL;
274 }
275 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
276 GRPC_ERROR_REF(error), "lb_changed");
277 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800278 watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
Craig Tillerc5de8352017-02-09 14:08:05 -0800279 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800280 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700281
Craig Tiller906e3bc2015-11-24 07:31:31 -0800282 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700283 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700284}
285
Craig Tiller2400bf52017-02-09 16:25:19 -0800286static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
287 grpc_lb_policy *lb_policy,
288 grpc_connectivity_state current_state) {
Craig Tillera82950e2015-09-22 12:33:20 -0700289 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800290 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700291
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700292 w->chand = chand;
Craig Tillerbefafe62017-02-09 11:30:54 -0800293 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
294 grpc_combiner_scheduler(chand->combiner, false));
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700295 w->state = current_state;
296 w->lb_policy = lb_policy;
Craig Tiller2400bf52017-02-09 16:25:19 -0800297 grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
298 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700299}
300
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800301typedef struct {
302 char *server_name;
303 grpc_server_retry_throttle_data *retry_throttle_data;
304} service_config_parsing_state;
305
306static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
307 service_config_parsing_state *parsing_state = arg;
308 if (strcmp(field->key, "retryThrottling") == 0) {
309 if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
310 if (field->type != GRPC_JSON_OBJECT) return;
311 int max_milli_tokens = 0;
312 int milli_token_ratio = 0;
313 for (grpc_json *sub_field = field->child; sub_field != NULL;
314 sub_field = sub_field->next) {
Mark D. Rothb3322562017-02-23 14:38:02 -0800315 if (sub_field->key == NULL) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800316 if (strcmp(sub_field->key, "maxTokens") == 0) {
317 if (max_milli_tokens != 0) return; // Duplicate.
318 if (sub_field->type != GRPC_JSON_NUMBER) return;
319 max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
320 if (max_milli_tokens == -1) return;
321 max_milli_tokens *= 1000;
322 } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
323 if (milli_token_ratio != 0) return; // Duplicate.
324 if (sub_field->type != GRPC_JSON_NUMBER) return;
325 // We support up to 3 decimal digits.
326 size_t whole_len = strlen(sub_field->value);
327 uint32_t multiplier = 1;
328 uint32_t decimal_value = 0;
329 const char *decimal_point = strchr(sub_field->value, '.');
330 if (decimal_point != NULL) {
331 whole_len = (size_t)(decimal_point - sub_field->value);
332 multiplier = 1000;
333 size_t decimal_len = strlen(decimal_point + 1);
334 if (decimal_len > 3) decimal_len = 3;
335 if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
336 &decimal_value)) {
337 return;
338 }
339 uint32_t decimal_multiplier = 1;
340 for (size_t i = 0; i < (3 - decimal_len); ++i) {
341 decimal_multiplier *= 10;
342 }
343 decimal_value *= decimal_multiplier;
344 }
345 uint32_t whole_value;
346 if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
347 &whole_value)) {
348 return;
349 }
350 milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
Mark D. Rothb3322562017-02-23 14:38:02 -0800351 if (milli_token_ratio <= 0) return;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800352 }
353 }
354 parsing_state->retry_throttle_data =
355 grpc_retry_throttle_map_get_data_for_server(
356 parsing_state->server_name, max_milli_tokens, milli_token_ratio);
357 }
358}
359
David Garcia Quintas956f7002017-04-13 15:40:06 -0700360// Wrap a closure associated with \a lb_policy. The associated callback (\a
361// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
362// scheduling \a wrapped_closure.
363typedef struct wrapped_on_pick_closure_arg {
364 /* the closure instance using this struct as argument */
365 grpc_closure wrapper_closure;
366
367 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
368 * calls against the internal RR instance, respectively. */
369 grpc_closure *wrapped_closure;
370
371 /* The policy instance related to the closure */
372 grpc_lb_policy *lb_policy;
David Garcia Quintas956f7002017-04-13 15:40:06 -0700373} wrapped_on_pick_closure_arg;
374
David Garcia Quintas37251282017-04-14 13:46:03 -0700375// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
David Garcia Quintas956f7002017-04-13 15:40:06 -0700376static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
377 grpc_error *error) {
378 wrapped_on_pick_closure_arg *wc_arg = arg;
David Garcia Quintas37251282017-04-14 13:46:03 -0700379 GPR_ASSERT(wc_arg != NULL);
David Garcia Quintas956f7002017-04-13 15:40:06 -0700380 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
381 GPR_ASSERT(wc_arg->lb_policy != NULL);
David Garcia Quintas37251282017-04-14 13:46:03 -0700382 grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
David Garcia Quintas956f7002017-04-13 15:40:06 -0700383 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
David Garcia Quintas37251282017-04-14 13:46:03 -0700384 gpr_free(wc_arg);
David Garcia Quintas956f7002017-04-13 15:40:06 -0700385}
386
Craig Tillerbefafe62017-02-09 11:30:54 -0800387static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
388 void *arg, grpc_error *error) {
Craig Tiller3f475422015-06-25 10:43:05 -0700389 channel_data *chand = arg;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700390 char *lb_policy_name = NULL;
Craig Tiller3f475422015-06-25 10:43:05 -0700391 grpc_lb_policy *lb_policy = NULL;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700392 grpc_lb_policy *old_lb_policy = NULL;
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800393 grpc_slice_hash_table *method_params_table = NULL;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700394 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700395 bool exit_idle = false;
ncteisen4b36a3d2017-03-13 19:08:06 -0700396 grpc_error *state_error =
397 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800398 char *service_config_json = NULL;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800399 service_config_parsing_state parsing_state;
400 memset(&parsing_state, 0, sizeof(parsing_state));
Craig Tiller3f475422015-06-25 10:43:05 -0700401
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700402 bool lb_policy_updated = false;
Mark D. Roth046cf762016-09-26 11:13:51 -0700403 if (chand->resolver_result != NULL) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700404 // Find LB policy name.
Mark D. Rothaf842452016-10-21 15:05:15 -0700405 const grpc_arg *channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700406 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
Mark D. Rothaf842452016-10-21 15:05:15 -0700407 if (channel_arg != NULL) {
408 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
409 lb_policy_name = channel_arg->value.string;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700410 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700411 // Special case: If at least one balancer address is present, we use
412 // the grpclb policy, regardless of what the resolver actually specified.
Mark D. Rothaf842452016-10-21 15:05:15 -0700413 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700414 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
David Garcia Quintas143cb772017-03-31 13:39:27 -0700415 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
Mark D. Roth557c9902016-10-24 11:12:05 -0700416 grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700417 bool found_balancer_address = false;
Mark D. Rothaf842452016-10-21 15:05:15 -0700418 for (size_t i = 0; i < addresses->num_addresses; ++i) {
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700419 if (addresses->addresses[i].is_balancer) {
420 found_balancer_address = true;
Mark D. Rothaf842452016-10-21 15:05:15 -0700421 break;
422 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700423 }
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700424 if (found_balancer_address) {
Mark D. Rothaf842452016-10-21 15:05:15 -0700425 if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
426 gpr_log(GPR_INFO,
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700427 "resolver requested LB policy %s but provided at least one "
428 "balancer address -- forcing use of grpclb LB policy",
Mark D. Roth5f40e5d2016-10-24 13:09:05 -0700429 lb_policy_name);
Mark D. Rothaf842452016-10-21 15:05:15 -0700430 }
431 lb_policy_name = "grpclb";
Mark D. Roth88405f72016-10-03 08:24:52 -0700432 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700433 }
434 // Use pick_first if nothing was specified and we didn't select grpclb
435 // above.
436 if (lb_policy_name == NULL) lb_policy_name = "pick_first";
Mark D. Roth41124992016-11-03 11:22:20 -0700437 // Instantiate LB policy.
438 grpc_lb_policy_args lb_policy_args;
439 lb_policy_args.args = chand->resolver_result;
440 lb_policy_args.client_channel_factory = chand->client_channel_factory;
Craig Tiller2400bf52017-02-09 16:25:19 -0800441 lb_policy_args.combiner = chand->combiner;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700442
443 const bool lb_policy_type_changed =
444 (chand->info_lb_policy_name == NULL) ||
445 (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
446 if (chand->lb_policy != NULL && !lb_policy_type_changed) {
447 // update
448 lb_policy_updated = true;
449 grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
450 } else {
451 lb_policy =
452 grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
453 if (lb_policy != NULL) {
454 GRPC_LB_POLICY_REF(lb_policy, "config_change");
455 GRPC_ERROR_UNREF(state_error);
456 state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
457 &state_error);
458 old_lb_policy = chand->lb_policy;
459 chand->lb_policy = lb_policy;
460 }
Craig Tiller45724b32015-09-22 10:42:19 -0700461 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700462
Mark D. Roth41124992016-11-03 11:22:20 -0700463 // Find service config.
Mark D. Rothaf842452016-10-21 15:05:15 -0700464 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700465 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
Mark D. Roth046cf762016-09-26 11:13:51 -0700466 if (channel_arg != NULL) {
Mark D. Roth9ec28af2016-11-03 12:32:39 -0700467 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800468 service_config_json = gpr_strdup(channel_arg->value.string);
Mark D. Roth70a1abd2016-11-04 09:26:37 -0700469 grpc_service_config *service_config =
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800470 grpc_service_config_create(service_config_json);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700471 if (service_config != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800472 channel_arg =
473 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
474 GPR_ASSERT(channel_arg != NULL);
475 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700476 grpc_uri *uri =
477 grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800478 GPR_ASSERT(uri->path[0] != '\0');
479 parsing_state.server_name =
480 uri->path[0] == '/' ? uri->path + 1 : uri->path;
481 grpc_service_config_parse_global_params(
482 service_config, parse_retry_throttle_params, &parsing_state);
483 parsing_state.server_name = NULL;
484 grpc_uri_destroy(uri);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700485 method_params_table = grpc_service_config_create_method_config_table(
Craig Tillerb28c7e82016-11-18 10:29:04 -0800486 exec_ctx, service_config, method_parameters_create_from_json,
Mark D. Rothe3006702017-04-19 07:43:56 -0700487 method_parameters_free);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700488 grpc_service_config_destroy(service_config);
489 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700490 }
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700491 // Before we clean up, save a copy of lb_policy_name, since it might
492 // be pointing to data inside chand->resolver_result.
493 // The copy will be saved in chand->lb_policy_name below.
494 lb_policy_name = gpr_strdup(lb_policy_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800495 grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
Mark D. Roth046cf762016-09-26 11:13:51 -0700496 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700497 }
498
Craig Tiller86c99582015-11-25 15:22:26 -0800499 if (lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800500 grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
501 chand->interested_parties);
Craig Tiller86c99582015-11-25 15:22:26 -0800502 }
503
Craig Tiller613dafa2017-02-09 12:00:43 -0800504 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700505 if (lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800506 gpr_free(chand->info_lb_policy_name);
507 chand->info_lb_policy_name = lb_policy_name;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700508 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800509 if (service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800510 gpr_free(chand->info_service_config_json);
511 chand->info_service_config_json = service_config_json;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800512 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800513 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800514
515 if (chand->retry_throttle_data != NULL) {
516 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
517 }
518 chand->retry_throttle_data = parsing_state.retry_throttle_data;
Mark D. Roth9d480942016-10-19 14:18:05 -0700519 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800520 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth046cf762016-09-26 11:13:51 -0700521 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700522 chand->method_params_table = method_params_table;
Craig Tiller0ede5452016-04-23 12:21:45 -0700523 if (lb_policy != NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800524 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller0ede5452016-04-23 12:21:45 -0700525 } else if (chand->resolver == NULL /* disconnected */) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700526 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
527 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
528 "Channel disconnected", &error, 1));
Craig Tiller91031da2016-12-28 15:44:25 -0800529 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tillera82950e2015-09-22 12:33:20 -0700530 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700531 if (!lb_policy_updated && lb_policy != NULL &&
532 chand->exit_idle_when_lb_policy_arrives) {
Craig Tillera82950e2015-09-22 12:33:20 -0700533 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700534 exit_idle = true;
535 chand->exit_idle_when_lb_policy_arrives = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700536 }
Craig Tiller98465032015-06-29 14:36:42 -0700537
Craig Tiller804ff712016-05-05 16:25:40 -0700538 if (error == GRPC_ERROR_NONE && chand->resolver) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700539 if (!lb_policy_updated) {
540 set_channel_connectivity_state_locked(exec_ctx, chand, state,
541 GRPC_ERROR_REF(state_error),
542 "new_lb+resolver");
543 if (lb_policy != NULL) {
544 watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
545 }
Craig Tiller45724b32015-09-22 10:42:19 -0700546 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800547 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Craig Tiller972470b2017-02-09 15:05:36 -0800548 grpc_resolver_next_locked(exec_ctx, chand->resolver,
549 &chand->resolver_result,
550 &chand->on_resolver_result_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700551 } else {
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800552 if (chand->resolver != NULL) {
Craig Tiller972470b2017-02-09 15:05:36 -0800553 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800554 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
555 chand->resolver = NULL;
556 }
Craig Tiller804ff712016-05-05 16:25:40 -0700557 grpc_error *refs[] = {error, state_error};
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800558 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700559 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700560 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
561 "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)),
Craig Tiller804ff712016-05-05 16:25:40 -0700562 "resolver_gone");
Craig Tillera82950e2015-09-22 12:33:20 -0700563 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700564
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700565 if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800566 grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
Craig Tillera82950e2015-09-22 12:33:20 -0700567 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
568 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700569
Craig Tillera82950e2015-09-22 12:33:20 -0700570 if (old_lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800571 grpc_pollset_set_del_pollset_set(
572 exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
Craig Tillera82950e2015-09-22 12:33:20 -0700573 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700574 old_lb_policy = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700575 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700576
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700577 if (!lb_policy_updated && lb_policy != NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -0700578 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
579 }
Craig Tiller45724b32015-09-22 10:42:19 -0700580
Craig Tiller906e3bc2015-11-24 07:31:31 -0800581 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700582 GRPC_ERROR_UNREF(state_error);
Craig Tiller3f475422015-06-25 10:43:05 -0700583}
584
Craig Tillera8610c02017-02-14 10:05:11 -0800585static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
586 grpc_error *error_ignored) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800587 grpc_transport_op *op = arg;
Craig Tillerc55c1022017-03-10 10:26:42 -0800588 grpc_channel_element *elem = op->handler_private.extra_arg;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700589 channel_data *chand = elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700590
Craig Tillera82950e2015-09-22 12:33:20 -0700591 if (op->on_connectivity_state_change != NULL) {
592 grpc_connectivity_state_notify_on_state_change(
593 exec_ctx, &chand->state_tracker, op->connectivity_state,
594 op->on_connectivity_state_change);
595 op->on_connectivity_state_change = NULL;
596 op->connectivity_state = NULL;
597 }
598
Craig Tiller26dab312015-12-07 14:43:47 -0800599 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800600 if (chand->lb_policy == NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700601 grpc_closure_sched(
602 exec_ctx, op->send_ping,
603 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
Craig Tiller26dab312015-12-07 14:43:47 -0800604 } else {
Craig Tiller2400bf52017-02-09 16:25:19 -0800605 grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800606 op->bind_pollset = NULL;
607 }
608 op->send_ping = NULL;
609 }
610
Craig Tiller1c51edc2016-05-07 16:18:43 -0700611 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
612 if (chand->resolver != NULL) {
613 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700614 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700615 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
Craig Tiller972470b2017-02-09 15:05:36 -0800616 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700617 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
618 chand->resolver = NULL;
619 if (!chand->started_resolving) {
620 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
621 GRPC_ERROR_REF(op->disconnect_with_error));
Craig Tiller91031da2016-12-28 15:44:25 -0800622 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700623 }
624 if (chand->lb_policy != NULL) {
625 grpc_pollset_set_del_pollset_set(exec_ctx,
626 chand->lb_policy->interested_parties,
627 chand->interested_parties);
628 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
629 chand->lb_policy = NULL;
630 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700631 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700632 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700633 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800634 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
635
636 grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
Craig Tillerbefafe62017-02-09 11:30:54 -0800637}
638
639static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
640 grpc_channel_element *elem,
641 grpc_transport_op *op) {
642 channel_data *chand = elem->channel_data;
643
Craig Tillerbefafe62017-02-09 11:30:54 -0800644 GPR_ASSERT(op->set_accept_stream == false);
645 if (op->bind_pollset != NULL) {
646 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
647 op->bind_pollset);
648 }
649
Craig Tillerc55c1022017-03-10 10:26:42 -0800650 op->handler_private.extra_arg = elem;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800651 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
Craig Tillerbefafe62017-02-09 11:30:54 -0800652 grpc_closure_sched(
Craig Tillerc55c1022017-03-10 10:26:42 -0800653 exec_ctx,
654 grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
655 op, grpc_combiner_scheduler(chand->combiner, false)),
Craig Tillerbefafe62017-02-09 11:30:54 -0800656 GRPC_ERROR_NONE);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700657}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800658
Mark D. Rothb2d24882016-10-27 15:44:07 -0700659static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
660 grpc_channel_element *elem,
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700661 const grpc_channel_info *info) {
Mark D. Rothb2d24882016-10-27 15:44:07 -0700662 channel_data *chand = elem->channel_data;
Craig Tiller613dafa2017-02-09 12:00:43 -0800663 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700664 if (info->lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800665 *info->lb_policy_name = chand->info_lb_policy_name == NULL
Mark D. Roth78afd772016-11-04 12:49:49 -0700666 ? NULL
Craig Tiller613dafa2017-02-09 12:00:43 -0800667 : gpr_strdup(chand->info_lb_policy_name);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700668 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800669 if (info->service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800670 *info->service_config_json =
671 chand->info_service_config_json == NULL
672 ? NULL
673 : gpr_strdup(chand->info_service_config_json);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800674 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800675 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700676}
677
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700678/* Constructor for channel_data */
Mark D. Rothc1087882016-11-18 10:54:45 -0800679static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800680 grpc_channel_element *elem,
681 grpc_channel_element_args *args) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700682 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700683 GPR_ASSERT(args->is_last);
684 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800685 // Initialize data members.
Craig Tillerbefafe62017-02-09 11:30:54 -0800686 chand->combiner = grpc_combiner_create(NULL);
Craig Tillerd85477512017-02-09 12:02:39 -0800687 gpr_mu_init(&chand->info_mu);
Alexander Polcync3b1f182017-04-18 13:51:36 -0700688 gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
689
690 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
691 chand->external_connectivity_watcher_list_head = NULL;
692 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
693
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800694 chand->owning_stack = args->channel_stack;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700695 grpc_closure_init(&chand->on_resolver_result_changed,
Craig Tillerbefafe62017-02-09 11:30:54 -0800696 on_resolver_result_changed_locked, chand,
697 grpc_combiner_scheduler(chand->combiner, false));
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800698 chand->interested_parties = grpc_pollset_set_create();
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700699 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
700 "client_channel");
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800701 // Record client channel factory.
702 const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
703 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700704 if (arg == NULL) {
705 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
706 "Missing client channel factory in args for client channel filter");
707 }
708 if (arg->type != GRPC_ARG_POINTER) {
709 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
710 "client channel factory arg must be a pointer");
711 }
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800712 grpc_client_channel_factory_ref(arg->value.pointer.p);
713 chand->client_channel_factory = arg->value.pointer.p;
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800714 // Get server name to resolve, using proxy mapper if needed.
Mark D. Roth86e90592016-11-18 09:56:40 -0800715 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700716 if (arg == NULL) {
717 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
718 "Missing server uri in args for client channel filter");
719 }
720 if (arg->type != GRPC_ARG_STRING) {
721 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
722 "server uri arg must be a string");
723 }
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800724 char *proxy_name = NULL;
725 grpc_channel_args *new_args = NULL;
726 grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
727 &proxy_name, &new_args);
728 // Instantiate resolver.
Mark D. Roth45ccec52017-01-18 14:04:01 -0800729 chand->resolver = grpc_resolver_create(
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800730 exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
731 new_args != NULL ? new_args : args->channel_args,
Craig Tiller972470b2017-02-09 15:05:36 -0800732 chand->interested_parties, chand->combiner);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800733 if (proxy_name != NULL) gpr_free(proxy_name);
734 if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800735 if (chand->resolver == NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700736 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800737 }
Craig Tiller3be7dd02017-04-03 14:30:03 -0700738 chand->deadline_checking_enabled =
739 grpc_deadline_checking_enabled(args->channel_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800740 return GRPC_ERROR_NONE;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700741}
742
Craig Tiller972470b2017-02-09 15:05:36 -0800743static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
744 grpc_error *error) {
745 grpc_resolver *resolver = arg;
746 grpc_resolver_shutdown_locked(exec_ctx, resolver);
747 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
748}
749
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700750/* Destructor for channel_data */
751static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
752 grpc_channel_element *elem) {
753 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700754 if (chand->resolver != NULL) {
Craig Tiller972470b2017-02-09 15:05:36 -0800755 grpc_closure_sched(
756 exec_ctx,
757 grpc_closure_create(shutdown_resolver_locked, chand->resolver,
758 grpc_combiner_scheduler(chand->combiner, false)),
759 GRPC_ERROR_NONE);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700760 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700761 if (chand->client_channel_factory != NULL) {
762 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
763 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700764 if (chand->lb_policy != NULL) {
765 grpc_pollset_set_del_pollset_set(exec_ctx,
766 chand->lb_policy->interested_parties,
767 chand->interested_parties);
768 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
769 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800770 gpr_free(chand->info_lb_policy_name);
771 gpr_free(chand->info_service_config_json);
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800772 if (chand->retry_throttle_data != NULL) {
773 grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
774 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700775 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800776 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700777 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700778 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
Craig Tiller9e5ac1b2017-02-14 22:25:50 -0800779 grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
Craig Tillerf1021672017-02-09 21:29:50 -0800780 GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
Craig Tillerd85477512017-02-09 12:02:39 -0800781 gpr_mu_destroy(&chand->info_mu);
Alexander Polcync3b1f182017-04-18 13:51:36 -0700782 gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700783}
784
785/*************************************************************************
786 * PER-CALL FUNCTIONS
787 */
788
/* Acquire-loads the subchannel_call slot of the call data pointed to by
   calld_ptr and yields it as a grpc_subchannel_call pointer. */
#define GET_CALL(calld_ptr) \
  ((grpc_subchannel_call *)(gpr_atm_acq_load(&(calld_ptr)->subchannel_call)))

/* Sentinel value stored in the subchannel_call slot for a cancelled call. */
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
793
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700794/** Call data. Holds a pointer to grpc_subchannel_call and the
795 associated machinery to create such a pointer.
796 Handles queueing of stream ops until a call object is ready, waiting
797 for initial metadata before trying to create a call object,
798 and handling cancellation gracefully. */
799typedef struct client_channel_call_data {
Mark D. Roth72f6da82016-09-02 13:42:38 -0700800 // State for handling deadlines.
801 // The code in deadline_filter.c requires this to be the first field.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700802 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
803 // and this struct both independently store a pointer to the call
804 // stack and each has its own mutex. If/when we have time, find a way
Mark D. Roth6ad99172016-09-09 07:52:48 -0700805 // to avoid this without breaking the grpc_deadline_state abstraction.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700806 grpc_deadline_state deadline_state;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700807
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800808 grpc_slice path; // Request path.
Mark D. Rothe40dd292016-10-05 14:58:37 -0700809 gpr_timespec call_start_time;
810 gpr_timespec deadline;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700811 grpc_server_retry_throttle_data *retry_throttle_data;
Mark D. Roth95b627b2017-02-24 11:02:58 -0800812 method_parameters *method_params;
Mark D. Rothaa850a72016-09-26 13:38:02 -0700813
Mark D. Rothf28763c2016-09-14 15:18:40 -0700814 grpc_error *cancel_error;
Mark D. Roth72f6da82016-09-02 13:42:38 -0700815
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700816 /** either 0 for no call, 1 for cancelled, or a pointer to a
817 grpc_subchannel_call */
818 gpr_atm subchannel_call;
Craig Tillerd426cac2017-03-13 12:30:45 -0700819 gpr_arena *arena;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700820
Mark D. Roth64a317c2017-05-02 08:27:08 -0700821 bool pick_pending;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700822 grpc_connected_subchannel *connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700823 grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700824 grpc_polling_entity *pollent;
825
Craig Tillera0f3abd2017-03-31 15:42:16 -0700826 grpc_transport_stream_op_batch **waiting_ops;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700827 size_t waiting_ops_count;
828 size_t waiting_ops_capacity;
829
830 grpc_closure next_step;
831
832 grpc_call_stack *owning_call;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200833
834 grpc_linked_mdelem lb_token_mdelem;
Mark D. Rothd6d192d2017-02-23 08:58:42 -0800835
836 grpc_closure on_complete;
837 grpc_closure *original_on_complete;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700838} call_data;
839
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800840grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
841 grpc_call_element *call_elem) {
842 grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
843 return scc == CANCELLED_CALL ? NULL : scc;
844}
845
Craig Tillere1b51da2017-03-31 15:44:33 -0700846static void add_waiting_locked(call_data *calld,
847 grpc_transport_stream_op_batch *op) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700848 GPR_TIMER_BEGIN("add_waiting_locked", 0);
849 if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
850 calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
851 calld->waiting_ops =
852 gpr_realloc(calld->waiting_ops,
853 calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
854 }
Craig Tiller57726ca2016-09-12 11:59:45 -0700855 calld->waiting_ops[calld->waiting_ops_count++] = op;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700856 GPR_TIMER_END("add_waiting_locked", 0);
857}
858
859static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
860 grpc_error *error) {
861 size_t i;
862 for (i = 0; i < calld->waiting_ops_count; i++) {
Craig Tillera0f3abd2017-03-31 15:42:16 -0700863 grpc_transport_stream_op_batch_finish_with_failure(
Craig Tiller57726ca2016-09-12 11:59:45 -0700864 exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700865 }
866 calld->waiting_ops_count = 0;
867 GRPC_ERROR_UNREF(error);
868}
869
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700870static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
Craig Tiller57726ca2016-09-12 11:59:45 -0700871 if (calld->waiting_ops_count == 0) {
872 return;
873 }
874
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800875 grpc_subchannel_call *call = GET_CALL(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -0700876 grpc_transport_stream_op_batch **ops = calld->waiting_ops;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800877 size_t nops = calld->waiting_ops_count;
878 if (call == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700879 fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
880 return;
881 }
882 calld->waiting_ops = NULL;
883 calld->waiting_ops_count = 0;
884 calld->waiting_ops_capacity = 0;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800885 for (size_t i = 0; i < nops; i++) {
886 grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
887 }
Craig Tiller9efea882017-02-09 13:06:52 -0800888 gpr_free(ops);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700889}
890
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700891// Sets calld->method_params and calld->retry_throttle_data.
Craig Tiller11c17d42017-03-13 13:36:34 -0700892// If the method params specify a timeout, populates
893// *per_method_deadline and returns true.
894static bool set_call_method_params_from_service_config_locked(
895 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
896 gpr_timespec *per_method_deadline) {
897 channel_data *chand = elem->channel_data;
898 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700899 if (chand->retry_throttle_data != NULL) {
900 calld->retry_throttle_data =
901 grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
902 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700903 if (chand->method_params_table != NULL) {
904 calld->method_params = grpc_method_config_table_get(
905 exec_ctx, chand->method_params_table, calld->path);
906 if (calld->method_params != NULL) {
907 method_parameters_ref(calld->method_params);
908 if (gpr_time_cmp(calld->method_params->timeout,
909 gpr_time_0(GPR_TIMESPAN)) != 0) {
910 *per_method_deadline =
911 gpr_time_add(calld->call_start_time, calld->method_params->timeout);
912 return true;
913 }
914 }
915 }
916 return false;
917}
Craig Tillerea4a4f12017-03-13 13:36:52 -0700918
Craig Tiller11c17d42017-03-13 13:36:34 -0700919static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
920 grpc_call_element *elem) {
921 /* apply service-config level configuration to the call (now that we're
922 * certain it exists) */
923 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -0700924 channel_data *chand = elem->channel_data;
Craig Tiller11c17d42017-03-13 13:36:34 -0700925 gpr_timespec per_method_deadline;
926 if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
927 &per_method_deadline)) {
928 // If the deadline from the service config is shorter than the one
929 // from the client API, reset the deadline timer.
Craig Tiller3be7dd02017-04-03 14:30:03 -0700930 if (chand->deadline_checking_enabled &&
931 gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
Craig Tiller11c17d42017-03-13 13:36:34 -0700932 calld->deadline = per_method_deadline;
933 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
934 }
935 }
936}
937
Craig Tillerbefafe62017-02-09 11:30:54 -0800938static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
939 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -0700940 grpc_call_element *elem = arg;
941 call_data *calld = elem->call_data;
942 channel_data *chand = elem->channel_data;
Mark D. Roth64a317c2017-05-02 08:27:08 -0700943 GPR_ASSERT(calld->pick_pending);
944 calld->pick_pending = false;
Yuchen Zeng19656b12016-09-01 18:00:45 -0700945 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
946 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700947 if (calld->connected_subchannel == NULL) {
Mark D. Rothd7389b42017-05-17 12:22:17 -0700948 gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL);
ncteisen4b36a3d2017-03-13 19:08:06 -0700949 fail_locked(exec_ctx, calld,
Mark D. Rothd7389b42017-05-17 12:22:17 -0700950 error == GRPC_ERROR_NONE
951 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
952 "Call dropped by load balancing policy")
953 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
954 "Failed to create subchannel", &error, 1));
Mark D. Roth72f6da82016-09-02 13:42:38 -0700955 } else if (GET_CALL(calld) == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700956 /* already cancelled before subchannel became ready */
ncteisen4b36a3d2017-03-13 19:08:06 -0700957 grpc_error *cancellation_error =
958 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
959 "Cancelled before creating subchannel", &error, 1);
David Garcia Quintas68a9e382016-12-13 10:50:40 -0800960 /* if due to deadline, attach the deadline exceeded status to the error */
961 if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
962 cancellation_error =
963 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
964 GRPC_STATUS_DEADLINE_EXCEEDED);
965 }
966 fail_locked(exec_ctx, calld, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700967 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700968 /* Create call on subchannel. */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700969 grpc_subchannel_call *subchannel_call = NULL;
Craig Tillerfb9d1122017-03-14 09:26:27 -0700970 const grpc_connected_subchannel_call_args call_args = {
Craig Tillerd426cac2017-03-13 12:30:45 -0700971 .pollent = calld->pollent,
972 .path = calld->path,
973 .start_time = calld->call_start_time,
974 .deadline = calld->deadline,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700975 .arena = calld->arena,
976 .context = calld->subchannel_call_context};
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700977 grpc_error *new_error = grpc_connected_subchannel_create_call(
Craig Tillerd426cac2017-03-13 12:30:45 -0700978 exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700979 gpr_atm_rel_store(&calld->subchannel_call,
980 (gpr_atm)(uintptr_t)subchannel_call);
Mark D. Roth61a63982017-04-05 09:56:12 -0700981 if (new_error != GRPC_ERROR_NONE) {
982 new_error = grpc_error_add_child(new_error, error);
983 fail_locked(exec_ctx, calld, new_error);
984 } else {
985 retry_waiting_locked(exec_ctx, calld);
986 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700987 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700988 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
989}
990
991static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
992 call_data *calld = elem->call_data;
993 grpc_subchannel_call *subchannel_call = GET_CALL(calld);
994 if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
995 return NULL;
996 } else {
997 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
998 }
999}
1000
Craig Tiller577c9b22015-11-02 14:11:15 -08001001typedef struct {
1002 grpc_metadata_batch *initial_metadata;
Craig Tiller8c0d96f2016-03-11 14:27:52 -08001003 uint32_t initial_metadata_flags;
Craig Tillerb5585d42015-11-17 07:18:31 -08001004 grpc_connected_subchannel **connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001005 grpc_call_context_element *subchannel_call_context;
Craig Tiller577c9b22015-11-02 14:11:15 -08001006 grpc_closure *on_ready;
1007 grpc_call_element *elem;
1008 grpc_closure closure;
1009} continue_picking_args;
1010
Yuchen Zeng144ce652016-09-01 18:19:34 -07001011/** Return true if subchannel is available immediately (in which case on_ready
1012 should not be called), or false otherwise (in which case on_ready should be
1013 called when the subchannel is available). */
Craig Tillerbefafe62017-02-09 11:30:54 -08001014static bool pick_subchannel_locked(
1015 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1016 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001017 grpc_connected_subchannel **connected_subchannel,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -07001018 grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready);
Craig Tiller577c9b22015-11-02 14:11:15 -08001019
Craig Tillerbefafe62017-02-09 11:30:54 -08001020static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
1021 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001022 continue_picking_args *cpa = arg;
Craig Tiller0ede5452016-04-23 12:21:45 -07001023 if (cpa->connected_subchannel == NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001024 /* cancelled, do nothing */
Craig Tiller804ff712016-05-05 16:25:40 -07001025 } else if (error != GRPC_ERROR_NONE) {
Craig Tiller91031da2016-12-28 15:44:25 -08001026 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001027 } else {
Mark D. Rothb9b0efd2017-05-04 07:57:43 -07001028 if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
1029 cpa->initial_metadata_flags,
1030 cpa->connected_subchannel,
1031 cpa->subchannel_call_context, cpa->on_ready)) {
Craig Tiller91031da2016-12-28 15:44:25 -08001032 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -07001033 }
Craig Tiller577c9b22015-11-02 14:11:15 -08001034 }
1035 gpr_free(cpa);
1036}
1037
Mark D. Roth64a317c2017-05-02 08:27:08 -07001038static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1039 grpc_error *error) {
1040 channel_data *chand = elem->channel_data;
1041 call_data *calld = elem->call_data;
1042 if (chand->lb_policy != NULL) {
1043 grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
1044 &calld->connected_subchannel,
1045 GRPC_ERROR_REF(error));
1046 }
1047 for (grpc_closure *closure = chand->waiting_for_config_closures.head;
1048 closure != NULL; closure = closure->next_data.next) {
1049 continue_picking_args *cpa = closure->cb_arg;
1050 if (cpa->connected_subchannel == &calld->connected_subchannel) {
1051 cpa->connected_subchannel = NULL;
1052 grpc_closure_sched(exec_ctx, cpa->on_ready,
1053 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1054 "Pick cancelled", &error, 1));
1055 }
1056 }
1057 GRPC_ERROR_UNREF(error);
1058}
1059
Craig Tillerbefafe62017-02-09 11:30:54 -08001060static bool pick_subchannel_locked(
1061 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1062 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001063 grpc_connected_subchannel **connected_subchannel,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -07001064 grpc_call_context_element *subchannel_call_context,
1065 grpc_closure *on_ready) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001066 GPR_TIMER_BEGIN("pick_subchannel", 0);
Craig Tillerbfc9adc2016-06-27 13:16:22 -07001067
Craig Tiller577c9b22015-11-02 14:11:15 -08001068 channel_data *chand = elem->channel_data;
1069 call_data *calld = elem->call_data;
Craig Tiller577c9b22015-11-02 14:11:15 -08001070
Craig Tillerb5585d42015-11-17 07:18:31 -08001071 GPR_ASSERT(connected_subchannel);
Craig Tiller577c9b22015-11-02 14:11:15 -08001072
Craig Tiller577c9b22015-11-02 14:11:15 -08001073 if (chand->lb_policy != NULL) {
Craig Tiller2e0788a2017-03-14 06:55:44 -07001074 apply_final_configuration_locked(exec_ctx, elem);
Craig Tiller86c0f8a2015-12-01 20:05:40 -08001075 grpc_lb_policy *lb_policy = chand->lb_policy;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001076 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
Mark D. Rothe40dd292016-10-05 14:58:37 -07001077 // If the application explicitly set wait_for_ready, use that.
1078 // Otherwise, if the service config specified a value for this
1079 // method, use that.
Mark D. Rothc1c38582016-10-11 11:03:27 -07001080 const bool wait_for_ready_set_from_api =
1081 initial_metadata_flags &
1082 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1083 const bool wait_for_ready_set_from_service_config =
Mark D. Roth95b627b2017-02-24 11:02:58 -08001084 calld->method_params != NULL &&
1085 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001086 if (!wait_for_ready_set_from_api &&
1087 wait_for_ready_set_from_service_config) {
Mark D. Roth95b627b2017-02-24 11:02:58 -08001088 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001089 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1090 } else {
1091 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1092 }
1093 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001094 const grpc_lb_policy_pick_args inputs = {
Mark D. Roth64d922a2017-05-03 12:52:04 -07001095 initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem};
David Garcia Quintas956f7002017-04-13 15:40:06 -07001096
1097 // Wrap the user-provided callback in order to hold a strong reference to
1098 // the LB policy for the duration of the pick.
1099 wrapped_on_pick_closure_arg *w_on_pick_arg =
1100 gpr_zalloc(sizeof(*w_on_pick_arg));
1101 grpc_closure_init(&w_on_pick_arg->wrapper_closure,
1102 wrapped_on_pick_closure_cb, w_on_pick_arg,
1103 grpc_schedule_on_exec_ctx);
1104 w_on_pick_arg->wrapped_closure = on_ready;
1105 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
1106 w_on_pick_arg->lb_policy = lb_policy;
David Garcia Quintas956f7002017-04-13 15:40:06 -07001107 const bool pick_done = grpc_lb_policy_pick_locked(
Mark D. Roth09e458c2017-05-02 08:13:26 -07001108 exec_ctx, lb_policy, &inputs, connected_subchannel,
1109 subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001110 if (pick_done) {
1111 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
1112 GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
1113 "pick_subchannel_wrapping");
David Garcia Quintas37251282017-04-14 13:46:03 -07001114 gpr_free(w_on_pick_arg);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001115 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001116 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
1117 GPR_TIMER_END("pick_subchannel", 0);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001118 return pick_done;
Craig Tiller577c9b22015-11-02 14:11:15 -08001119 }
1120 if (chand->resolver != NULL && !chand->started_resolving) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001121 chand->started_resolving = true;
Craig Tiller906e3bc2015-11-24 07:31:31 -08001122 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Craig Tiller972470b2017-02-09 15:05:36 -08001123 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1124 &chand->resolver_result,
1125 &chand->on_resolver_result_changed);
Craig Tiller577c9b22015-11-02 14:11:15 -08001126 }
Craig Tiller0eab6972016-04-23 12:59:57 -07001127 if (chand->resolver != NULL) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001128 continue_picking_args *cpa = gpr_malloc(sizeof(*cpa));
Craig Tiller0eab6972016-04-23 12:59:57 -07001129 cpa->initial_metadata = initial_metadata;
1130 cpa->initial_metadata_flags = initial_metadata_flags;
1131 cpa->connected_subchannel = connected_subchannel;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001132 cpa->subchannel_call_context = subchannel_call_context;
Craig Tiller0eab6972016-04-23 12:59:57 -07001133 cpa->on_ready = on_ready;
1134 cpa->elem = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -08001135 grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
1136 grpc_combiner_scheduler(chand->combiner, true));
Craig Tiller804ff712016-05-05 16:25:40 -07001137 grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
1138 GRPC_ERROR_NONE);
Craig Tiller0eab6972016-04-23 12:59:57 -07001139 } else {
ncteisen4b36a3d2017-03-13 19:08:06 -07001140 grpc_closure_sched(exec_ctx, on_ready,
1141 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -07001142 }
Craig Tillerbfc9adc2016-06-27 13:16:22 -07001143
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001144 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001145 return false;
Craig Tiller577c9b22015-11-02 14:11:15 -08001146}
1147
Craig Tillere1b51da2017-03-31 15:44:33 -07001148static void start_transport_stream_op_batch_locked_inner(
1149 grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
1150 grpc_call_element *elem) {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001151 channel_data *chand = elem->channel_data;
Craig Tillera11bfc82017-02-14 09:56:33 -08001152 call_data *calld = elem->call_data;
Craig Tillerbefafe62017-02-09 11:30:54 -08001153 grpc_subchannel_call *call;
1154
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001155 /* need to recheck that another thread hasn't set the call */
1156 call = GET_CALL(calld);
1157 if (call == CANCELLED_CALL) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001158 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Rothf28763c2016-09-14 15:18:40 -07001159 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -08001160 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001161 return;
1162 }
1163 if (call != NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001164 grpc_subchannel_call_process_op(exec_ctx, call, op);
Craig Tillera11bfc82017-02-14 09:56:33 -08001165 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001166 return;
1167 }
1168 /* if this is a cancellation, then we can raise our cancelled flag */
Craig Tillerc55c1022017-03-10 10:26:42 -08001169 if (op->cancel_stream) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001170 if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
1171 (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
Craig Tillera11bfc82017-02-14 09:56:33 -08001172 /* recurse to retry */
Craig Tillera0f3abd2017-03-31 15:42:16 -07001173 start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -08001174 /* early out */
1175 return;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001176 } else {
Craig Tillerbe9691a2017-02-14 10:00:42 -08001177 /* Stash a copy of cancel_error in our call data, so that we can use
1178 it for subsequent operations. This ensures that if the call is
1179 cancelled before any ops are passed down (e.g., if the deadline
1180 is in the past when the call starts), we can return the right
1181 error to the caller when the first op does get passed down. */
Craig Tillerc55c1022017-03-10 10:26:42 -08001182 calld->cancel_error =
1183 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
Mark D. Roth64a317c2017-05-02 08:27:08 -07001184 if (calld->pick_pending) {
1185 cancel_pick_locked(
1186 exec_ctx, elem,
1187 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
1188 } else {
1189 fail_locked(exec_ctx, calld,
1190 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001191 }
Craig Tillera0f3abd2017-03-31 15:42:16 -07001192 grpc_transport_stream_op_batch_finish_with_failure(
Craig Tillerc55c1022017-03-10 10:26:42 -08001193 exec_ctx, op,
1194 GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -08001195 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001196 return;
1197 }
1198 }
1199 /* if we don't have a subchannel, try to get one */
Mark D. Roth64a317c2017-05-02 08:27:08 -07001200 if (!calld->pick_pending && calld->connected_subchannel == NULL &&
1201 op->send_initial_metadata) {
1202 calld->pick_pending = true;
Craig Tillerbefafe62017-02-09 11:30:54 -08001203 grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
1204 grpc_combiner_scheduler(chand->combiner, true));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001205 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
Yuchen Zeng144ce652016-09-01 18:19:34 -07001206 /* If a subchannel is not available immediately, the polling entity from
1207 call_data should be provided to channel_data's interested_parties, so
1208 that IO of the lb_policy and resolver could be done under it. */
Craig Tillerc55c1022017-03-10 10:26:42 -08001209 if (pick_subchannel_locked(
1210 exec_ctx, elem,
1211 op->payload->send_initial_metadata.send_initial_metadata,
1212 op->payload->send_initial_metadata.send_initial_metadata_flags,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001213 &calld->connected_subchannel, calld->subchannel_call_context,
Mark D. Rothb7e6fa52017-05-02 15:01:29 -07001214 &calld->next_step)) {
Mark D. Roth64a317c2017-05-02 08:27:08 -07001215 calld->pick_pending = false;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001216 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Rothd7389b42017-05-17 12:22:17 -07001217 if (calld->connected_subchannel == NULL) {
1218 gpr_atm_no_barrier_store(&calld->subchannel_call,
1219 (gpr_atm)CANCELLED_CALL);
1220 grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1221 "Call dropped by load balancing policy");
1222 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
1223 grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
1224 return; // Early out.
1225 }
Yuchen Zeng19656b12016-09-01 18:00:45 -07001226 } else {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001227 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1228 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001229 }
1230 }
1231 /* if we've got a subchannel, then let's ask it to create a call */
Mark D. Roth64a317c2017-05-02 08:27:08 -07001232 if (!calld->pick_pending && calld->connected_subchannel != NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001233 grpc_subchannel_call *subchannel_call = NULL;
Craig Tillerfb9d1122017-03-14 09:26:27 -07001234 const grpc_connected_subchannel_call_args call_args = {
Craig Tillerd426cac2017-03-13 12:30:45 -07001235 .pollent = calld->pollent,
1236 .path = calld->path,
1237 .start_time = calld->call_start_time,
1238 .deadline = calld->deadline,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001239 .arena = calld->arena,
1240 .context = calld->subchannel_call_context};
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001241 grpc_error *error = grpc_connected_subchannel_create_call(
Craig Tillerd426cac2017-03-13 12:30:45 -07001242 exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001243 gpr_atm_rel_store(&calld->subchannel_call,
1244 (gpr_atm)(uintptr_t)subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001245 if (error != GRPC_ERROR_NONE) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001246 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
Craig Tillera0f3abd2017-03-31 15:42:16 -07001247 grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
Mark D. Rothca136032017-04-04 13:53:29 -07001248 } else {
Mark D. Rothca136032017-04-04 13:53:29 -07001249 retry_waiting_locked(exec_ctx, calld);
1250 /* recurse to retry */
1251 start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001252 }
Craig Tillera11bfc82017-02-14 09:56:33 -08001253 /* early out */
1254 return;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001255 }
1256 /* nothing to be done but wait */
1257 add_waiting_locked(calld, op);
Craig Tillera11bfc82017-02-14 09:56:33 -08001258}
1259
Mark D. Rothde144102017-03-15 10:11:03 -07001260static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001261 grpc_call_element *elem = arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001262 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001263 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001264 if (error == GRPC_ERROR_NONE) {
1265 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001266 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001267 } else {
1268 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001269 // decide whether or not to retry. Note that we should only
1270 // record failures whose statuses match the configured retryable
1271 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001272 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001273 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001274 }
1275 }
Mark D. Roth95039b52017-02-24 07:59:45 -08001276 grpc_closure_run(exec_ctx, calld->original_on_complete,
1277 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001278}
1279
Craig Tillere1b51da2017-03-31 15:44:33 -07001280static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
1281 void *arg,
1282 grpc_error *error_ignored) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001283 GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001284
Craig Tillera0f3abd2017-03-31 15:42:16 -07001285 grpc_transport_stream_op_batch *op = arg;
Craig Tillerc55c1022017-03-10 10:26:42 -08001286 grpc_call_element *elem = op->handler_private.extra_arg;
Craig Tillera11bfc82017-02-14 09:56:33 -08001287 call_data *calld = elem->call_data;
1288
Craig Tillerf6cb0c02017-03-28 14:11:49 -07001289 if (op->recv_trailing_metadata) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001290 GPR_ASSERT(op->on_complete != NULL);
1291 calld->original_on_complete = op->on_complete;
Mark D. Rothde144102017-03-15 10:11:03 -07001292 grpc_closure_init(&calld->on_complete, on_complete, elem,
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001293 grpc_schedule_on_exec_ctx);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001294 op->on_complete = &calld->on_complete;
1295 }
1296
Craig Tillera0f3abd2017-03-31 15:42:16 -07001297 start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -08001298
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001299 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
Craig Tillera0f3abd2017-03-31 15:42:16 -07001300 "start_transport_stream_op_batch");
1301 GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
Craig Tillerbefafe62017-02-09 11:30:54 -08001302}
1303
Craig Tillerbe9691a2017-02-14 10:00:42 -08001304/* The logic here is fairly complicated, due to (a) the fact that we
1305 need to handle the case where we receive the send op before the
1306 initial metadata op, and (b) the need for efficiency, especially in
1307 the streaming case.
1308
1309 We use double-checked locking to initially see if initialization has been
1310 performed. If it has not, we acquire the combiner and perform initialization.
1311 If it has, we proceed on the fast path. */
Craig Tillere1b51da2017-03-31 15:44:33 -07001312static void cc_start_transport_stream_op_batch(
1313 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1314 grpc_transport_stream_op_batch *op) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001315 call_data *calld = elem->call_data;
1316 channel_data *chand = elem->channel_data;
1317 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001318 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001319 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
1320 op);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001321 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001322 /* try to (atomically) get the call */
1323 grpc_subchannel_call *call = GET_CALL(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001324 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001325 if (call == CANCELLED_CALL) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001326 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001327 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Craig Tillera0f3abd2017-03-31 15:42:16 -07001328 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001329 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001330 return;
1331 }
1332 if (call != NULL) {
1333 grpc_subchannel_call_process_op(exec_ctx, call, op);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001334 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001335 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001336 return;
1337 }
1338 /* we failed; lock and figure out what to do */
Craig Tillera0f3abd2017-03-31 15:42:16 -07001339 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
Craig Tillerc55c1022017-03-10 10:26:42 -08001340 op->handler_private.extra_arg = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -08001341 grpc_closure_sched(
1342 exec_ctx,
Craig Tiller4a84bdd2017-02-14 09:48:41 -08001343 grpc_closure_init(&op->handler_private.closure,
Craig Tillera0f3abd2017-03-31 15:42:16 -07001344 start_transport_stream_op_batch_locked, op,
Craig Tillerbefafe62017-02-09 11:30:54 -08001345 grpc_combiner_scheduler(chand->combiner, false)),
1346 GRPC_ERROR_NONE);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001347 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001348}
1349
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001350/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001351static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1352 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001353 const grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001354 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001355 channel_data *chand = elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001356 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001357 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001358 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001359 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001360 calld->owning_call = args->call_stack;
Craig Tillerd426cac2017-03-13 12:30:45 -07001361 calld->arena = args->arena;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001362 if (chand->deadline_checking_enabled) {
Craig Tiller71d6ce62017-04-06 09:10:09 -07001363 grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001364 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001365 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001366}
1367
1368/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001369static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1370 grpc_call_element *elem,
1371 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001372 grpc_closure *then_schedule_closure) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001373 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001374 channel_data *chand = elem->channel_data;
1375 if (chand->deadline_checking_enabled) {
1376 grpc_deadline_state_destroy(exec_ctx, elem);
1377 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001378 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001379 if (calld->method_params != NULL) {
1380 method_parameters_unref(calld->method_params);
1381 }
Mark D. Rothf28763c2016-09-14 15:18:40 -07001382 GRPC_ERROR_UNREF(calld->cancel_error);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001383 grpc_subchannel_call *call = GET_CALL(calld);
1384 if (call != NULL && call != CANCELLED_CALL) {
Craig Tillerd426cac2017-03-13 12:30:45 -07001385 grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
1386 then_schedule_closure = NULL;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001387 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
1388 }
Mark D. Roth64a317c2017-05-02 08:27:08 -07001389 GPR_ASSERT(!calld->pick_pending);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001390 GPR_ASSERT(calld->waiting_ops_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001391 if (calld->connected_subchannel != NULL) {
1392 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1393 "picked");
1394 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001395 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1396 if (calld->subchannel_call_context[i].value != NULL) {
1397 calld->subchannel_call_context[i].destroy(
1398 calld->subchannel_call_context[i].value);
1399 }
1400 }
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001401 gpr_free(calld->waiting_ops);
Craig Tillerd426cac2017-03-13 12:30:45 -07001402 grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001403}
1404
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001405static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1406 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001407 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001408 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001409 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001410}
1411
/*************************************************************************
 * EXPORTED SYMBOLS
 */
1415
/* Filter vtable for the client channel. The initializer is positional, so
   the order of the entries below must match the member order of
   grpc_channel_filter. */
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_peer,
    cc_get_channel_info,
    "client-channel",
};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001430
Craig Tiller613dafa2017-02-09 12:00:43 -08001431static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1432 grpc_error *error_ignored) {
1433 channel_data *chand = arg;
1434 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001435 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001436 } else {
1437 chand->exit_idle_when_lb_policy_arrives = true;
1438 if (!chand->started_resolving && chand->resolver != NULL) {
1439 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
1440 chand->started_resolving = true;
Craig Tiller972470b2017-02-09 15:05:36 -08001441 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1442 &chand->resolver_result,
1443 &chand->on_resolver_result_changed);
Craig Tiller613dafa2017-02-09 12:00:43 -08001444 }
1445 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001446 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001447}
1448
Craig Tillera82950e2015-09-22 12:33:20 -07001449grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1450 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001451 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001452 grpc_connectivity_state out =
1453 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001454 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001455 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001456 grpc_closure_sched(
1457 exec_ctx,
1458 grpc_closure_create(try_to_connect_locked, chand,
1459 grpc_combiner_scheduler(chand->combiner, false)),
1460 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001461 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001462 return out;
1463}
1464
/* One externally requested connectivity watch on a client channel. Instances
   are allocated in grpc_client_channel_watch_connectivity_state() and linked
   into the channel's external_connectivity_watcher_list_head list. */
typedef struct external_connectivity_watcher {
  /* Channel this watch belongs to. */
  channel_data *chand;
  /* Polling entity added to chand->interested_parties for the lifetime of
     the watch. */
  grpc_polling_entity pollent;
  /* Caller's closure; run when the watch completes. Also used as the key
     when looking a watcher up in the list. */
  grpc_closure *on_complete;
  /* Run once after the watcher has been added to the list; must be NULL
     when state is NULL. */
  grpc_closure *watcher_timer_init;
  /* State pointer handed to the state tracker. NULL marks a request that
     targets an existing watcher with the same on_complete instead of
     starting a new watch. */
  grpc_connectivity_state *state;
  /* Internal closure: first used to hop onto the combiner, then re-used as
     the state tracker's notification closure. */
  grpc_closure my_closure;
  /* Next watcher in the channel's singly linked list. */
  struct external_connectivity_watcher *next;
} external_connectivity_watcher;
1474
Alexander Polcync3b1f182017-04-18 13:51:36 -07001475static external_connectivity_watcher *lookup_external_connectivity_watcher(
1476 channel_data *chand, grpc_closure *on_complete) {
1477 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1478 external_connectivity_watcher *w =
1479 chand->external_connectivity_watcher_list_head;
1480 while (w != NULL && w->on_complete != on_complete) {
1481 w = w->next;
1482 }
1483 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1484 return w;
1485}
1486
1487static void external_connectivity_watcher_list_append(
1488 channel_data *chand, external_connectivity_watcher *w) {
1489 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
1490
1491 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
1492 GPR_ASSERT(!w->next);
1493 w->next = chand->external_connectivity_watcher_list_head;
1494 chand->external_connectivity_watcher_list_head = w;
1495 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
1496}
1497
1498static void external_connectivity_watcher_list_remove(
1499 channel_data *chand, external_connectivity_watcher *too_remove) {
1500 GPR_ASSERT(
1501 lookup_external_connectivity_watcher(chand, too_remove->on_complete));
1502 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1503 if (too_remove == chand->external_connectivity_watcher_list_head) {
1504 chand->external_connectivity_watcher_list_head = too_remove->next;
1505 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1506 return;
1507 }
1508 external_connectivity_watcher *w =
1509 chand->external_connectivity_watcher_list_head;
1510 while (w != NULL) {
1511 if (w->next == too_remove) {
1512 w->next = w->next->next;
1513 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1514 return;
1515 }
1516 w = w->next;
1517 }
1518 GPR_UNREACHABLE_CODE(return );
1519}
1520
1521int grpc_client_channel_num_external_connectivity_watchers(
1522 grpc_channel_element *elem) {
1523 channel_data *chand = elem->channel_data;
1524 int count = 0;
1525
1526 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1527 external_connectivity_watcher *w =
1528 chand->external_connectivity_watcher_list_head;
1529 while (w != NULL) {
1530 count++;
1531 w = w->next;
1532 }
1533 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1534
1535 return count;
1536}
1537
Craig Tiller1d881fb2015-12-01 07:39:04 -08001538static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001539 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001540 external_connectivity_watcher *w = arg;
1541 grpc_closure *follow_up = w->on_complete;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001542 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1543 w->chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001544 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1545 "external_connectivity_watcher");
Alexander Polcync3b1f182017-04-18 13:51:36 -07001546 external_connectivity_watcher_list_remove(w->chand, w);
Craig Tiller86c99582015-11-25 15:22:26 -08001547 gpr_free(w);
Craig Tiller613dafa2017-02-09 12:00:43 -08001548 grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
1549}
1550
Craig Tillera8610c02017-02-14 10:05:11 -08001551static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1552 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001553 external_connectivity_watcher *w = arg;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001554 external_connectivity_watcher *found = NULL;
1555 if (w->state != NULL) {
1556 external_connectivity_watcher_list_append(w->chand, w);
1557 grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
1558 grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
1559 grpc_schedule_on_exec_ctx);
1560 grpc_connectivity_state_notify_on_state_change(
1561 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
1562 } else {
1563 GPR_ASSERT(w->watcher_timer_init == NULL);
1564 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
1565 if (found) {
1566 GPR_ASSERT(found->on_complete == w->on_complete);
1567 grpc_connectivity_state_notify_on_state_change(
1568 exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
1569 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001570 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1571 w->chand->interested_parties);
Alexander Polcync3b1f182017-04-18 13:51:36 -07001572 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1573 "external_connectivity_watcher");
1574 gpr_free(w);
1575 }
Craig Tiller86c99582015-11-25 15:22:26 -08001576}
1577
Craig Tillera82950e2015-09-22 12:33:20 -07001578void grpc_client_channel_watch_connectivity_state(
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001579 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
1580 grpc_polling_entity pollent, grpc_connectivity_state *state,
1581 grpc_closure *closure, grpc_closure *watcher_timer_init) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001582 channel_data *chand = elem->channel_data;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001583 external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
Craig Tiller86c99582015-11-25 15:22:26 -08001584 w->chand = chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001585 w->pollent = pollent;
Mark D. Roth92210832017-05-02 15:04:39 -07001586 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001587 w->state = state;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001588 w->watcher_timer_init = watcher_timer_init;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001589 grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
1590 chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001591 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1592 "external_connectivity_watcher");
Craig Tiller613dafa2017-02-09 12:00:43 -08001593 grpc_closure_sched(
1594 exec_ctx,
Craig Tillera8610c02017-02-14 10:05:11 -08001595 grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tiller613dafa2017-02-09 12:00:43 -08001596 grpc_combiner_scheduler(chand->combiner, true)),
1597 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001598}