/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"

/* Client channel implementation */

/*************************************************************************
 * METHOD-CONFIG TABLE
 */

typedef enum {
  /* zero so it can be default initialized */
  WAIT_FOR_READY_UNSET = 0,
  WAIT_FOR_READY_FALSE,
  WAIT_FOR_READY_TRUE
} wait_for_ready_value;

typedef struct {
  gpr_refcount refs;
  gpr_timespec timeout;
  wait_for_ready_value wait_for_ready;
} method_parameters;

static method_parameters *method_parameters_ref(
    method_parameters *method_params) {
  gpr_ref(&method_params->refs);
  return method_params;
}

static void method_parameters_unref(method_parameters *method_params) {
  if (gpr_unref(&method_params->refs)) {
    gpr_free(method_params);
  }
}

static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
  method_parameters_unref(value);
}

static bool parse_wait_for_ready(grpc_json *field,
                                 wait_for_ready_value *wait_for_ready) {
  if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
    return false;
  }
  *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
                                                  : WAIT_FOR_READY_FALSE;
  return true;
}

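// Parses a JSON string of the form "<seconds>[.<fraction>]s", as used for
// timeouts in the service config, into a gpr_timespec.  The fractional part
// must have exactly 3, 6, or 9 digits.  Illustrative example (input value
// invented): "1.500s" yields tv_sec == 1 and tv_nsec == 500000000.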
static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
  if (field->type != GRPC_JSON_STRING) return false;
  size_t len = strlen(field->value);
  if (field->value[len - 1] != 's') return false;
  char *buf = gpr_strdup(field->value);
  buf[len - 1] = '\0';  // Remove trailing 's'.
  char *decimal_point = strchr(buf, '.');
  if (decimal_point != NULL) {
    *decimal_point = '\0';
    timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
    if (timeout->tv_nsec == -1) {
      gpr_free(buf);
      return false;
    }
    // There should always be exactly 3, 6, or 9 fractional digits.
    int multiplier = 1;
    switch (strlen(decimal_point + 1)) {
      case 9:
        break;
      case 6:
        multiplier *= 1000;
        break;
      case 3:
        multiplier *= 1000000;
        break;
      default:  // Unsupported number of digits.
        gpr_free(buf);
        return false;
    }
    timeout->tv_nsec *= multiplier;
  }
  timeout->tv_sec = gpr_parse_nonnegative_int(buf);
  gpr_free(buf);
  if (timeout->tv_sec == -1) return false;
  return true;
}

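// Builds a method_parameters struct from one method config entry in the
// service config JSON.  As a rough illustrative example (field names taken
// from the parsing below, values invented), an entry like
//   { "waitForReady": true, "timeout": "30s" }
// yields wait_for_ready == WAIT_FOR_READY_TRUE and a 30-second timeout.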
static void *method_parameters_create_from_json(const grpc_json *json) {
  wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
  gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
  for (grpc_json *field = json->child; field != NULL; field = field->next) {
    if (field->key == NULL) continue;
    if (strcmp(field->key, "waitForReady") == 0) {
      if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL;  // Duplicate.
      if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
    } else if (strcmp(field->key, "timeout") == 0) {
      if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL;  // Duplicate.
      if (!parse_timeout(field, &timeout)) return NULL;
    }
  }
  method_parameters *value = gpr_malloc(sizeof(method_parameters));
  gpr_ref_init(&value->refs, 1);
  value->timeout = timeout;
  value->wait_for_ready = wait_for_ready;
  return value;
}

struct external_connectivity_watcher;

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

typedef struct client_channel_channel_data {
  /** resolver for this channel */
  grpc_resolver *resolver;
  /** have we started resolving this channel */
  bool started_resolving;
  /** is deadline checking enabled? */
  bool deadline_checking_enabled;
  /** client channel factory */
  grpc_client_channel_factory *client_channel_factory;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner *combiner;
  /** currently active load balancer */
  grpc_lb_policy *lb_policy;
  /** retry throttle data */
  grpc_server_retry_throttle_data *retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_slice_hash_table *method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args *resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack *owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set *interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher *external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char *info_lb_policy_name;
  /** service config in JSON form */
  char *info_service_config_json;
} channel_data;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data *chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
                                                  channel_data *chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error *error,
                                                  const char *reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != NULL) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      grpc_lb_policy_cancel_picks_locked(
          exec_ctx, chand->lb_policy,
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
                                         /* mask= */ 0, /* check= */ 0,
                                         GRPC_ERROR_REF(error));
    }
  }
  grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
                              reason);
}

static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  lb_policy_connectivity_watcher *w = arg;
  grpc_connectivity_state publish_state = w->state;
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy) {
    if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
      publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
      grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
      GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
      w->chand->lb_policy = NULL;
    }
    set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
    }
  }

  GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
                                   grpc_lb_policy *lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");

  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
                                               &w->on_changed);
}

typedef struct {
  char *server_name;
  grpc_server_retry_throttle_data *retry_throttle_data;
} service_config_parsing_state;

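// Callback passed to grpc_service_config_parse_global_params() below; it
// extracts the "retryThrottling" block from the service config.  Token
// counts and ratios are stored in milli-units.  Illustrative example
// (values invented): { "maxTokens": 10, "tokenRatio": 0.1 } produces
// max_milli_tokens == 10000 and milli_token_ratio == 100.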
static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
  service_config_parsing_state *parsing_state = arg;
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != NULL) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json *sub_field = field->child; sub_field != NULL;
         sub_field = sub_field->next) {
      if (sub_field->key == NULL) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char *decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != NULL) {
          whole_len = (size_t)(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_retry_throttle_map_get_data_for_server(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error) {
  channel_data *chand = arg;
  // Extract the following fields from the resolver result, if non-NULL.
  char *lb_policy_name = NULL;
  grpc_lb_policy *new_lb_policy = NULL;
  char *service_config_json = NULL;
  grpc_server_retry_throttle_data *retry_throttle_data = NULL;
  grpc_slice_hash_table *method_params_table = NULL;
  if (chand->resolver_result != NULL) {
    // Find LB policy name.
    const grpc_arg *channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      lb_policy_name = channel_arg->value.string;
    }
    // Special case: If at least one balancer address is present, we use
    // the grpclb policy, regardless of what the resolver actually specified.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
    if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
      grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
      bool found_balancer_address = false;
      for (size_t i = 0; i < addresses->num_addresses; ++i) {
        if (addresses->addresses[i].is_balancer) {
          found_balancer_address = true;
          break;
        }
      }
      if (found_balancer_address) {
        if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
          gpr_log(GPR_INFO,
                  "resolver requested LB policy %s but provided at least one "
                  "balancer address -- forcing use of grpclb LB policy",
                  lb_policy_name);
        }
        lb_policy_name = "grpclb";
      }
    }
    // Use pick_first if nothing was specified and we didn't select grpclb
    // above.
    if (lb_policy_name == NULL) lb_policy_name = "pick_first";
    grpc_lb_policy_args lb_policy_args;
    lb_policy_args.args = chand->resolver_result;
    lb_policy_args.client_channel_factory = chand->client_channel_factory;
    lb_policy_args.combiner = chand->combiner;
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    const bool lb_policy_type_changed =
        chand->info_lb_policy_name == NULL ||
        strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
    if (chand->lb_policy != NULL && !lb_policy_type_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
    } else {
      // Instantiate new LB policy.
      new_lb_policy =
          grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
      if (new_lb_policy == NULL) {
        gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
      }
    }
    // Find service config.
    channel_arg =
        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
    if (channel_arg != NULL) {
      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
      service_config_json = gpr_strdup(channel_arg->value.string);
      grpc_service_config *service_config =
          grpc_service_config_create(service_config_json);
      if (service_config != NULL) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        GPR_ASSERT(channel_arg != NULL);
        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
        grpc_uri *uri =
            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        memset(&parsing_state, 0, sizeof(parsing_state));
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        grpc_service_config_parse_global_params(
            service_config, parse_retry_throttle_params, &parsing_state);
        grpc_uri_destroy(uri);
        retry_throttle_data = parsing_state.retry_throttle_data;
        method_params_table = grpc_service_config_create_method_config_table(
            exec_ctx, service_config, method_parameters_create_from_json,
            method_parameters_free);
        grpc_service_config_destroy(service_config);
      }
    }
    // Before we clean up, save a copy of lb_policy_name, since it might
    // be pointing to data inside chand->resolver_result.
    // The copy will be saved in chand->lb_policy_name below.
    lb_policy_name = gpr_strdup(lb_policy_name);
    grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
    chand->resolver_result = NULL;
  }
  // Now swap out fields in chand.  Note that the new values may still
  // be NULL if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name != NULL) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name;
  }
  if (service_config_json != NULL) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = retry_throttle_data;
  // Swap out the method params table.
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  chand->method_params_table = method_params_table;
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be NULL), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
      chand->resolver == NULL) {
    if (chand->lb_policy != NULL) {
      grpc_pollset_set_del_pollset_set(exec_ctx,
                                       chand->lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
    }
    chand->lb_policy = new_lb_policy;
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
    if (chand->resolver != NULL) {
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
    }
    set_channel_connectivity_state_locked(
        exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                            &chand->waiting_for_resolver_result_closures);
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error *state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (new_lb_policy != NULL) {
      GRPC_ERROR_UNREF(state_error);
      state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
                                                       &state_error);
      grpc_pollset_set_add_pollset_set(exec_ctx,
                                       new_lb_policy->interested_parties,
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                              &chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
    }
    set_channel_connectivity_state_locked(
        exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    grpc_resolver_next_locked(exec_ctx, chand->resolver,
                              &chand->resolver_result,
                              &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error_ignored) {
  grpc_transport_op *op = arg;
  grpc_channel_element *elem = op->handler_private.extra_arg;
  channel_data *chand = elem->channel_data;

  if (op->on_connectivity_state_change != NULL) {
    grpc_connectivity_state_notify_on_state_change(
        exec_ctx, &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = NULL;
    op->connectivity_state = NULL;
  }

  if (op->send_ping != NULL) {
    if (chand->lb_policy == NULL) {
      GRPC_CLOSURE_SCHED(
          exec_ctx, op->send_ping,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
      op->bind_pollset = NULL;
    }
    op->send_ping = NULL;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != NULL) {
      set_channel_connectivity_state_locked(
          exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
      chand->resolver = NULL;
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(exec_ctx,
                                &chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != NULL) {
        grpc_pollset_set_del_pollset_set(exec_ctx,
                                         chand->lb_policy->interested_parties,
                                         chand->interested_parties);
        GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
        chand->lb_policy = NULL;
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}

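// Entry point for transport ops on the client channel.  This can run on any
// thread, so it only records the pollset interest and then bounces the op
// into the channel combiner (taking a ref on the channel stack that
// start_transport_op_locked() releases), ensuring channel state is only
// touched under the combiner.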
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem,
                                  grpc_transport_op *op) {
  channel_data *chand = elem->channel_data;

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != NULL) {
    grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
                                 op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      exec_ctx,
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
                                grpc_channel_element *elem,
                                const grpc_channel_info *info) {
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != NULL) {
    *info->lb_policy_name = chand->info_lb_policy_name == NULL
                                ? NULL
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != NULL) {
    *info->service_config_json =
        chand->info_service_config_json == NULL
            ? NULL
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
                                        grpc_channel_element *elem,
                                        grpc_channel_element_args *args) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = NULL;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  // Record client channel factory.
  const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
                                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(arg->value.pointer.p);
  chand->client_channel_factory = arg->value.pointer.p;
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char *proxy_name = NULL;
  grpc_channel_args *new_args = NULL;
  grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_resolver_create(
      exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
      new_args != NULL ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != NULL) gpr_free(proxy_name);
  if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
  if (chand->resolver == NULL) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                     grpc_error *error) {
  grpc_resolver *resolver = arg;
  grpc_resolver_shutdown_locked(exec_ctx, resolver);
  GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
                                    grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->resolver != NULL) {
    GRPC_CLOSURE_SCHED(
        exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
                                      grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != NULL) {
    grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
  }
  if (chand->lb_policy != NULL) {
    grpc_pollset_set_del_pollset_set(exec_ctx,
                                     chand->lb_policy->interested_parties,
                                     chand->interested_parties);
    GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  if (chand->retry_throttle_data != NULL) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != NULL) {
    grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
  }
  grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
  grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
  GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_WAITING_BATCHES 6

/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store a pointer to the call
  // stack and each has its own mutex.  If/when we have time, find a way
  // to avoid this without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  gpr_timespec deadline;
  grpc_server_retry_throttle_data *retry_throttle_data;
  method_parameters *method_params;

  /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
      bit is 0), or a pointer to an error (if the lowest bit is 1) */
  gpr_atm subchannel_call_or_error;
  gpr_arena *arena;

  bool pick_pending;
  grpc_connected_subchannel *connected_subchannel;
  grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
  grpc_polling_entity *pollent;

  grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
  size_t waiting_for_pick_batches_count;

  grpc_transport_stream_op_batch_payload *initial_metadata_payload;

  grpc_call_stack *owning_call;

  grpc_linked_mdelem lb_token_mdelem;

  grpc_closure on_complete;
  grpc_closure *original_on_complete;
} call_data;

typedef struct {
  grpc_subchannel_call *subchannel_call;
  grpc_error *error;
} call_or_error;

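// Accessors for calld->subchannel_call_or_error, which packs either a
// grpc_subchannel_call* or a grpc_error* (tagged by the low bit) into a
// single atomic.  get_call_or_error() uses an acquire load that pairs with
// the release stores in set_call_or_error() below.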
static call_or_error get_call_or_error(call_data *p) {
  gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
  if (c == 0)
    return (call_or_error){NULL, NULL};
  else if (c & 1)
    return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
  else
    return (call_or_error){(grpc_subchannel_call *)c, NULL};
}

static bool set_call_or_error(call_data *p, call_or_error coe) {
  // this should always be under a lock
  call_or_error existing = get_call_or_error(p);
  if (existing.error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(coe.error);
    return false;
  }
  GPR_ASSERT(existing.subchannel_call == NULL);
  if (coe.error != GRPC_ERROR_NONE) {
    GPR_ASSERT(coe.subchannel_call == NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
  } else {
    GPR_ASSERT(coe.subchannel_call != NULL);
    gpr_atm_rel_store(&p->subchannel_call_or_error,
                      (gpr_atm)coe.subchannel_call);
  }
  return true;
}

grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
    grpc_call_element *call_elem) {
  return get_call_or_error(call_elem->call_data).subchannel_call;
}

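// Batches that arrive before the LB pick completes are parked in
// calld->waiting_for_pick_batches.  Once the pick resolves, they are either
// replayed in order on the new subchannel call
// (waiting_for_pick_batches_resume_locked) or failed in bulk
// (waiting_for_pick_batches_fail_locked).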
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700832static void waiting_for_pick_batches_add_locked(
833 call_data *calld, grpc_transport_stream_op_batch *batch) {
834 GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
835 calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
836 batch;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700837}
838
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700839static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
840 call_data *calld,
841 grpc_error *error) {
842 for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
Craig Tillera0f3abd2017-03-31 15:42:16 -0700843 grpc_transport_stream_op_batch_finish_with_failure(
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700844 exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700845 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700846 calld->waiting_for_pick_batches_count = 0;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700847 GRPC_ERROR_UNREF(error);
848}
849
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700850static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
851 call_data *calld) {
852 if (calld->waiting_for_pick_batches_count == 0) return;
853 call_or_error coe = get_call_or_error(calld);
854 if (coe.error != GRPC_ERROR_NONE) {
855 waiting_for_pick_batches_fail_locked(exec_ctx, calld,
856 GRPC_ERROR_REF(coe.error));
Craig Tiller57726ca2016-09-12 11:59:45 -0700857 return;
858 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700859 for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
860 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
861 calld->waiting_for_pick_batches[i]);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700862 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700863 calld->waiting_for_pick_batches_count = 0;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700864}
865
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700866// Applies service config to the call. Must be invoked once we know
867// that the resolver has returned results to the channel.
868static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
869 grpc_call_element *elem) {
Craig Tiller11c17d42017-03-13 13:36:34 -0700870 channel_data *chand = elem->channel_data;
871 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -0700872 if (chand->retry_throttle_data != NULL) {
873 calld->retry_throttle_data =
874 grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
875 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700876 if (chand->method_params_table != NULL) {
877 calld->method_params = grpc_method_config_table_get(
878 exec_ctx, chand->method_params_table, calld->path);
879 if (calld->method_params != NULL) {
880 method_parameters_ref(calld->method_params);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700881 // If the deadline from the service config is shorter than the one
882 // from the client API, reset the deadline timer.
883 if (chand->deadline_checking_enabled &&
884 gpr_time_cmp(calld->method_params->timeout,
Craig Tiller11c17d42017-03-13 13:36:34 -0700885 gpr_time_0(GPR_TIMESPAN)) != 0) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700886 const gpr_timespec per_method_deadline =
Craig Tiller11c17d42017-03-13 13:36:34 -0700887 gpr_time_add(calld->call_start_time, calld->method_params->timeout);
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700888 if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
889 calld->deadline = per_method_deadline;
890 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
891 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700892 }
893 }
894 }
Craig Tiller11c17d42017-03-13 13:36:34 -0700895}
Craig Tillerea4a4f12017-03-13 13:36:52 -0700896
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700897static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
898 call_data *calld, grpc_error *error) {
899 grpc_subchannel_call *subchannel_call = NULL;
900 const grpc_connected_subchannel_call_args call_args = {
901 .pollent = calld->pollent,
902 .path = calld->path,
903 .start_time = calld->call_start_time,
904 .deadline = calld->deadline,
905 .arena = calld->arena,
906 .context = calld->subchannel_call_context};
907 grpc_error *new_error = grpc_connected_subchannel_create_call(
908 exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
909 GPR_ASSERT(set_call_or_error(
910 calld, (call_or_error){.subchannel_call = subchannel_call}));
911 if (new_error != GRPC_ERROR_NONE) {
912 new_error = grpc_error_add_child(new_error, error);
913 waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
914 } else {
915 waiting_for_pick_batches_resume_locked(exec_ctx, calld);
Craig Tiller11c17d42017-03-13 13:36:34 -0700916 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700917 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -0700918}
919
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700920static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
921 grpc_call_element *elem,
Craig Tillerbefafe62017-02-09 11:30:54 -0800922 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -0700923 call_data *calld = elem->call_data;
924 channel_data *chand = elem->channel_data;
Mark D. Roth64a317c2017-05-02 08:27:08 -0700925 GPR_ASSERT(calld->pick_pending);
926 calld->pick_pending = false;
Yuchen Zeng19656b12016-09-01 18:00:45 -0700927 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
928 chand->interested_parties);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700929 call_or_error coe = get_call_or_error(calld);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700930 if (calld->connected_subchannel == NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700931 // Failed to create subchannel.
Craig Tillerd3ec4aa2017-05-18 10:22:43 -0700932 grpc_error *failure =
933 error == GRPC_ERROR_NONE
934 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
935 "Call dropped by load balancing policy")
936 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
937 "Failed to create subchannel", &error, 1);
Craig Tiller03155722017-05-23 23:51:51 +0000938 set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700939 waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700940 } else if (coe.error != GRPC_ERROR_NONE) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700941 /* already cancelled before subchannel became ready */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700942 grpc_error *child_errors[] = {error, coe.error};
ncteisen4b36a3d2017-03-13 19:08:06 -0700943 grpc_error *cancellation_error =
944 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700945 "Cancelled before creating subchannel", child_errors,
946 GPR_ARRAY_SIZE(child_errors));
David Garcia Quintas68a9e382016-12-13 10:50:40 -0800947 /* if due to deadline, attach the deadline exceeded status to the error */
948 if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
949 cancellation_error =
950 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
951 GRPC_STATUS_DEADLINE_EXCEEDED);
952 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700953 waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700954 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700955 /* Create call on subchannel. */
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700956 create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700957 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700958 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700959 GRPC_ERROR_UNREF(error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700960}
961
962static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
963 call_data *calld = elem->call_data;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700964 grpc_subchannel_call *subchannel_call =
965 get_call_or_error(calld).subchannel_call;
966 if (subchannel_call == NULL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700967 return NULL;
968 } else {
969 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
970 }
971}
972
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700973/** Return true if subchannel is available immediately (in which case
974 subchannel_ready_locked() should not be called), or false otherwise (in
975 which case subchannel_ready_locked() should be called when the subchannel
976 is available). */
977static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
978 grpc_call_element *elem);
979
Craig Tiller577c9b22015-11-02 14:11:15 -0800980typedef struct {
Craig Tiller577c9b22015-11-02 14:11:15 -0800981 grpc_call_element *elem;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700982 bool cancelled;
Craig Tiller577c9b22015-11-02 14:11:15 -0800983 grpc_closure closure;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700984} pick_after_resolver_result_args;
Craig Tiller577c9b22015-11-02 14:11:15 -0800985
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700986static void continue_picking_after_resolver_result_locked(
987 grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
988 pick_after_resolver_result_args *args = arg;
989 if (args->cancelled) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800990 /* cancelled, do nothing */
Craig Tiller804ff712016-05-05 16:25:40 -0700991 } else if (error != GRPC_ERROR_NONE) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700992 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700993 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700994 if (pick_subchannel_locked(exec_ctx, args->elem)) {
995 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700996 }
Craig Tiller577c9b22015-11-02 14:11:15 -0800997 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700998 gpr_free(args);
Craig Tiller577c9b22015-11-02 14:11:15 -0800999}
1000
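// Cancels any pending pick for this call, both at the LB policy (if one
// exists) and in the list of picks waiting for a resolver result.  Takes
// ownership of the passed-in error.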
Mark D. Roth64a317c2017-05-02 08:27:08 -07001001static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1002 grpc_error *error) {
1003 channel_data *chand = elem->channel_data;
1004 call_data *calld = elem->call_data;
1005 if (chand->lb_policy != NULL) {
1006 grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
1007 &calld->connected_subchannel,
1008 GRPC_ERROR_REF(error));
1009 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001010 // If we don't yet have a resolver result, then a closure for
1011 // continue_picking_after_resolver_result_locked() will have been added to
1012 // chand->waiting_for_resolver_result_closures, and it may not be invoked
1013 // until after this call has been destroyed. We mark the operation as
1014 // cancelled, so that when continue_picking_after_resolver_result_locked()
1015 // is called, it will be a no-op. We also immediately invoke
1016 // subchannel_ready_locked() to propagate the error back to the caller.
1017 for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
Mark D. Roth64a317c2017-05-02 08:27:08 -07001018 closure != NULL; closure = closure->next_data.next) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001019 pick_after_resolver_result_args *args = closure->cb_arg;
1020 if (!args->cancelled && args->elem == elem) {
1021 args->cancelled = true;
1022 subchannel_ready_locked(exec_ctx, elem,
1023 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1024 "Pick cancelled", &error, 1));
Mark D. Roth64a317c2017-05-02 08:27:08 -07001025 }
1026 }
1027 GRPC_ERROR_UNREF(error);
1028}
1029
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001030// State for pick callback that holds a reference to the LB policy
1031// from which the pick was requested.
1032typedef struct {
1033 grpc_lb_policy *lb_policy;
1034 grpc_call_element *elem;
1035 grpc_closure closure;
1036} pick_callback_args;
Craig Tillerbfc9adc2016-06-27 13:16:22 -07001037
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001038// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
1039// Unrefs the LB policy after invoking subchannel_ready_locked().
1040static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1041 grpc_error *error) {
1042 pick_callback_args *args = arg;
1043 GPR_ASSERT(args != NULL);
1044 GPR_ASSERT(args->lb_policy != NULL);
1045 subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
1046 GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
1047 gpr_free(args);
1048}
1049
1050// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
1051// If the pick was completed synchronously, unrefs the LB policy and
1052// returns true.
1053static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
1054 grpc_call_element *elem,
1055 const grpc_lb_policy_pick_args *inputs) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001056 channel_data *chand = elem->channel_data;
1057 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001058 pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
1059 GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
1060 pick_args->lb_policy = chand->lb_policy;
1061 pick_args->elem = elem;
1062 GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
1063 grpc_combiner_scheduler(chand->combiner));
1064 const bool pick_done = grpc_lb_policy_pick_locked(
1065 exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
1066 calld->subchannel_call_context, NULL, &pick_args->closure);
1067 if (pick_done) {
1068 /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
1069 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
1070 gpr_free(pick_args);
1071 }
1072 return pick_done;
1073}
Craig Tiller577c9b22015-11-02 14:11:15 -08001074
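// Starts a pick for the call: if an LB policy is available, applies the
// service config and asks the policy for a connected subchannel;
// otherwise queues the pick until a resolver result arrives (or fails the
// pick if the channel has no resolver).  Return semantics are documented
// at the forward declaration above.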
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001075static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
1076 grpc_call_element *elem) {
1077 GPR_TIMER_BEGIN("pick_subchannel", 0);
1078 channel_data *chand = elem->channel_data;
1079 call_data *calld = elem->call_data;
1080 bool pick_done = false;
Craig Tiller577c9b22015-11-02 14:11:15 -08001081 if (chand->lb_policy != NULL) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001082 apply_service_config_to_call_locked(exec_ctx, elem);
Mark D. Rothe40dd292016-10-05 14:58:37 -07001083 // If the application explicitly set wait_for_ready, use that.
1084 // Otherwise, if the service config specified a value for this
1085 // method, use that.
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001086 uint32_t initial_metadata_flags =
1087 calld->initial_metadata_payload->send_initial_metadata
1088 .send_initial_metadata_flags;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001089 const bool wait_for_ready_set_from_api =
1090 initial_metadata_flags &
1091 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1092 const bool wait_for_ready_set_from_service_config =
Mark D. Roth95b627b2017-02-24 11:02:58 -08001093 calld->method_params != NULL &&
1094 calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
Mark D. Rothc1c38582016-10-11 11:03:27 -07001095 if (!wait_for_ready_set_from_api &&
1096 wait_for_ready_set_from_service_config) {
Mark D. Roth95b627b2017-02-24 11:02:58 -08001097 if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001098 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1099 } else {
1100 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
1101 }
1102 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001103 const grpc_lb_policy_pick_args inputs = {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001104 calld->initial_metadata_payload->send_initial_metadata
1105 .send_initial_metadata,
1106 initial_metadata_flags, &calld->lb_token_mdelem};
1107 pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
1108 } else if (chand->resolver != NULL) {
1109 if (!chand->started_resolving) {
1110 chand->started_resolving = true;
1111 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
1112 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1113 &chand->resolver_result,
1114 &chand->on_resolver_result_changed);
David Garcia Quintas956f7002017-04-13 15:40:06 -07001115 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001116 pick_after_resolver_result_args *args =
1117 (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
1118 args->elem = elem;
1119 GRPC_CLOSURE_INIT(&args->closure,
1120 continue_picking_after_resolver_result_locked, args,
Craig Tilleree4b1452017-05-12 10:56:03 -07001121 grpc_combiner_scheduler(chand->combiner));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001122 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
1123 &args->closure, GRPC_ERROR_NONE);
Craig Tiller0eab6972016-04-23 12:59:57 -07001124 } else {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001125 subchannel_ready_locked(
1126 exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -07001127 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001128 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001129 return pick_done;
Craig Tiller577c9b22015-11-02 14:11:15 -08001130}
1131
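// Slow path for cc_start_transport_stream_op_batch(); runs under the
// channel combiner.  Rechecks the call state (another thread may have set
// a subchannel call or a cancellation error in the meantime), queues the
// batch, handles cancellation, and starts an LB pick if one is needed.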
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001132static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
1133 void *arg,
1134 grpc_error *error_ignored) {
1135 GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
1136 grpc_transport_stream_op_batch *op = arg;
1137 grpc_call_element *elem = op->handler_private.extra_arg;
Craig Tillera11bfc82017-02-14 09:56:33 -08001138 call_data *calld = elem->call_data;
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001139 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001140 /* need to recheck that another thread hasn't set the call */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001141 call_or_error coe = get_call_or_error(calld);
1142 if (coe.error != GRPC_ERROR_NONE) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001143 grpc_transport_stream_op_batch_finish_with_failure(
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001144 exec_ctx, op, GRPC_ERROR_REF(coe.error));
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001145 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001146 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001147 if (coe.subchannel_call != NULL) {
1148 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001149 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001150 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001151 // Add to waiting-for-pick list. If we succeed in getting a
1152 // subchannel call below, we'll handle this batch (along with any
1153 // other waiting batches) in waiting_for_pick_batches_resume_locked().
1154 waiting_for_pick_batches_add_locked(calld, op);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001155 /* if this is a cancellation, then we can raise our cancelled flag */
Craig Tillerc55c1022017-03-10 10:26:42 -08001156 if (op->cancel_stream) {
Craig Tiller03155722017-05-23 23:51:51 +00001157 grpc_error *error = op->payload->cancel_stream.cancel_error;
1158 /* Stash a copy of cancel_error in our call data, so that we can use
1159 it for subsequent operations. This ensures that if the call is
1160 cancelled before any ops are passed down (e.g., if the deadline
1161 is in the past when the call starts), we can return the right
1162 error to the caller when the first op does get passed down. */
1163 set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
1164 if (calld->pick_pending) {
1165 cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001166 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001167 waiting_for_pick_batches_fail_locked(exec_ctx, calld,
1168 GRPC_ERROR_REF(error));
1169 goto done;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001170 }
1171 /* if we don't have a subchannel, try to get one */
Mark D. Roth64a317c2017-05-02 08:27:08 -07001172 if (!calld->pick_pending && calld->connected_subchannel == NULL &&
1173 op->send_initial_metadata) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001174 calld->initial_metadata_payload = op->payload;
Mark D. Roth64a317c2017-05-02 08:27:08 -07001175 calld->pick_pending = true;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001176 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
Yuchen Zeng144ce652016-09-01 18:19:34 -07001177 /* If a subchannel is not available immediately, the polling entity from
1178 call_data should be provided to channel_data's interested_parties, so
1179 that I/O for the lb_policy and resolver can be done under it. */
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001180 if (pick_subchannel_locked(exec_ctx, elem)) {
1181 // Pick was returned synchronously.
Mark D. Roth64a317c2017-05-02 08:27:08 -07001182 calld->pick_pending = false;
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001183 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
Mark D. Rothd7389b42017-05-17 12:22:17 -07001184 if (calld->connected_subchannel == NULL) {
Mark D. Rothd7389b42017-05-17 12:22:17 -07001185 grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1186 "Call dropped by load balancing policy");
Craig Tiller26e69f62017-05-24 15:09:23 -07001187 set_call_or_error(calld,
1188 (call_or_error){.error = GRPC_ERROR_REF(error)});
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001189 waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
1190 } else {
1191 // Create subchannel call.
1192 create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
Mark D. Rothd7389b42017-05-17 12:22:17 -07001193 }
Yuchen Zeng19656b12016-09-01 18:00:45 -07001194 } else {
Yuchen Zeng19656b12016-09-01 18:00:45 -07001195 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
1196 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001197 }
1198 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001199done:
1200 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
1201 "start_transport_stream_op_batch");
1202 GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001203}
1204
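// Intercepted on_complete callback for recv_trailing_metadata.  Records
// the call's outcome with the retry throttle (if configured) and then
// chains to the original on_complete closure.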
Mark D. Rothde144102017-03-15 10:11:03 -07001205static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001206 grpc_call_element *elem = arg;
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001207 call_data *calld = elem->call_data;
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001208 if (calld->retry_throttle_data != NULL) {
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001209 if (error == GRPC_ERROR_NONE) {
1210 grpc_server_retry_throttle_data_record_success(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001211 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001212 } else {
1213 // TODO(roth): In a subsequent PR, check the return value here and
Mark D. Rothb3322562017-02-23 14:38:02 -08001214 // decide whether or not to retry. Note that we should only
1215 // record failures whose statuses match the configured retryable
1216 // or non-fatal status codes.
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001217 grpc_server_retry_throttle_data_record_failure(
Mark D. Roth9ccbc4d2017-03-15 08:30:04 -07001218 calld->retry_throttle_data);
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001219 }
1220 }
ncteisen274bbbe2017-06-08 14:57:11 -07001221 GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
Mark D. Roth95039b52017-02-24 07:59:45 -08001222 GRPC_ERROR_REF(error));
Mark D. Rothd6d192d2017-02-23 08:58:42 -08001223}
1224
Craig Tillerbe9691a2017-02-14 10:00:42 -08001225/* The logic here is fairly complicated, due to (a) the fact that we
1226 need to handle the case where we receive the send op before the
1227 initial metadata op, and (b) the need for efficiency, especially in
1228 the streaming case.
1229
1230 We use double-checked locking to initially see if initialization has been
1231 performed. If it has not, we acquire the combiner and perform initialization.
1232 If it has, we proceed on the fast path. */
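/* Illustrative sketch of the fast-path / slow-path split implemented
   below (a summary of the control flow, not additional code):

     coe = get_call_or_error(calld);        // lock-free read
     if (coe.error != GRPC_ERROR_NONE)      // fast path: fail the batch
       fail the batch with coe.error;
     else if (coe.subchannel_call != NULL)  // fast path: forward the batch
       forward the batch to the subchannel call;
     else                                   // slow path: take the combiner
       schedule start_transport_stream_op_batch_locked() on chand->combiner;
*/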
Craig Tillere1b51da2017-03-31 15:44:33 -07001233static void cc_start_transport_stream_op_batch(
1234 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
1235 grpc_transport_stream_op_batch *op) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001236 call_data *calld = elem->call_data;
1237 channel_data *chand = elem->channel_data;
1238 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001239 if (chand->deadline_checking_enabled) {
Craig Tiller29ebc572017-04-04 08:00:55 -07001240 grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
1241 op);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001242 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001243 // Intercept on_complete for recv_trailing_metadata so that we can
1244 // check retry throttle status.
1245 if (op->recv_trailing_metadata) {
1246 GPR_ASSERT(op->on_complete != NULL);
1247 calld->original_on_complete = op->on_complete;
1248 GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
1249 grpc_schedule_on_exec_ctx);
1250 op->on_complete = &calld->on_complete;
1251 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001252 /* try to (atomically) get the call */
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001253 call_or_error coe = get_call_or_error(calld);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001254 GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001255 if (coe.error != GRPC_ERROR_NONE) {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001256 grpc_transport_stream_op_batch_finish_with_failure(
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001257 exec_ctx, op, GRPC_ERROR_REF(coe.error));
Craig Tillera0f3abd2017-03-31 15:42:16 -07001258 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001259 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001260 return;
1261 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001262 if (coe.subchannel_call != NULL) {
1263 grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001264 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001265 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001266 return;
1267 }
1268 /* neither fast path applied; take the combiner and figure out what to do */
Craig Tillera0f3abd2017-03-31 15:42:16 -07001269 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
Craig Tillerc55c1022017-03-10 10:26:42 -08001270 op->handler_private.extra_arg = elem;
ncteisen274bbbe2017-06-08 14:57:11 -07001271 GRPC_CLOSURE_SCHED(
1272 exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure,
Craig Tilleree4b1452017-05-12 10:56:03 -07001273 start_transport_stream_op_batch_locked, op,
1274 grpc_combiner_scheduler(chand->combiner)),
Craig Tillerbefafe62017-02-09 11:30:54 -08001275 GRPC_ERROR_NONE);
Craig Tillera0f3abd2017-03-31 15:42:16 -07001276 GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001277}
1278
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001279/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001280static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1281 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001282 const grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001283 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001284 channel_data *chand = elem->channel_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001285 // Initialize data members.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001286 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001287 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001288 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001289 calld->owning_call = args->call_stack;
Craig Tillerd426cac2017-03-13 12:30:45 -07001290 calld->arena = args->arena;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001291 if (chand->deadline_checking_enabled) {
Craig Tiller71d6ce62017-04-06 09:10:09 -07001292 grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
Craig Tiller3be7dd02017-04-03 14:30:03 -07001293 }
Mark D. Roth0badbe82016-06-23 10:15:12 -07001294 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001295}
1296
1297/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001298static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1299 grpc_call_element *elem,
1300 const grpc_call_final_info *final_info,
Craig Tillerd426cac2017-03-13 12:30:45 -07001301 grpc_closure *then_schedule_closure) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001302 call_data *calld = elem->call_data;
Craig Tiller3be7dd02017-04-03 14:30:03 -07001303 channel_data *chand = elem->channel_data;
1304 if (chand->deadline_checking_enabled) {
1305 grpc_deadline_state_destroy(exec_ctx, elem);
1306 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001307 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Roth95b627b2017-02-24 11:02:58 -08001308 if (calld->method_params != NULL) {
1309 method_parameters_unref(calld->method_params);
1310 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001311 call_or_error coe = get_call_or_error(calld);
1312 GRPC_ERROR_UNREF(coe.error);
1313 if (coe.subchannel_call != NULL) {
1314 grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
1315 then_schedule_closure);
Craig Tillerd426cac2017-03-13 12:30:45 -07001316 then_schedule_closure = NULL;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -07001317 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
1318 "client_channel_destroy_call");
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001319 }
Mark D. Roth64a317c2017-05-02 08:27:08 -07001320 GPR_ASSERT(!calld->pick_pending);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001321 GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001322 if (calld->connected_subchannel != NULL) {
1323 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1324 "picked");
1325 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001326 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
1327 if (calld->subchannel_call_context[i].value != NULL) {
1328 calld->subchannel_call_context[i].destroy(
1329 calld->subchannel_call_context[i].value);
1330 }
1331 }
ncteisen274bbbe2017-06-08 14:57:11 -07001332 GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001333}
1334
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001335static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1336 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001337 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001338 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001339 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001340}
1341
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001342/*************************************************************************
1343 * EXPORTED SYMBOLS
1344 */
1345
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001346const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera0f3abd2017-03-31 15:42:16 -07001347 cc_start_transport_stream_op_batch,
Craig Tillerf40df232016-03-25 13:38:14 -07001348 cc_start_transport_op,
1349 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001350 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001351 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001352 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001353 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001354 cc_init_channel_elem,
1355 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001356 cc_get_peer,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001357 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001358 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001359};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001360
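// Runs under the combiner when a caller asks the channel to connect:
// kicks the LB policy out of idle if one exists, otherwise remembers the
// request and starts resolution so that an LB policy can be created.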
Craig Tiller613dafa2017-02-09 12:00:43 -08001361static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1362 grpc_error *error_ignored) {
1363 channel_data *chand = arg;
1364 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001365 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001366 } else {
1367 chand->exit_idle_when_lb_policy_arrives = true;
1368 if (!chand->started_resolving && chand->resolver != NULL) {
1369 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
1370 chand->started_resolving = true;
Craig Tiller972470b2017-02-09 15:05:36 -08001371 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1372 &chand->resolver_result,
1373 &chand->on_resolver_result_changed);
Craig Tiller613dafa2017-02-09 12:00:43 -08001374 }
1375 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001376 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001377}
1378
Craig Tillera82950e2015-09-22 12:33:20 -07001379grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1380 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001381 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001382 grpc_connectivity_state out =
1383 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001384 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001385 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
ncteisen274bbbe2017-06-08 14:57:11 -07001386 GRPC_CLOSURE_SCHED(
1387 exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
Craig Tilleree4b1452017-05-12 10:56:03 -07001388 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001389 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001390 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001391 return out;
1392}
1393
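// Bookkeeping for one external connectivity-state watcher.  Watchers are
// kept in a singly-linked list on the channel so that a pending watch can
// be found (and cancelled) by its on_complete closure.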
Alexander Polcync3b1f182017-04-18 13:51:36 -07001394typedef struct external_connectivity_watcher {
Craig Tiller86c99582015-11-25 15:22:26 -08001395 channel_data *chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001396 grpc_polling_entity pollent;
Craig Tiller86c99582015-11-25 15:22:26 -08001397 grpc_closure *on_complete;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001398 grpc_closure *watcher_timer_init;
Craig Tiller613dafa2017-02-09 12:00:43 -08001399 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001400 grpc_closure my_closure;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001401 struct external_connectivity_watcher *next;
Craig Tiller86c99582015-11-25 15:22:26 -08001402} external_connectivity_watcher;
1403
Alexander Polcync3b1f182017-04-18 13:51:36 -07001404static external_connectivity_watcher *lookup_external_connectivity_watcher(
1405 channel_data *chand, grpc_closure *on_complete) {
1406 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1407 external_connectivity_watcher *w =
1408 chand->external_connectivity_watcher_list_head;
1409 while (w != NULL && w->on_complete != on_complete) {
1410 w = w->next;
1411 }
1412 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1413 return w;
1414}
1415
1416static void external_connectivity_watcher_list_append(
1417 channel_data *chand, external_connectivity_watcher *w) {
1418 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
1419
1420 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
1421 GPR_ASSERT(!w->next);
1422 w->next = chand->external_connectivity_watcher_list_head;
1423 chand->external_connectivity_watcher_list_head = w;
1424 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
1425}
1426
1427static void external_connectivity_watcher_list_remove(
1428 channel_data *chand, external_connectivity_watcher *to_remove) {
1429 GPR_ASSERT(
1430 lookup_external_connectivity_watcher(chand, to_remove->on_complete));
1431 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1432 if (to_remove == chand->external_connectivity_watcher_list_head) {
1433 chand->external_connectivity_watcher_list_head = to_remove->next;
1434 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1435 return;
1436 }
1437 external_connectivity_watcher *w =
1438 chand->external_connectivity_watcher_list_head;
1439 while (w != NULL) {
1440 if (w->next == to_remove) {
1441 w->next = w->next->next;
1442 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1443 return;
1444 }
1445 w = w->next;
1446 }
1447 GPR_UNREACHABLE_CODE(return);
1448}
1449
1450int grpc_client_channel_num_external_connectivity_watchers(
1451 grpc_channel_element *elem) {
1452 channel_data *chand = elem->channel_data;
1453 int count = 0;
1454
1455 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
1456 external_connectivity_watcher *w =
1457 chand->external_connectivity_watcher_list_head;
1458 while (w != NULL) {
1459 count++;
1460 w = w->next;
1461 }
1462 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
1463
1464 return count;
1465}
1466
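// Invoked when the watched connectivity state changes (or the watch is
// cancelled): detaches the watcher's pollent, removes it from the list,
// frees it, and runs the caller's closure with the resulting error.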
Craig Tiller1d881fb2015-12-01 07:39:04 -08001467static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001468 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001469 external_connectivity_watcher *w = arg;
1470 grpc_closure *follow_up = w->on_complete;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001471 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1472 w->chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001473 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1474 "external_connectivity_watcher");
Alexander Polcync3b1f182017-04-18 13:51:36 -07001475 external_connectivity_watcher_list_remove(w->chand, w);
Craig Tiller86c99582015-11-25 15:22:26 -08001476 gpr_free(w);
ncteisen274bbbe2017-06-08 14:57:11 -07001477 GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
Craig Tiller613dafa2017-02-09 12:00:43 -08001478}
1479
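// Runs under the combiner.  When the caller-supplied state pointer is
// non-NULL, registers the watcher with the channel's connectivity-state
// tracker; when it is NULL, looks up and cancels a previously registered
// watcher for the same on_complete closure.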
Craig Tillera8610c02017-02-14 10:05:11 -08001480static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1481 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001482 external_connectivity_watcher *w = arg;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001483 external_connectivity_watcher *found = NULL;
1484 if (w->state != NULL) {
1485 external_connectivity_watcher_list_append(w->chand, w);
ncteisen274bbbe2017-06-08 14:57:11 -07001486 GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
1487 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
Alexander Polcync3b1f182017-04-18 13:51:36 -07001488 grpc_schedule_on_exec_ctx);
1489 grpc_connectivity_state_notify_on_state_change(
1490 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
1491 } else {
1492 GPR_ASSERT(w->watcher_timer_init == NULL);
1493 found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
1494 if (found) {
1495 GPR_ASSERT(found->on_complete == w->on_complete);
1496 grpc_connectivity_state_notify_on_state_change(
1497 exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
1498 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001499 grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
1500 w->chand->interested_parties);
Alexander Polcync3b1f182017-04-18 13:51:36 -07001501 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1502 "external_connectivity_watcher");
1503 gpr_free(w);
1504 }
Craig Tiller86c99582015-11-25 15:22:26 -08001505}
1506
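// Starts (state != NULL) or cancels (state == NULL) an external watch of
// the channel's connectivity state.  The actual registration happens in
// watch_connectivity_state_locked() under the channel combiner.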
Craig Tillera82950e2015-09-22 12:33:20 -07001507void grpc_client_channel_watch_connectivity_state(
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001508 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
1509 grpc_polling_entity pollent, grpc_connectivity_state *state,
1510 grpc_closure *closure, grpc_closure *watcher_timer_init) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001511 channel_data *chand = elem->channel_data;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001512 external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
Craig Tiller86c99582015-11-25 15:22:26 -08001513 w->chand = chand;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001514 w->pollent = pollent;
Mark D. Roth92210832017-05-02 15:04:39 -07001515 w->on_complete = closure;
Craig Tiller613dafa2017-02-09 12:00:43 -08001516 w->state = state;
Alexander Polcync3b1f182017-04-18 13:51:36 -07001517 w->watcher_timer_init = watcher_timer_init;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001518 grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
1519 chand->interested_parties);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001520 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1521 "external_connectivity_watcher");
ncteisen274bbbe2017-06-08 14:57:11 -07001522 GRPC_CLOSURE_SCHED(
Craig Tiller613dafa2017-02-09 12:00:43 -08001523 exec_ctx,
ncteisen274bbbe2017-06-08 14:57:11 -07001524 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tilleree4b1452017-05-12 10:56:03 -07001525 grpc_combiner_scheduler(chand->combiner)),
Craig Tiller613dafa2017-02-09 12:00:43 -08001526 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001527}