blob: bedb13d974105059a1cc4eab5d31dbcb09131394 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Mark D. Roth2137cd82016-09-14 09:04:00 -070034#include "src/core/ext/client_channel/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080035
Mark D. Roth4c0fe492016-08-31 13:51:55 -070036#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070038#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
Mark D. Rothb2d24882016-10-27 15:44:07 -070042#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/sync.h>
44#include <grpc/support/useful.h>
45
Mark D. Rothd58a9852017-01-18 08:28:57 -080046#include "src/core/ext/client_channel/http_connect_handshaker.h"
Mark D. Roth15195742016-10-07 09:02:28 -070047#include "src/core/ext/client_channel/lb_policy_registry.h"
Mark D. Rothdc9bee72017-02-07 12:29:14 -080048#include "src/core/ext/client_channel/proxy_mapper_registry.h"
Mark D. Roth21d4b2d2016-11-18 09:53:41 -080049#include "src/core/ext/client_channel/resolver_registry.h"
Mark D. Roth2137cd82016-09-14 09:04:00 -070050#include "src/core/ext/client_channel/subchannel.h"
Craig Tiller9533d042016-03-25 17:11:06 -070051#include "src/core/lib/channel/channel_args.h"
52#include "src/core/lib/channel/connected_channel.h"
Mark D. Roth72f6da82016-09-02 13:42:38 -070053#include "src/core/lib/channel/deadline_filter.h"
Craig Tillerbefafe62017-02-09 11:30:54 -080054#include "src/core/lib/iomgr/combiner.h"
Craig Tiller9533d042016-03-25 17:11:06 -070055#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070056#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070057#include "src/core/lib/profiling/timers.h"
Craig Tiller7c70b6c2017-01-23 07:48:42 -080058#include "src/core/lib/slice/slice_internal.h"
Craig Tiller9533d042016-03-25 17:11:06 -070059#include "src/core/lib/support/string.h"
60#include "src/core/lib/surface/channel.h"
61#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070062#include "src/core/lib/transport/metadata.h"
63#include "src/core/lib/transport/metadata_batch.h"
Mark D. Rothea846a02016-11-03 11:32:54 -070064#include "src/core/lib/transport/service_config.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070065#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070066
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080067/* Client channel implementation */
68
Mark D. Roth26b7be42016-10-24 10:08:07 -070069/*************************************************************************
70 * METHOD-CONFIG TABLE
71 */
72
Mark D. Roth9d480942016-10-19 14:18:05 -070073typedef enum {
74 WAIT_FOR_READY_UNSET,
75 WAIT_FOR_READY_FALSE,
76 WAIT_FOR_READY_TRUE
77} wait_for_ready_value;
78
79typedef struct method_parameters {
80 gpr_timespec timeout;
81 wait_for_ready_value wait_for_ready;
82} method_parameters;
83
84static void *method_parameters_copy(void *value) {
85 void *new_value = gpr_malloc(sizeof(method_parameters));
86 memcpy(new_value, value, sizeof(method_parameters));
87 return new_value;
88}
89
Craig Tillerb28c7e82016-11-18 10:29:04 -080090static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) {
Craig Tiller87a7e1f2016-11-09 09:42:19 -080091 gpr_free(p);
92}
93
Craig Tiller7c70b6c2017-01-23 07:48:42 -080094static const grpc_slice_hash_table_vtable method_parameters_vtable = {
Craig Tillerb28c7e82016-11-18 10:29:04 -080095 method_parameters_free, method_parameters_copy};
Mark D. Roth9d480942016-10-19 14:18:05 -070096
Mark D. Rothe30baeb2016-11-03 08:16:19 -070097static void *method_parameters_create_from_json(const grpc_json *json) {
Mark D. Rothc968e602016-11-02 14:07:36 -070098 wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
Mark D. Roth47f10842016-11-03 08:45:27 -070099 gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
100 for (grpc_json *field = json->child; field != NULL; field = field->next) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700101 if (field->key == NULL) continue;
Mark D. Roth84c8a022016-11-10 09:39:34 -0800102 if (strcmp(field->key, "waitForReady") == 0) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700103 if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
104 if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
105 return NULL;
106 }
Mark D. Roth47f10842016-11-03 08:45:27 -0700107 wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
108 : WAIT_FOR_READY_FALSE;
Mark D. Rothc968e602016-11-02 14:07:36 -0700109 } else if (strcmp(field->key, "timeout") == 0) {
110 if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
Mark D. Roth84c8a022016-11-10 09:39:34 -0800111 if (field->type != GRPC_JSON_STRING) return NULL;
112 size_t len = strlen(field->value);
113 if (field->value[len - 1] != 's') return NULL;
Mark D. Rothc19049c2016-11-10 09:43:06 -0800114 char *buf = gpr_strdup(field->value);
Mark D. Roth84c8a022016-11-10 09:39:34 -0800115 buf[len - 1] = '\0'; // Remove trailing 's'.
Mark D. Rothc19049c2016-11-10 09:43:06 -0800116 char *decimal_point = strchr(buf, '.');
Mark D. Roth84c8a022016-11-10 09:39:34 -0800117 if (decimal_point != NULL) {
118 *decimal_point = '\0';
119 timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
120 if (timeout.tv_nsec == -1) {
121 gpr_free(buf);
Mark D. Rothc968e602016-11-02 14:07:36 -0700122 return NULL;
123 }
Mark D. Roth84c8a022016-11-10 09:39:34 -0800124 // There should always be exactly 3, 6, or 9 fractional digits.
125 int multiplier = 1;
126 switch (strlen(decimal_point + 1)) {
127 case 9:
128 break;
129 case 6:
130 multiplier *= 1000;
131 break;
132 case 3:
133 multiplier *= 1000000;
134 break;
135 default: // Unsupported number of digits.
136 gpr_free(buf);
137 return NULL;
138 }
139 timeout.tv_nsec *= multiplier;
Mark D. Rothc968e602016-11-02 14:07:36 -0700140 }
Mark D. Roth84c8a022016-11-10 09:39:34 -0800141 timeout.tv_sec = gpr_parse_nonnegative_int(buf);
142 if (timeout.tv_sec == -1) return NULL;
143 gpr_free(buf);
Mark D. Rothc968e602016-11-02 14:07:36 -0700144 }
145 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700146 method_parameters *value = gpr_malloc(sizeof(method_parameters));
Mark D. Rothc968e602016-11-02 14:07:36 -0700147 value->timeout = timeout;
148 value->wait_for_ready = wait_for_ready;
Mark D. Roth9d480942016-10-19 14:18:05 -0700149 return value;
150}
151
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700152/*************************************************************************
153 * CHANNEL-WIDE FUNCTIONS
154 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800155
Craig Tiller800dacb2015-10-06 09:10:26 -0700156typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -0700157 /** resolver for this channel */
158 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -0700159 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700160 bool started_resolving;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700161 /** client channel factory */
162 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -0700163
Craig Tillerbefafe62017-02-09 11:30:54 -0800164 /** combiner protecting all variables below in this data structure */
165 grpc_combiner *combiner;
Mark D. Roth046cf762016-09-26 11:13:51 -0700166 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -0700167 grpc_lb_policy *lb_policy;
Mark D. Roth9d480942016-10-19 14:18:05 -0700168 /** maps method names to method_parameters structs */
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800169 grpc_slice_hash_table *method_params_table;
Mark D. Roth046cf762016-09-26 11:13:51 -0700170 /** incoming resolver result - set by resolver.next() */
Mark D. Rothaf842452016-10-21 15:05:15 -0700171 grpc_channel_args *resolver_result;
Craig Tiller3f475422015-06-25 10:43:05 -0700172 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -0700173 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -0700174 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -0700175 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -0700176 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -0700177 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700178 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700179 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800180 /** owning stack */
181 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -0800182 /** interested parties (owned) */
183 grpc_pollset_set *interested_parties;
Craig Tiller613dafa2017-02-09 12:00:43 -0800184
185 /* the following properties are guarded by a mutex since API's require them
186 to be instantaniously available */
187 gpr_mu info_mu;
188 char *info_lb_policy_name;
189 /** service config in JSON form */
190 char *info_service_config_json;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800191} channel_data;
192
Craig Tillerd6c98df2015-08-18 09:33:44 -0700193/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700194 resolver, to watch for state changes from the lb_policy. When a state
195 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -0700196typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700197 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700198 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700199 grpc_connectivity_state state;
200 grpc_lb_policy *lb_policy;
201} lb_policy_connectivity_watcher;
202
Craig Tillera82950e2015-09-22 12:33:20 -0700203static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
204 grpc_lb_policy *lb_policy,
205 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700206
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800207static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
208 channel_data *chand,
209 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700210 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800211 const char *reason) {
212 if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
Craig Tiller48ed92e2016-06-02 11:07:12 -0700213 state == GRPC_CHANNEL_SHUTDOWN) &&
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800214 chand->lb_policy != NULL) {
Mark D. Roth59c9f902016-09-28 13:33:21 -0700215 /* cancel picks with wait_for_ready=false */
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800216 grpc_lb_policy_cancel_picks(
217 exec_ctx, chand->lb_policy,
Mark D. Roth59c9f902016-09-28 13:33:21 -0700218 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
Mark D. Roth58f52b72016-09-09 13:55:18 -0700219 /* check= */ 0, GRPC_ERROR_REF(error));
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800220 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700221 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
222 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800223}
224
Craig Tiller804ff712016-05-05 16:25:40 -0700225static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
Craig Tillerbefafe62017-02-09 11:30:54 -0800226 void *arg, grpc_error *error) {
227 lb_policy_connectivity_watcher *w = arg;
Craig Tillercb2609f2015-11-24 17:19:19 -0800228 grpc_connectivity_state publish_state = w->state;
Craig Tillerc5de8352017-02-09 14:08:05 -0800229 /* check if the notification is for the latest policy */
230 if (w->lb_policy == w->chand->lb_policy) {
231 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
232 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
233 grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
234 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
235 w->chand->lb_policy = NULL;
236 }
237 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
238 GRPC_ERROR_REF(error), "lb_changed");
239 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
240 watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
241 }
Craig Tillera82950e2015-09-22 12:33:20 -0700242 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700243
Craig Tiller906e3bc2015-11-24 07:31:31 -0800244 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700245 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700246}
247
Craig Tillera82950e2015-09-22 12:33:20 -0700248static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
249 grpc_lb_policy *lb_policy,
250 grpc_connectivity_state current_state) {
251 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800252 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700253
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700254 w->chand = chand;
Craig Tillerbefafe62017-02-09 11:30:54 -0800255 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
256 grpc_combiner_scheduler(chand->combiner, false));
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700257 w->state = current_state;
258 w->lb_policy = lb_policy;
Craig Tillera82950e2015-09-22 12:33:20 -0700259 grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
260 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700261}
262
Craig Tillerbefafe62017-02-09 11:30:54 -0800263static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
264 void *arg, grpc_error *error) {
Craig Tiller3f475422015-06-25 10:43:05 -0700265 channel_data *chand = arg;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700266 char *lb_policy_name = NULL;
Craig Tiller3f475422015-06-25 10:43:05 -0700267 grpc_lb_policy *lb_policy = NULL;
268 grpc_lb_policy *old_lb_policy;
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800269 grpc_slice_hash_table *method_params_table = NULL;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700270 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700271 bool exit_idle = false;
Craig Tiller804ff712016-05-05 16:25:40 -0700272 grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800273 char *service_config_json = NULL;
Craig Tiller3f475422015-06-25 10:43:05 -0700274
Mark D. Roth046cf762016-09-26 11:13:51 -0700275 if (chand->resolver_result != NULL) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700276 // Find LB policy name.
Mark D. Rothaf842452016-10-21 15:05:15 -0700277 const grpc_arg *channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700278 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
Mark D. Rothaf842452016-10-21 15:05:15 -0700279 if (channel_arg != NULL) {
280 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
281 lb_policy_name = channel_arg->value.string;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700282 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700283 // Special case: If all of the addresses are balancer addresses,
284 // assume that we should use the grpclb policy, regardless of what the
285 // resolver actually specified.
Mark D. Rothaf842452016-10-21 15:05:15 -0700286 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700287 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
Mark D. Rothaf842452016-10-21 15:05:15 -0700288 if (channel_arg != NULL) {
289 GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
Mark D. Roth557c9902016-10-24 11:12:05 -0700290 grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
Mark D. Rothaf842452016-10-21 15:05:15 -0700291 bool found_backend_address = false;
292 for (size_t i = 0; i < addresses->num_addresses; ++i) {
293 if (!addresses->addresses[i].is_balancer) {
294 found_backend_address = true;
295 break;
296 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700297 }
Mark D. Rothaf842452016-10-21 15:05:15 -0700298 if (!found_backend_address) {
299 if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
300 gpr_log(GPR_INFO,
301 "resolver requested LB policy %s but provided only balancer "
302 "addresses, no backend addresses -- forcing use of grpclb LB "
303 "policy",
Mark D. Roth5f40e5d2016-10-24 13:09:05 -0700304 lb_policy_name);
Mark D. Rothaf842452016-10-21 15:05:15 -0700305 }
306 lb_policy_name = "grpclb";
Mark D. Roth88405f72016-10-03 08:24:52 -0700307 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700308 }
309 // Use pick_first if nothing was specified and we didn't select grpclb
310 // above.
311 if (lb_policy_name == NULL) lb_policy_name = "pick_first";
Mark D. Roth41124992016-11-03 11:22:20 -0700312 // Instantiate LB policy.
313 grpc_lb_policy_args lb_policy_args;
314 lb_policy_args.args = chand->resolver_result;
315 lb_policy_args.client_channel_factory = chand->client_channel_factory;
Mark D. Roth88405f72016-10-03 08:24:52 -0700316 lb_policy =
317 grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
Craig Tillera82950e2015-09-22 12:33:20 -0700318 if (lb_policy != NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -0700319 GRPC_LB_POLICY_REF(lb_policy, "config_change");
Craig Tillerf707d622016-05-06 14:26:12 -0700320 GRPC_ERROR_UNREF(state_error);
Craig Tiller804ff712016-05-05 16:25:40 -0700321 state =
322 grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
Craig Tiller45724b32015-09-22 10:42:19 -0700323 }
Mark D. Roth41124992016-11-03 11:22:20 -0700324 // Find service config.
Mark D. Rothaf842452016-10-21 15:05:15 -0700325 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700326 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
Mark D. Roth046cf762016-09-26 11:13:51 -0700327 if (channel_arg != NULL) {
Mark D. Roth9ec28af2016-11-03 12:32:39 -0700328 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800329 service_config_json = gpr_strdup(channel_arg->value.string);
Mark D. Roth70a1abd2016-11-04 09:26:37 -0700330 grpc_service_config *service_config =
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800331 grpc_service_config_create(service_config_json);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700332 if (service_config != NULL) {
333 method_params_table = grpc_service_config_create_method_config_table(
Craig Tillerb28c7e82016-11-18 10:29:04 -0800334 exec_ctx, service_config, method_parameters_create_from_json,
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700335 &method_parameters_vtable);
336 grpc_service_config_destroy(service_config);
337 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700338 }
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700339 // Before we clean up, save a copy of lb_policy_name, since it might
340 // be pointing to data inside chand->resolver_result.
341 // The copy will be saved in chand->lb_policy_name below.
342 lb_policy_name = gpr_strdup(lb_policy_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800343 grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
Mark D. Roth046cf762016-09-26 11:13:51 -0700344 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700345 }
346
Craig Tiller86c99582015-11-25 15:22:26 -0800347 if (lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800348 grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
349 chand->interested_parties);
Craig Tiller86c99582015-11-25 15:22:26 -0800350 }
351
Craig Tiller613dafa2017-02-09 12:00:43 -0800352 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700353 if (lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800354 gpr_free(chand->info_lb_policy_name);
355 chand->info_lb_policy_name = lb_policy_name;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700356 }
Craig Tiller3f475422015-06-25 10:43:05 -0700357 old_lb_policy = chand->lb_policy;
358 chand->lb_policy = lb_policy;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800359 if (service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800360 gpr_free(chand->info_service_config_json);
361 chand->info_service_config_json = service_config_json;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800362 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800363 gpr_mu_unlock(&chand->info_mu);
Mark D. Roth9d480942016-10-19 14:18:05 -0700364 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800365 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth046cf762016-09-26 11:13:51 -0700366 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700367 chand->method_params_table = method_params_table;
Craig Tiller0ede5452016-04-23 12:21:45 -0700368 if (lb_policy != NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800369 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller0ede5452016-04-23 12:21:45 -0700370 } else if (chand->resolver == NULL /* disconnected */) {
Craig Tiller804ff712016-05-05 16:25:40 -0700371 grpc_closure_list_fail_all(
372 &chand->waiting_for_config_closures,
373 GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
Craig Tiller91031da2016-12-28 15:44:25 -0800374 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tillera82950e2015-09-22 12:33:20 -0700375 }
376 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
377 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700378 exit_idle = true;
379 chand->exit_idle_when_lb_policy_arrives = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700380 }
Craig Tiller98465032015-06-29 14:36:42 -0700381
Craig Tiller804ff712016-05-05 16:25:40 -0700382 if (error == GRPC_ERROR_NONE && chand->resolver) {
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700383 set_channel_connectivity_state_locked(
384 exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
Craig Tillera82950e2015-09-22 12:33:20 -0700385 if (lb_policy != NULL) {
386 watch_lb_policy(exec_ctx, chand, lb_policy, state);
Craig Tiller45724b32015-09-22 10:42:19 -0700387 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800388 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Mark D. Roth046cf762016-09-26 11:13:51 -0700389 grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
Mark D. Rothff4df062016-08-22 15:02:49 -0700390 &chand->on_resolver_result_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700391 } else {
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800392 if (chand->resolver != NULL) {
393 grpc_resolver_shutdown(exec_ctx, chand->resolver);
394 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
395 chand->resolver = NULL;
396 }
Craig Tiller804ff712016-05-05 16:25:40 -0700397 grpc_error *refs[] = {error, state_error};
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800398 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700399 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller804ff712016-05-05 16:25:40 -0700400 GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
401 GPR_ARRAY_SIZE(refs)),
402 "resolver_gone");
Craig Tillera82950e2015-09-22 12:33:20 -0700403 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700404
Craig Tillera82950e2015-09-22 12:33:20 -0700405 if (exit_idle) {
406 grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
407 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
408 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700409
Craig Tillera82950e2015-09-22 12:33:20 -0700410 if (old_lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800411 grpc_pollset_set_del_pollset_set(
412 exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
Craig Tillera82950e2015-09-22 12:33:20 -0700413 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
414 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700415
Craig Tillera82950e2015-09-22 12:33:20 -0700416 if (lb_policy != NULL) {
417 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
418 }
Craig Tiller45724b32015-09-22 10:42:19 -0700419
Craig Tiller906e3bc2015-11-24 07:31:31 -0800420 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700421 GRPC_ERROR_UNREF(state_error);
Craig Tiller3f475422015-06-25 10:43:05 -0700422}
423
Craig Tillera8610c02017-02-14 10:05:11 -0800424static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
425 grpc_error *error_ignored) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800426 grpc_transport_op *op = arg;
427 grpc_channel_element *elem = op->transport_private.args[0];
Craig Tillerca3e9d32015-06-27 18:37:27 -0700428 channel_data *chand = elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700429
Craig Tillera82950e2015-09-22 12:33:20 -0700430 if (op->on_connectivity_state_change != NULL) {
431 grpc_connectivity_state_notify_on_state_change(
432 exec_ctx, &chand->state_tracker, op->connectivity_state,
433 op->on_connectivity_state_change);
434 op->on_connectivity_state_change = NULL;
435 op->connectivity_state = NULL;
436 }
437
Craig Tiller26dab312015-12-07 14:43:47 -0800438 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800439 if (chand->lb_policy == NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800440 grpc_closure_sched(exec_ctx, op->send_ping,
441 GRPC_ERROR_CREATE("Ping with no load balancing"));
Craig Tiller26dab312015-12-07 14:43:47 -0800442 } else {
Craig Tiller28bf8912015-12-07 16:07:04 -0800443 grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800444 op->bind_pollset = NULL;
445 }
446 op->send_ping = NULL;
447 }
448
Craig Tiller1c51edc2016-05-07 16:18:43 -0700449 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
450 if (chand->resolver != NULL) {
451 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700452 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700453 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
454 grpc_resolver_shutdown(exec_ctx, chand->resolver);
455 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
456 chand->resolver = NULL;
457 if (!chand->started_resolving) {
458 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
459 GRPC_ERROR_REF(op->disconnect_with_error));
Craig Tiller91031da2016-12-28 15:44:25 -0800460 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700461 }
462 if (chand->lb_policy != NULL) {
463 grpc_pollset_set_del_pollset_set(exec_ctx,
464 chand->lb_policy->interested_parties,
465 chand->interested_parties);
466 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
467 chand->lb_policy = NULL;
468 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700469 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700470 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700471 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800472 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
473
474 grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
Craig Tillerbefafe62017-02-09 11:30:54 -0800475}
476
477static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
478 grpc_channel_element *elem,
479 grpc_transport_op *op) {
480 channel_data *chand = elem->channel_data;
481
Craig Tillerbefafe62017-02-09 11:30:54 -0800482 GPR_ASSERT(op->set_accept_stream == false);
483 if (op->bind_pollset != NULL) {
484 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
485 op->bind_pollset);
486 }
487
488 op->transport_private.args[0] = elem;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800489 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
Craig Tillerbefafe62017-02-09 11:30:54 -0800490 grpc_closure_sched(
Craig Tillera8610c02017-02-14 10:05:11 -0800491 exec_ctx, grpc_closure_init(
492 &op->transport_private.closure, start_transport_op_locked,
493 op, grpc_combiner_scheduler(chand->combiner, false)),
Craig Tillerbefafe62017-02-09 11:30:54 -0800494 GRPC_ERROR_NONE);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700495}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496
Mark D. Rothb2d24882016-10-27 15:44:07 -0700497static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
498 grpc_channel_element *elem,
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700499 const grpc_channel_info *info) {
Mark D. Rothb2d24882016-10-27 15:44:07 -0700500 channel_data *chand = elem->channel_data;
Craig Tiller613dafa2017-02-09 12:00:43 -0800501 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700502 if (info->lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800503 *info->lb_policy_name = chand->info_lb_policy_name == NULL
Mark D. Roth78afd772016-11-04 12:49:49 -0700504 ? NULL
Craig Tiller613dafa2017-02-09 12:00:43 -0800505 : gpr_strdup(chand->info_lb_policy_name);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700506 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800507 if (info->service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800508 *info->service_config_json =
509 chand->info_service_config_json == NULL
510 ? NULL
511 : gpr_strdup(chand->info_service_config_json);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800512 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800513 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700514}
515
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700516/* Constructor for channel_data */
Mark D. Rothc1087882016-11-18 10:54:45 -0800517static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800518 grpc_channel_element *elem,
519 grpc_channel_element_args *args) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700520 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700521 memset(chand, 0, sizeof(*chand));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700522 GPR_ASSERT(args->is_last);
523 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800524 // Initialize data members.
Craig Tillerbefafe62017-02-09 11:30:54 -0800525 chand->combiner = grpc_combiner_create(NULL);
Craig Tillerd85477512017-02-09 12:02:39 -0800526 gpr_mu_init(&chand->info_mu);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800527 chand->owning_stack = args->channel_stack;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700528 grpc_closure_init(&chand->on_resolver_result_changed,
Craig Tillerbefafe62017-02-09 11:30:54 -0800529 on_resolver_result_changed_locked, chand,
530 grpc_combiner_scheduler(chand->combiner, false));
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800531 chand->interested_parties = grpc_pollset_set_create();
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700532 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
533 "client_channel");
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800534 // Record client channel factory.
535 const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
536 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
537 GPR_ASSERT(arg != NULL);
538 GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
539 grpc_client_channel_factory_ref(arg->value.pointer.p);
540 chand->client_channel_factory = arg->value.pointer.p;
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800541 // Get server name to resolve, using proxy mapper if needed.
Mark D. Roth86e90592016-11-18 09:56:40 -0800542 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800543 GPR_ASSERT(arg != NULL);
544 GPR_ASSERT(arg->type == GRPC_ARG_STRING);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800545 char *proxy_name = NULL;
546 grpc_channel_args *new_args = NULL;
547 grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
548 &proxy_name, &new_args);
549 // Instantiate resolver.
Mark D. Roth45ccec52017-01-18 14:04:01 -0800550 chand->resolver = grpc_resolver_create(
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800551 exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
552 new_args != NULL ? new_args : args->channel_args,
553 chand->interested_parties);
554 if (proxy_name != NULL) gpr_free(proxy_name);
555 if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800556 if (chand->resolver == NULL) {
557 return GRPC_ERROR_CREATE("resolver creation failed");
558 }
559 return GRPC_ERROR_NONE;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700560}
561
562/* Destructor for channel_data */
563static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
564 grpc_channel_element *elem) {
565 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700566 if (chand->resolver != NULL) {
567 grpc_resolver_shutdown(exec_ctx, chand->resolver);
568 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
569 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700570 if (chand->client_channel_factory != NULL) {
571 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
572 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700573 if (chand->lb_policy != NULL) {
574 grpc_pollset_set_del_pollset_set(exec_ctx,
575 chand->lb_policy->interested_parties,
576 chand->interested_parties);
577 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
578 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800579 gpr_free(chand->info_lb_policy_name);
580 gpr_free(chand->info_service_config_json);
Mark D. Roth9d480942016-10-19 14:18:05 -0700581 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800582 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700583 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700584 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
585 grpc_pollset_set_destroy(chand->interested_parties);
Craig Tillerf1021672017-02-09 21:29:50 -0800586 GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
Craig Tillerd85477512017-02-09 12:02:39 -0800587 gpr_mu_destroy(&chand->info_mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700588}
589
590/*************************************************************************
591 * PER-CALL FUNCTIONS
592 */
593
594#define GET_CALL(call_data) \
595 ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
596
597#define CANCELLED_CALL ((grpc_subchannel_call *)1)
598
599typedef enum {
600 GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
601 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
602} subchannel_creation_phase;
603
604/** Call data. Holds a pointer to grpc_subchannel_call and the
605 associated machinery to create such a pointer.
606 Handles queueing of stream ops until a call object is ready, waiting
607 for initial metadata before trying to create a call object,
608 and handling cancellation gracefully. */
609typedef struct client_channel_call_data {
Mark D. Roth72f6da82016-09-02 13:42:38 -0700610 // State for handling deadlines.
611 // The code in deadline_filter.c requires this to be the first field.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700612 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
613 // and this struct both independently store a pointer to the call
614 // stack and each has its own mutex. If/when we have time, find a way
Mark D. Roth6ad99172016-09-09 07:52:48 -0700615 // to avoid this without breaking the grpc_deadline_state abstraction.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700616 grpc_deadline_state deadline_state;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700617
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800618 grpc_slice path; // Request path.
Mark D. Rothe40dd292016-10-05 14:58:37 -0700619 gpr_timespec call_start_time;
620 gpr_timespec deadline;
Mark D. Roth9d480942016-10-19 14:18:05 -0700621 wait_for_ready_value wait_for_ready_from_service_config;
Mark D. Rothe40dd292016-10-05 14:58:37 -0700622 grpc_closure read_service_config;
Mark D. Rothaa850a72016-09-26 13:38:02 -0700623
Mark D. Rothf28763c2016-09-14 15:18:40 -0700624 grpc_error *cancel_error;
Mark D. Roth72f6da82016-09-02 13:42:38 -0700625
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700626 /** either 0 for no call, 1 for cancelled, or a pointer to a
627 grpc_subchannel_call */
628 gpr_atm subchannel_call;
629
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700630 subchannel_creation_phase creation_phase;
631 grpc_connected_subchannel *connected_subchannel;
632 grpc_polling_entity *pollent;
633
Craig Tiller57726ca2016-09-12 11:59:45 -0700634 grpc_transport_stream_op **waiting_ops;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700635 size_t waiting_ops_count;
636 size_t waiting_ops_capacity;
637
638 grpc_closure next_step;
639
640 grpc_call_stack *owning_call;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200641
642 grpc_linked_mdelem lb_token_mdelem;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700643} call_data;
644
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800645grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
646 grpc_call_element *call_elem) {
647 grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
648 return scc == CANCELLED_CALL ? NULL : scc;
649}
650
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700651static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
652 GPR_TIMER_BEGIN("add_waiting_locked", 0);
653 if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
654 calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
655 calld->waiting_ops =
656 gpr_realloc(calld->waiting_ops,
657 calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
658 }
Craig Tiller57726ca2016-09-12 11:59:45 -0700659 calld->waiting_ops[calld->waiting_ops_count++] = op;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700660 GPR_TIMER_END("add_waiting_locked", 0);
661}
662
663static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
664 grpc_error *error) {
665 size_t i;
666 for (i = 0; i < calld->waiting_ops_count; i++) {
667 grpc_transport_stream_op_finish_with_failure(
Craig Tiller57726ca2016-09-12 11:59:45 -0700668 exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700669 }
670 calld->waiting_ops_count = 0;
671 GRPC_ERROR_UNREF(error);
672}
673
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700674static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
Craig Tiller57726ca2016-09-12 11:59:45 -0700675 if (calld->waiting_ops_count == 0) {
676 return;
677 }
678
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800679 grpc_subchannel_call *call = GET_CALL(calld);
680 grpc_transport_stream_op **ops = calld->waiting_ops;
681 size_t nops = calld->waiting_ops_count;
682 if (call == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700683 fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
684 return;
685 }
686 calld->waiting_ops = NULL;
687 calld->waiting_ops_count = 0;
688 calld->waiting_ops_capacity = 0;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800689 for (size_t i = 0; i < nops; i++) {
690 grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
691 }
Craig Tiller9efea882017-02-09 13:06:52 -0800692 gpr_free(ops);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700693}
694
Craig Tillerbefafe62017-02-09 11:30:54 -0800695static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
696 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -0700697 grpc_call_element *elem = arg;
698 call_data *calld = elem->call_data;
699 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700700 GPR_ASSERT(calld->creation_phase ==
701 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
Yuchen Zeng19656b12016-09-01 18:00:45 -0700702 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
703 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700704 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
705 if (calld->connected_subchannel == NULL) {
706 gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
707 fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
708 "Failed to create subchannel", &error, 1));
Mark D. Roth72f6da82016-09-02 13:42:38 -0700709 } else if (GET_CALL(calld) == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700710 /* already cancelled before subchannel became ready */
David Garcia Quintas68a9e382016-12-13 10:50:40 -0800711 grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING(
712 "Cancelled before creating subchannel", &error, 1);
713 /* if due to deadline, attach the deadline exceeded status to the error */
714 if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
715 cancellation_error =
716 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
717 GRPC_STATUS_DEADLINE_EXCEEDED);
718 }
719 fail_locked(exec_ctx, calld, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700720 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700721 /* Create call on subchannel. */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700722 grpc_subchannel_call *subchannel_call = NULL;
723 grpc_error *new_error = grpc_connected_subchannel_create_call(
Mark D. Rothaa850a72016-09-26 13:38:02 -0700724 exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
Mark D. Roth3d883412016-11-07 13:42:54 -0800725 calld->call_start_time, calld->deadline, &subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700726 if (new_error != GRPC_ERROR_NONE) {
727 new_error = grpc_error_add_child(new_error, error);
728 subchannel_call = CANCELLED_CALL;
729 fail_locked(exec_ctx, calld, new_error);
730 }
731 gpr_atm_rel_store(&calld->subchannel_call,
732 (gpr_atm)(uintptr_t)subchannel_call);
733 retry_waiting_locked(exec_ctx, calld);
734 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700735 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
736}
737
738static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
739 call_data *calld = elem->call_data;
740 grpc_subchannel_call *subchannel_call = GET_CALL(calld);
741 if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
742 return NULL;
743 } else {
744 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
745 }
746}
747
Craig Tiller577c9b22015-11-02 14:11:15 -0800748typedef struct {
749 grpc_metadata_batch *initial_metadata;
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800750 uint32_t initial_metadata_flags;
Craig Tillerb5585d42015-11-17 07:18:31 -0800751 grpc_connected_subchannel **connected_subchannel;
Craig Tiller577c9b22015-11-02 14:11:15 -0800752 grpc_closure *on_ready;
753 grpc_call_element *elem;
754 grpc_closure closure;
755} continue_picking_args;
756
Yuchen Zeng144ce652016-09-01 18:19:34 -0700757/** Return true if subchannel is available immediately (in which case on_ready
758 should not be called), or false otherwise (in which case on_ready should be
759 called when the subchannel is available). */
Craig Tillerbefafe62017-02-09 11:30:54 -0800760static bool pick_subchannel_locked(
761 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
762 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
763 grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready,
764 grpc_error *error);
Craig Tiller577c9b22015-11-02 14:11:15 -0800765
Craig Tillerbefafe62017-02-09 11:30:54 -0800766static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
767 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800768 continue_picking_args *cpa = arg;
Craig Tiller0ede5452016-04-23 12:21:45 -0700769 if (cpa->connected_subchannel == NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800770 /* cancelled, do nothing */
Craig Tiller804ff712016-05-05 16:25:40 -0700771 } else if (error != GRPC_ERROR_NONE) {
Craig Tiller91031da2016-12-28 15:44:25 -0800772 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700773 } else {
Craig Tillerbefafe62017-02-09 11:30:54 -0800774 if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
775 cpa->initial_metadata_flags,
776 cpa->connected_subchannel, cpa->on_ready,
777 GRPC_ERROR_NONE)) {
Craig Tiller91031da2016-12-28 15:44:25 -0800778 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700779 }
Craig Tiller577c9b22015-11-02 14:11:15 -0800780 }
781 gpr_free(cpa);
782}
783
Craig Tillerbefafe62017-02-09 11:30:54 -0800784static bool pick_subchannel_locked(
785 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
786 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
787 grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready,
788 grpc_error *error) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700789 GPR_TIMER_BEGIN("pick_subchannel", 0);
Craig Tillerbfc9adc2016-06-27 13:16:22 -0700790
Craig Tiller577c9b22015-11-02 14:11:15 -0800791 channel_data *chand = elem->channel_data;
792 call_data *calld = elem->call_data;
793 continue_picking_args *cpa;
794 grpc_closure *closure;
795
Craig Tillerb5585d42015-11-17 07:18:31 -0800796 GPR_ASSERT(connected_subchannel);
Craig Tiller577c9b22015-11-02 14:11:15 -0800797
Craig Tiller577c9b22015-11-02 14:11:15 -0800798 if (initial_metadata == NULL) {
799 if (chand->lb_policy != NULL) {
Craig Tillerab33b482015-11-21 08:11:04 -0800800 grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
Mark D. Roth5f844002016-09-08 08:20:53 -0700801 connected_subchannel, GRPC_ERROR_REF(error));
Craig Tiller577c9b22015-11-02 14:11:15 -0800802 }
803 for (closure = chand->waiting_for_config_closures.head; closure != NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700804 closure = closure->next_data.next) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800805 cpa = closure->cb_arg;
Craig Tillerb5585d42015-11-17 07:18:31 -0800806 if (cpa->connected_subchannel == connected_subchannel) {
807 cpa->connected_subchannel = NULL;
Craig Tiller91031da2016-12-28 15:44:25 -0800808 grpc_closure_sched(
Mark D. Roth932b10c2016-09-09 08:44:30 -0700809 exec_ctx, cpa->on_ready,
Craig Tiller91031da2016-12-28 15:44:25 -0800810 GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
Craig Tiller577c9b22015-11-02 14:11:15 -0800811 }
812 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700813 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth697a1f62016-09-07 13:35:07 -0700814 GRPC_ERROR_UNREF(error);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700815 return true;
Craig Tiller577c9b22015-11-02 14:11:15 -0800816 }
Mark D. Roth697a1f62016-09-07 13:35:07 -0700817 GPR_ASSERT(error == GRPC_ERROR_NONE);
Craig Tiller577c9b22015-11-02 14:11:15 -0800818 if (chand->lb_policy != NULL) {
Craig Tiller86c0f8a2015-12-01 20:05:40 -0800819 grpc_lb_policy *lb_policy = chand->lb_policy;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700820 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
Mark D. Rothe40dd292016-10-05 14:58:37 -0700821 // If the application explicitly set wait_for_ready, use that.
822 // Otherwise, if the service config specified a value for this
823 // method, use that.
Mark D. Rothc1c38582016-10-11 11:03:27 -0700824 const bool wait_for_ready_set_from_api =
825 initial_metadata_flags &
826 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
827 const bool wait_for_ready_set_from_service_config =
828 calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET;
829 if (!wait_for_ready_set_from_api &&
830 wait_for_ready_set_from_service_config) {
Mark D. Rothe40dd292016-10-05 14:58:37 -0700831 if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) {
832 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
833 } else {
834 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
835 }
836 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700837 const grpc_lb_policy_pick_args inputs = {
Yuchen Zengac8bc422016-10-05 14:00:02 -0700838 initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
839 gpr_inf_future(GPR_CLOCK_MONOTONIC)};
Mark D. Roth55f25b62016-10-12 14:55:20 -0700840 const bool result = grpc_lb_policy_pick(
841 exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700842 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
843 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700844 return result;
Craig Tiller577c9b22015-11-02 14:11:15 -0800845 }
846 if (chand->resolver != NULL && !chand->started_resolving) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700847 chand->started_resolving = true;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800848 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Mark D. Roth046cf762016-09-26 11:13:51 -0700849 grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
Mark D. Rothff4df062016-08-22 15:02:49 -0700850 &chand->on_resolver_result_changed);
Craig Tiller577c9b22015-11-02 14:11:15 -0800851 }
Craig Tiller0eab6972016-04-23 12:59:57 -0700852 if (chand->resolver != NULL) {
853 cpa = gpr_malloc(sizeof(*cpa));
854 cpa->initial_metadata = initial_metadata;
855 cpa->initial_metadata_flags = initial_metadata_flags;
856 cpa->connected_subchannel = connected_subchannel;
857 cpa->on_ready = on_ready;
858 cpa->elem = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -0800859 grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
860 grpc_combiner_scheduler(chand->combiner, true));
Craig Tiller804ff712016-05-05 16:25:40 -0700861 grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
862 GRPC_ERROR_NONE);
Craig Tiller0eab6972016-04-23 12:59:57 -0700863 } else {
Craig Tiller91031da2016-12-28 15:44:25 -0800864 grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -0700865 }
Craig Tillerbfc9adc2016-06-27 13:16:22 -0700866
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700867 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700868 return false;
Craig Tiller577c9b22015-11-02 14:11:15 -0800869}
870
Craig Tillera8610c02017-02-14 10:05:11 -0800871static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
872 grpc_transport_stream_op *op,
873 grpc_call_element *elem) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800874 channel_data *chand = elem->channel_data;
Craig Tillera11bfc82017-02-14 09:56:33 -0800875 call_data *calld = elem->call_data;
Craig Tillerbefafe62017-02-09 11:30:54 -0800876 grpc_subchannel_call *call;
877
Craig Tillerbefafe62017-02-09 11:30:54 -0800878 /* need to recheck that another thread hasn't set the call */
879 call = GET_CALL(calld);
880 if (call == CANCELLED_CALL) {
881 grpc_transport_stream_op_finish_with_failure(
882 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -0800883 /* early out */
884 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800885 }
886 if (call != NULL) {
887 grpc_subchannel_call_process_op(exec_ctx, call, op);
Craig Tillera11bfc82017-02-14 09:56:33 -0800888 /* early out */
889 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800890 }
891 /* if this is a cancellation, then we can raise our cancelled flag */
892 if (op->cancel_error != GRPC_ERROR_NONE) {
893 if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
894 (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
Craig Tillera11bfc82017-02-14 09:56:33 -0800895 /* recurse to retry */
Craig Tillera8610c02017-02-14 10:05:11 -0800896 start_transport_stream_op_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -0800897 /* early out */
898 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800899 } else {
Craig Tillerbe9691a2017-02-14 10:00:42 -0800900 /* Stash a copy of cancel_error in our call data, so that we can use
901 it for subsequent operations. This ensures that if the call is
902 cancelled before any ops are passed down (e.g., if the deadline
903 is in the past when the call starts), we can return the right
904 error to the caller when the first op does get passed down. */
Craig Tillerbefafe62017-02-09 11:30:54 -0800905 calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
906 switch (calld->creation_phase) {
907 case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
908 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
909 break;
910 case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
911 pick_subchannel_locked(exec_ctx, elem, NULL, 0,
912 &calld->connected_subchannel, NULL,
913 GRPC_ERROR_REF(op->cancel_error));
914 break;
915 }
916 grpc_transport_stream_op_finish_with_failure(
917 exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -0800918 /* early out */
919 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800920 }
921 }
922 /* if we don't have a subchannel, try to get one */
923 if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
924 calld->connected_subchannel == NULL &&
925 op->send_initial_metadata != NULL) {
926 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
927 grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
928 grpc_combiner_scheduler(chand->combiner, true));
929 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
930 /* If a subchannel is not available immediately, the polling entity from
931 call_data should be provided to channel_data's interested_parties, so
932 that IO of the lb_policy and resolver could be done under it. */
933 if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata,
934 op->send_initial_metadata_flags,
935 &calld->connected_subchannel, &calld->next_step,
936 GRPC_ERROR_NONE)) {
937 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
938 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
939 } else {
940 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
941 chand->interested_parties);
942 }
943 }
944 /* if we've got a subchannel, then let's ask it to create a call */
945 if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
946 calld->connected_subchannel != NULL) {
947 grpc_subchannel_call *subchannel_call = NULL;
948 grpc_error *error = grpc_connected_subchannel_create_call(
949 exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
950 calld->call_start_time, calld->deadline, &subchannel_call);
951 if (error != GRPC_ERROR_NONE) {
952 subchannel_call = CANCELLED_CALL;
953 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
954 grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
955 }
956 gpr_atm_rel_store(&calld->subchannel_call,
957 (gpr_atm)(uintptr_t)subchannel_call);
958 retry_waiting_locked(exec_ctx, calld);
Craig Tillera11bfc82017-02-14 09:56:33 -0800959 /* recurse to retry */
Craig Tillera8610c02017-02-14 10:05:11 -0800960 start_transport_stream_op_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -0800961 /* early out */
962 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800963 }
964 /* nothing to be done but wait */
965 add_waiting_locked(calld, op);
Craig Tillera11bfc82017-02-14 09:56:33 -0800966}
967
968static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
969 void *arg,
970 grpc_error *error_ignored) {
971 GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0);
972
973 grpc_transport_stream_op *op = arg;
974 grpc_call_element *elem = op->handler_private.args[0];
975 call_data *calld = elem->call_data;
976
Craig Tillera8610c02017-02-14 10:05:11 -0800977 start_transport_stream_op_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -0800978
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800979 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
980 "start_transport_stream_op");
Craig Tillera11bfc82017-02-14 09:56:33 -0800981 GPR_TIMER_END("cc_start_transport_stream_op_locked", 0);
Craig Tillerbefafe62017-02-09 11:30:54 -0800982}
983
Craig Tillerbe9691a2017-02-14 10:00:42 -0800984/* The logic here is fairly complicated, due to (a) the fact that we
985 need to handle the case where we receive the send op before the
986 initial metadata op, and (b) the need for efficiency, especially in
987 the streaming case.
988
989 We use double-checked locking to initially see if initialization has been
990 performed. If it has not, we acquire the combiner and perform initialization.
991 If it has, we proceed on the fast path. */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700992static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
993 grpc_call_element *elem,
994 grpc_transport_stream_op *op) {
995 call_data *calld = elem->call_data;
Yuchen Zeng19656b12016-09-01 18:00:45 -0700996 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700997 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Mark D. Roth72f6da82016-09-02 13:42:38 -0700998 grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700999 /* try to (atomically) get the call */
1000 grpc_subchannel_call *call = GET_CALL(calld);
1001 GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
1002 if (call == CANCELLED_CALL) {
Mark D. Rothf28763c2016-09-14 15:18:40 -07001003 grpc_transport_stream_op_finish_with_failure(
1004 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001005 GPR_TIMER_END("cc_start_transport_stream_op", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001006 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001007 return;
1008 }
1009 if (call != NULL) {
1010 grpc_subchannel_call_process_op(exec_ctx, call, op);
1011 GPR_TIMER_END("cc_start_transport_stream_op", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001012 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001013 return;
1014 }
1015 /* we failed; lock and figure out what to do */
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001016 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
Craig Tiller4a84bdd2017-02-14 09:48:41 -08001017 op->handler_private.args[0] = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -08001018 grpc_closure_sched(
1019 exec_ctx,
Craig Tiller4a84bdd2017-02-14 09:48:41 -08001020 grpc_closure_init(&op->handler_private.closure,
Craig Tillerbefafe62017-02-09 11:30:54 -08001021 cc_start_transport_stream_op_locked, op,
1022 grpc_combiner_scheduler(chand->combiner, false)),
1023 GRPC_ERROR_NONE);
Craig Tillera11bfc82017-02-14 09:56:33 -08001024 GPR_TIMER_END("cc_start_transport_stream_op", 0);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001025}
1026
Mark D. Rothe40dd292016-10-05 14:58:37 -07001027// Gets data from the service config. Invoked when the resolver returns
1028// its initial result.
Craig Tillerbefafe62017-02-09 11:30:54 -08001029static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg,
1030 grpc_error *error) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001031 grpc_call_element *elem = arg;
1032 channel_data *chand = elem->channel_data;
1033 call_data *calld = elem->call_data;
1034 // If this is an error, there's no point in looking at the service config.
Mark D. Roth196387a2016-10-12 14:53:36 -07001035 if (error == GRPC_ERROR_NONE) {
1036 // Get the method config table from channel data.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001037 grpc_slice_hash_table *method_params_table = NULL;
Mark D. Roth9d480942016-10-19 14:18:05 -07001038 if (chand->method_params_table != NULL) {
1039 method_params_table =
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001040 grpc_slice_hash_table_ref(chand->method_params_table);
Mark D. Rothe40dd292016-10-05 14:58:37 -07001041 }
Mark D. Roth196387a2016-10-12 14:53:36 -07001042 // If the method config table was present, use it.
Mark D. Roth9d480942016-10-19 14:18:05 -07001043 if (method_params_table != NULL) {
Craig Tiller87a7e1f2016-11-09 09:42:19 -08001044 const method_parameters *method_params = grpc_method_config_table_get(
1045 exec_ctx, method_params_table, calld->path);
Mark D. Roth9d480942016-10-19 14:18:05 -07001046 if (method_params != NULL) {
1047 const bool have_method_timeout =
1048 gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
1049 if (have_method_timeout ||
1050 method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
Mark D. Roth9d480942016-10-19 14:18:05 -07001051 if (have_method_timeout) {
1052 const gpr_timespec per_method_deadline =
1053 gpr_time_add(calld->call_start_time, method_params->timeout);
Mark D. Roth196387a2016-10-12 14:53:36 -07001054 if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
1055 calld->deadline = per_method_deadline;
1056 // Reset deadline timer.
1057 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
1058 }
1059 }
Mark D. Roth9d480942016-10-19 14:18:05 -07001060 if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
Mark D. Roth196387a2016-10-12 14:53:36 -07001061 calld->wait_for_ready_from_service_config =
Mark D. Roth9d480942016-10-19 14:18:05 -07001062 method_params->wait_for_ready;
Mark D. Roth196387a2016-10-12 14:53:36 -07001063 }
Mark D. Roth196387a2016-10-12 14:53:36 -07001064 }
1065 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001066 grpc_slice_hash_table_unref(exec_ctx, method_params_table);
Mark D. Roth196387a2016-10-12 14:53:36 -07001067 }
Mark D. Rothe40dd292016-10-05 14:58:37 -07001068 }
Mark D. Roth31292f22016-10-12 13:14:07 -07001069 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
Mark D. Rothe40dd292016-10-05 14:58:37 -07001070}
1071
Craig Tillerbefafe62017-02-09 11:30:54 -08001072static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
1073 void *arg,
1074 grpc_error *error_ignored) {
1075 grpc_call_element *elem = arg;
1076 channel_data *chand = elem->channel_data;
1077 call_data *calld = elem->call_data;
1078 // If the resolver has already returned results, then we can access
1079 // the service config parameters immediately. Otherwise, we need to
1080 // defer that work until the resolver returns an initial result.
1081 // TODO(roth): This code is almost but not quite identical to the code
1082 // in read_service_config() above. It would be nice to find a way to
1083 // combine them, to avoid having to maintain it twice.
1084 if (chand->lb_policy != NULL) {
1085 // We already have a resolver result, so check for service config.
1086 if (chand->method_params_table != NULL) {
1087 grpc_slice_hash_table *method_params_table =
1088 grpc_slice_hash_table_ref(chand->method_params_table);
1089 method_parameters *method_params = grpc_method_config_table_get(
1090 exec_ctx, method_params_table, calld->path);
1091 if (method_params != NULL) {
1092 if (gpr_time_cmp(method_params->timeout,
1093 gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) {
1094 gpr_timespec per_method_deadline =
1095 gpr_time_add(calld->call_start_time, method_params->timeout);
1096 calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
1097 }
1098 if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
1099 calld->wait_for_ready_from_service_config =
1100 method_params->wait_for_ready;
1101 }
1102 }
1103 grpc_slice_hash_table_unref(exec_ctx, method_params_table);
1104 }
1105 } else {
1106 // We don't yet have a resolver result, so register a callback to
1107 // get the service config data once the resolver returns.
1108 // Take a reference to the call stack to be owned by the callback.
1109 GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
1110 grpc_closure_init(&calld->read_service_config, read_service_config_locked,
1111 elem, grpc_combiner_scheduler(chand->combiner, false));
1112 grpc_closure_list_append(&chand->waiting_for_config_closures,
1113 &calld->read_service_config, GRPC_ERROR_NONE);
1114 }
1115 // Start the deadline timer with the current deadline value. If we
1116 // do not yet have service config data, then the timer may be reset
1117 // later.
1118 grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001119 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
1120 "initial_read_service_config");
Craig Tillerbefafe62017-02-09 11:30:54 -08001121}
1122
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001123/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001124static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1125 grpc_call_element *elem,
1126 grpc_call_element_args *args) {
Mark D. Rothaa850a72016-09-26 13:38:02 -07001127 channel_data *chand = elem->channel_data;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001128 call_data *calld = elem->call_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001129 // Initialize data members.
1130 grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001131 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001132 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001133 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
1134 calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET;
Mark D. Rothf28763c2016-09-14 15:18:40 -07001135 calld->cancel_error = GRPC_ERROR_NONE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001136 gpr_atm_rel_store(&calld->subchannel_call, 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001137 calld->connected_subchannel = NULL;
1138 calld->waiting_ops = NULL;
1139 calld->waiting_ops_count = 0;
1140 calld->waiting_ops_capacity = 0;
1141 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
1142 calld->owning_call = args->call_stack;
1143 calld->pollent = NULL;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001144 GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config");
Craig Tillerbefafe62017-02-09 11:30:54 -08001145 grpc_closure_sched(
1146 exec_ctx,
1147 grpc_closure_init(&calld->read_service_config,
1148 initial_read_service_config_locked, elem,
1149 grpc_combiner_scheduler(chand->combiner, false)),
1150 GRPC_ERROR_NONE);
Mark D. Roth0badbe82016-06-23 10:15:12 -07001151 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001152}
1153
1154/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001155static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1156 grpc_call_element *elem,
1157 const grpc_call_final_info *final_info,
1158 void *and_free_memory) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001159 call_data *calld = elem->call_data;
Mark D. Rothf28763c2016-09-14 15:18:40 -07001160 grpc_deadline_state_destroy(exec_ctx, elem);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001161 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Rothf28763c2016-09-14 15:18:40 -07001162 GRPC_ERROR_UNREF(calld->cancel_error);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001163 grpc_subchannel_call *call = GET_CALL(calld);
1164 if (call != NULL && call != CANCELLED_CALL) {
1165 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
1166 }
1167 GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001168 GPR_ASSERT(calld->waiting_ops_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001169 if (calld->connected_subchannel != NULL) {
1170 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1171 "picked");
1172 }
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001173 gpr_free(calld->waiting_ops);
Craig Tiller2c8063c2016-03-22 22:12:15 -07001174 gpr_free(and_free_memory);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001175}
1176
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001177static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1178 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001179 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001180 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001181 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001182}
1183
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001184/*************************************************************************
1185 * EXPORTED SYMBOLS
1186 */
1187
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001188const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillerf40df232016-03-25 13:38:14 -07001189 cc_start_transport_stream_op,
1190 cc_start_transport_op,
1191 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001192 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001193 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001194 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001195 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001196 cc_init_channel_elem,
1197 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001198 cc_get_peer,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001199 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001200 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001201};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001202
Craig Tiller613dafa2017-02-09 12:00:43 -08001203static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1204 grpc_error *error_ignored) {
1205 channel_data *chand = arg;
1206 if (chand->lb_policy != NULL) {
1207 grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
1208 } else {
1209 chand->exit_idle_when_lb_policy_arrives = true;
1210 if (!chand->started_resolving && chand->resolver != NULL) {
1211 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
1212 chand->started_resolving = true;
1213 grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
1214 &chand->on_resolver_result_changed);
1215 }
1216 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001217 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001218}
1219
Craig Tillera82950e2015-09-22 12:33:20 -07001220grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1221 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001222 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001223 grpc_connectivity_state out =
1224 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001225 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001226 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001227 grpc_closure_sched(
1228 exec_ctx,
1229 grpc_closure_create(try_to_connect_locked, chand,
1230 grpc_combiner_scheduler(chand->combiner, false)),
1231 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001232 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001233 return out;
1234}
1235
Craig Tiller86c99582015-11-25 15:22:26 -08001236typedef struct {
1237 channel_data *chand;
1238 grpc_pollset *pollset;
1239 grpc_closure *on_complete;
Craig Tiller613dafa2017-02-09 12:00:43 -08001240 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001241 grpc_closure my_closure;
1242} external_connectivity_watcher;
1243
Craig Tiller1d881fb2015-12-01 07:39:04 -08001244static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001245 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001246 external_connectivity_watcher *w = arg;
1247 grpc_closure *follow_up = w->on_complete;
Craig Tiller69b093b2016-02-25 19:04:07 -08001248 grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
Craig Tiller1d881fb2015-12-01 07:39:04 -08001249 w->pollset);
1250 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1251 "external_connectivity_watcher");
Craig Tiller86c99582015-11-25 15:22:26 -08001252 gpr_free(w);
Craig Tiller613dafa2017-02-09 12:00:43 -08001253 grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
1254}
1255
Craig Tillera8610c02017-02-14 10:05:11 -08001256static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1257 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001258 external_connectivity_watcher *w = arg;
1259 grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
1260 grpc_schedule_on_exec_ctx);
1261 grpc_connectivity_state_notify_on_state_change(
1262 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
Craig Tiller86c99582015-11-25 15:22:26 -08001263}
1264
Craig Tillera82950e2015-09-22 12:33:20 -07001265void grpc_client_channel_watch_connectivity_state(
Craig Tiller906e3bc2015-11-24 07:31:31 -08001266 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
Craig Tillera82950e2015-09-22 12:33:20 -07001267 grpc_connectivity_state *state, grpc_closure *on_complete) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001268 channel_data *chand = elem->channel_data;
Craig Tiller86c99582015-11-25 15:22:26 -08001269 external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
1270 w->chand = chand;
1271 w->pollset = pollset;
1272 w->on_complete = on_complete;
Craig Tiller613dafa2017-02-09 12:00:43 -08001273 w->state = state;
Craig Tiller69b093b2016-02-25 19:04:07 -08001274 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001275 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1276 "external_connectivity_watcher");
Craig Tiller613dafa2017-02-09 12:00:43 -08001277 grpc_closure_sched(
1278 exec_ctx,
Craig Tillera8610c02017-02-14 10:05:11 -08001279 grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tiller613dafa2017-02-09 12:00:43 -08001280 grpc_combiner_scheduler(chand->combiner, true)),
1281 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001282}