blob: 96c8c62c044b6705ae116a78c25ec5a438b38464 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tillerd4c98332016-03-31 13:45:47 -070034#include "src/core/ext/client_config/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080035
Mark D. Roth4c0fe492016-08-31 13:51:55 -070036#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070038#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080042#include <grpc/support/sync.h>
43#include <grpc/support/useful.h>
44
Mark D. Roth0e48a9a2016-09-08 14:14:39 -070045#include "src/core/ext/client_config/lb_policy_registry.h"
Mark D. Roth046cf762016-09-26 11:13:51 -070046#include "src/core/ext/client_config/method_config.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070047#include "src/core/ext/client_config/subchannel.h"
Craig Tiller9533d042016-03-25 17:11:06 -070048#include "src/core/lib/channel/channel_args.h"
49#include "src/core/lib/channel/connected_channel.h"
Craig Tiller9533d042016-03-25 17:11:06 -070050#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070051#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070052#include "src/core/lib/profiling/timers.h"
53#include "src/core/lib/support/string.h"
54#include "src/core/lib/surface/channel.h"
55#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070056#include "src/core/lib/transport/metadata.h"
57#include "src/core/lib/transport/metadata_batch.h"
58#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070059
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080060/* Client channel implementation */
61
Mark D. Roth2a5959f2016-09-01 08:20:27 -070062/*************************************************************************
63 * CHANNEL-WIDE FUNCTIONS
64 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080065
Craig Tiller800dacb2015-10-06 09:10:26 -070066typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -070067 /** resolver for this channel */
68 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -070069 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -070070 bool started_resolving;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -070071 /** client channel factory */
72 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -070073
Mark D. Roth046cf762016-09-26 11:13:51 -070074 /** mutex protecting all variables below in this data structure */
Mark D. Rothff4df062016-08-22 15:02:49 -070075 gpr_mu mu;
Mark D. Roth046cf762016-09-26 11:13:51 -070076 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -070077 grpc_lb_policy *lb_policy;
Mark D. Roth046cf762016-09-26 11:13:51 -070078 /** method config table */
79 grpc_method_config_table *method_config_table;
80 /** incoming resolver result - set by resolver.next() */
81 grpc_resolver_result *resolver_result;
Craig Tiller3f475422015-06-25 10:43:05 -070082 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -070083 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -070084 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -070085 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -070086 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -070087 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -070088 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -070089 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -080090 /** owning stack */
91 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -080092 /** interested parties (owned) */
93 grpc_pollset_set *interested_parties;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080094} channel_data;
95
Craig Tillerd6c98df2015-08-18 09:33:44 -070096/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -070097 resolver, to watch for state changes from the lb_policy. When a state
98 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -070099typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700100 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700101 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700102 grpc_connectivity_state state;
103 grpc_lb_policy *lb_policy;
104} lb_policy_connectivity_watcher;
105
Craig Tillera82950e2015-09-22 12:33:20 -0700106static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
107 grpc_lb_policy *lb_policy,
108 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700109
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800110static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
111 channel_data *chand,
112 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700113 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800114 const char *reason) {
115 if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
Craig Tiller48ed92e2016-06-02 11:07:12 -0700116 state == GRPC_CHANNEL_SHUTDOWN) &&
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800117 chand->lb_policy != NULL) {
118 /* cancel fail-fast picks */
119 grpc_lb_policy_cancel_picks(
120 exec_ctx, chand->lb_policy,
121 /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
122 /* check= */ 0);
123 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700124 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
125 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800126}
127
Craig Tiller804ff712016-05-05 16:25:40 -0700128static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
129 lb_policy_connectivity_watcher *w,
130 grpc_error *error) {
Craig Tillercb2609f2015-11-24 17:19:19 -0800131 grpc_connectivity_state publish_state = w->state;
Craig Tiller5795da72015-09-17 15:27:13 -0700132 /* check if the notification is for a stale policy */
Craig Tillera82950e2015-09-22 12:33:20 -0700133 if (w->lb_policy != w->chand->lb_policy) return;
Craig Tiller5795da72015-09-17 15:27:13 -0700134
Craig Tiller48ed92e2016-06-02 11:07:12 -0700135 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
Craig Tillercb2609f2015-11-24 17:19:19 -0800136 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
137 grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
Craig Tillerf62c4d52015-12-04 07:43:07 -0800138 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
139 w->chand->lb_policy = NULL;
Craig Tillercb2609f2015-11-24 17:19:19 -0800140 }
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800141 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
Craig Tillerfc353d62016-05-10 12:58:03 -0700142 GRPC_ERROR_REF(error), "lb_changed");
Craig Tiller48ed92e2016-06-02 11:07:12 -0700143 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
Craig Tillera82950e2015-09-22 12:33:20 -0700144 watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
145 }
Craig Tiller5795da72015-09-17 15:27:13 -0700146}
147
Craig Tillera82950e2015-09-22 12:33:20 -0700148static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -0700149 grpc_error *error) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700150 lb_policy_connectivity_watcher *w = arg;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700151
Mark D. Rothff4df062016-08-22 15:02:49 -0700152 gpr_mu_lock(&w->chand->mu);
Craig Tiller804ff712016-05-05 16:25:40 -0700153 on_lb_policy_state_changed_locked(exec_ctx, w, error);
Mark D. Rothff4df062016-08-22 15:02:49 -0700154 gpr_mu_unlock(&w->chand->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700155
Craig Tiller906e3bc2015-11-24 07:31:31 -0800156 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700157 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700158}
159
Craig Tillera82950e2015-09-22 12:33:20 -0700160static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
161 grpc_lb_policy *lb_policy,
162 grpc_connectivity_state current_state) {
163 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800164 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700165
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700166 w->chand = chand;
Craig Tillera82950e2015-09-22 12:33:20 -0700167 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700168 w->state = current_state;
169 w->lb_policy = lb_policy;
Craig Tillera82950e2015-09-22 12:33:20 -0700170 grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
171 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700172}
173
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700174static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
175 grpc_error *error) {
Craig Tiller3f475422015-06-25 10:43:05 -0700176 channel_data *chand = arg;
177 grpc_lb_policy *lb_policy = NULL;
178 grpc_lb_policy *old_lb_policy;
Mark D. Roth046cf762016-09-26 11:13:51 -0700179 grpc_method_config_table *method_config_table = NULL;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700180 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700181 bool exit_idle = false;
Craig Tiller804ff712016-05-05 16:25:40 -0700182 grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
Craig Tiller3f475422015-06-25 10:43:05 -0700183
Mark D. Roth046cf762016-09-26 11:13:51 -0700184 if (chand->resolver_result != NULL) {
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700185 grpc_lb_policy_args lb_policy_args;
Mark D. Roth933c9552016-09-21 08:12:11 -0700186 lb_policy_args.server_name =
Mark D. Roth046cf762016-09-26 11:13:51 -0700187 grpc_resolver_result_get_server_name(chand->resolver_result);
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700188 lb_policy_args.addresses =
Mark D. Roth046cf762016-09-26 11:13:51 -0700189 grpc_resolver_result_get_addresses(chand->resolver_result);
190 lb_policy_args.additional_args =
191 grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700192 lb_policy_args.client_channel_factory = chand->client_channel_factory;
193 lb_policy = grpc_lb_policy_create(
194 exec_ctx,
Mark D. Roth046cf762016-09-26 11:13:51 -0700195 grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700196 &lb_policy_args);
Craig Tillera82950e2015-09-22 12:33:20 -0700197 if (lb_policy != NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -0700198 GRPC_LB_POLICY_REF(lb_policy, "config_change");
Craig Tillerf707d622016-05-06 14:26:12 -0700199 GRPC_ERROR_UNREF(state_error);
Craig Tiller804ff712016-05-05 16:25:40 -0700200 state =
201 grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
Craig Tiller45724b32015-09-22 10:42:19 -0700202 }
Mark D. Roth046cf762016-09-26 11:13:51 -0700203 const grpc_arg *channel_arg = grpc_channel_args_find(
204 lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG);
205 if (channel_arg != NULL) {
206 GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
207 method_config_table = grpc_method_config_table_ref(
208 (grpc_method_config_table *)channel_arg->value.pointer.p);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700209 }
Mark D. Roth046cf762016-09-26 11:13:51 -0700210 grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
211 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700212 }
213
Craig Tiller86c99582015-11-25 15:22:26 -0800214 if (lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800215 grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
216 chand->interested_parties);
Craig Tiller86c99582015-11-25 15:22:26 -0800217 }
218
Mark D. Rothff4df062016-08-22 15:02:49 -0700219 gpr_mu_lock(&chand->mu);
Craig Tiller3f475422015-06-25 10:43:05 -0700220 old_lb_policy = chand->lb_policy;
221 chand->lb_policy = lb_policy;
Mark D. Roth046cf762016-09-26 11:13:51 -0700222 if (chand->method_config_table != NULL) {
223 grpc_method_config_table_unref(chand->method_config_table);
224 }
225 chand->method_config_table = method_config_table;
Craig Tiller0ede5452016-04-23 12:21:45 -0700226 if (lb_policy != NULL) {
227 grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
228 NULL);
229 } else if (chand->resolver == NULL /* disconnected */) {
Craig Tiller804ff712016-05-05 16:25:40 -0700230 grpc_closure_list_fail_all(
231 &chand->waiting_for_config_closures,
232 GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
Craig Tiller6c396862016-01-28 13:53:40 -0800233 grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
234 NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700235 }
236 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
237 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700238 exit_idle = true;
239 chand->exit_idle_when_lb_policy_arrives = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700240 }
Craig Tiller98465032015-06-29 14:36:42 -0700241
Craig Tiller804ff712016-05-05 16:25:40 -0700242 if (error == GRPC_ERROR_NONE && chand->resolver) {
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700243 set_channel_connectivity_state_locked(
244 exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
Craig Tillera82950e2015-09-22 12:33:20 -0700245 if (lb_policy != NULL) {
246 watch_lb_policy(exec_ctx, chand, lb_policy, state);
Craig Tiller45724b32015-09-22 10:42:19 -0700247 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800248 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Mark D. Roth046cf762016-09-26 11:13:51 -0700249 grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
Mark D. Rothff4df062016-08-22 15:02:49 -0700250 &chand->on_resolver_result_changed);
251 gpr_mu_unlock(&chand->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700252 } else {
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800253 if (chand->resolver != NULL) {
254 grpc_resolver_shutdown(exec_ctx, chand->resolver);
255 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
256 chand->resolver = NULL;
257 }
Craig Tiller804ff712016-05-05 16:25:40 -0700258 grpc_error *refs[] = {error, state_error};
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800259 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700260 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller804ff712016-05-05 16:25:40 -0700261 GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
262 GPR_ARRAY_SIZE(refs)),
263 "resolver_gone");
Mark D. Rothff4df062016-08-22 15:02:49 -0700264 gpr_mu_unlock(&chand->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700265 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700266
Craig Tillera82950e2015-09-22 12:33:20 -0700267 if (exit_idle) {
268 grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
269 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
270 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700271
Craig Tillera82950e2015-09-22 12:33:20 -0700272 if (old_lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800273 grpc_pollset_set_del_pollset_set(
274 exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
Craig Tillera82950e2015-09-22 12:33:20 -0700275 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
276 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700277
Craig Tillera82950e2015-09-22 12:33:20 -0700278 if (lb_policy != NULL) {
279 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
280 }
Craig Tiller45724b32015-09-22 10:42:19 -0700281
Craig Tiller906e3bc2015-11-24 07:31:31 -0800282 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700283 GRPC_ERROR_UNREF(state_error);
Craig Tiller3f475422015-06-25 10:43:05 -0700284}
285
Craig Tillera82950e2015-09-22 12:33:20 -0700286static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
287 grpc_channel_element *elem,
288 grpc_transport_op *op) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700289 channel_data *chand = elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700290
Craig Tiller332f1b32016-05-24 13:21:21 -0700291 grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700292
Craig Tillerd7f12e32016-03-03 10:08:31 -0800293 GPR_ASSERT(op->set_accept_stream == false);
Craig Tiller28bf8912015-12-07 16:07:04 -0800294 if (op->bind_pollset != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800295 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
Craig Tillere2c62372015-12-07 16:11:03 -0800296 op->bind_pollset);
Craig Tiller28bf8912015-12-07 16:07:04 -0800297 }
Craig Tillerca3e9d32015-06-27 18:37:27 -0700298
Mark D. Rothff4df062016-08-22 15:02:49 -0700299 gpr_mu_lock(&chand->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700300 if (op->on_connectivity_state_change != NULL) {
301 grpc_connectivity_state_notify_on_state_change(
302 exec_ctx, &chand->state_tracker, op->connectivity_state,
303 op->on_connectivity_state_change);
304 op->on_connectivity_state_change = NULL;
305 op->connectivity_state = NULL;
306 }
307
Craig Tiller26dab312015-12-07 14:43:47 -0800308 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800309 if (chand->lb_policy == NULL) {
Craig Tiller332f1b32016-05-24 13:21:21 -0700310 grpc_exec_ctx_sched(exec_ctx, op->send_ping,
311 GRPC_ERROR_CREATE("Ping with no load balancing"),
312 NULL);
Craig Tiller26dab312015-12-07 14:43:47 -0800313 } else {
Craig Tiller28bf8912015-12-07 16:07:04 -0800314 grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800315 op->bind_pollset = NULL;
316 }
317 op->send_ping = NULL;
318 }
319
Craig Tiller1c51edc2016-05-07 16:18:43 -0700320 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
321 if (chand->resolver != NULL) {
322 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700323 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700324 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
325 grpc_resolver_shutdown(exec_ctx, chand->resolver);
326 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
327 chand->resolver = NULL;
328 if (!chand->started_resolving) {
329 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
330 GRPC_ERROR_REF(op->disconnect_with_error));
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700331 grpc_exec_ctx_enqueue_list(exec_ctx,
332 &chand->waiting_for_config_closures, NULL);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700333 }
334 if (chand->lb_policy != NULL) {
335 grpc_pollset_set_del_pollset_set(exec_ctx,
336 chand->lb_policy->interested_parties,
337 chand->interested_parties);
338 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
339 chand->lb_policy = NULL;
340 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700341 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700342 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700343 }
Mark D. Rothff4df062016-08-22 15:02:49 -0700344 gpr_mu_unlock(&chand->mu);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700345}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800346
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700347/* Constructor for channel_data */
348static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
349 grpc_channel_element *elem,
350 grpc_channel_element_args *args) {
351 channel_data *chand = elem->channel_data;
352
353 memset(chand, 0, sizeof(*chand));
354
355 GPR_ASSERT(args->is_last);
356 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
357
358 gpr_mu_init(&chand->mu);
359 grpc_closure_init(&chand->on_resolver_result_changed,
360 on_resolver_result_changed, chand);
361 chand->owning_stack = args->channel_stack;
362
363 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
364 "client_channel");
365 chand->interested_parties = grpc_pollset_set_create();
366}
367
368/* Destructor for channel_data */
369static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
370 grpc_channel_element *elem) {
371 channel_data *chand = elem->channel_data;
372
373 if (chand->resolver != NULL) {
374 grpc_resolver_shutdown(exec_ctx, chand->resolver);
375 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
376 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700377 if (chand->client_channel_factory != NULL) {
378 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
379 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700380 if (chand->lb_policy != NULL) {
381 grpc_pollset_set_del_pollset_set(exec_ctx,
382 chand->lb_policy->interested_parties,
383 chand->interested_parties);
384 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
385 }
Mark D. Roth046cf762016-09-26 11:13:51 -0700386 if (chand->method_config_table != NULL) {
387 grpc_method_config_table_unref(chand->method_config_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700388 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700389 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
390 grpc_pollset_set_destroy(chand->interested_parties);
391 gpr_mu_destroy(&chand->mu);
392}
393
394/*************************************************************************
395 * PER-CALL FUNCTIONS
396 */
397
398#define GET_CALL(call_data) \
399 ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
400
401#define CANCELLED_CALL ((grpc_subchannel_call *)1)
402
403typedef enum {
404 GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
405 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
406} subchannel_creation_phase;
407
408/** Call data. Holds a pointer to grpc_subchannel_call and the
409 associated machinery to create such a pointer.
410 Handles queueing of stream ops until a call object is ready, waiting
411 for initial metadata before trying to create a call object,
412 and handling cancellation gracefully. */
413typedef struct client_channel_call_data {
414 /** either 0 for no call, 1 for cancelled, or a pointer to a
415 grpc_subchannel_call */
416 gpr_atm subchannel_call;
417
418 gpr_mu mu;
419
420 subchannel_creation_phase creation_phase;
421 grpc_connected_subchannel *connected_subchannel;
422 grpc_polling_entity *pollent;
423
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700424 grpc_mdstr *path;
425 grpc_method_config *method_config;
426
Craig Tiller57726ca2016-09-12 11:59:45 -0700427 grpc_transport_stream_op **waiting_ops;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700428 size_t waiting_ops_count;
429 size_t waiting_ops_capacity;
430
431 grpc_closure next_step;
432
433 grpc_call_stack *owning_call;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200434
435 grpc_linked_mdelem lb_token_mdelem;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700436} call_data;
437
438static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
439 GPR_TIMER_BEGIN("add_waiting_locked", 0);
440 if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
441 calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
442 calld->waiting_ops =
443 gpr_realloc(calld->waiting_ops,
444 calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
445 }
Craig Tiller57726ca2016-09-12 11:59:45 -0700446 calld->waiting_ops[calld->waiting_ops_count++] = op;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700447 GPR_TIMER_END("add_waiting_locked", 0);
448}
449
450static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
451 grpc_error *error) {
452 size_t i;
453 for (i = 0; i < calld->waiting_ops_count; i++) {
454 grpc_transport_stream_op_finish_with_failure(
Craig Tiller57726ca2016-09-12 11:59:45 -0700455 exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700456 }
457 calld->waiting_ops_count = 0;
458 GRPC_ERROR_UNREF(error);
459}
460
461typedef struct {
Craig Tiller57726ca2016-09-12 11:59:45 -0700462 grpc_transport_stream_op **ops;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700463 size_t nops;
464 grpc_subchannel_call *call;
465} retry_ops_args;
466
467static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
468 retry_ops_args *a = args;
469 size_t i;
470 for (i = 0; i < a->nops; i++) {
Craig Tiller57726ca2016-09-12 11:59:45 -0700471 grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700472 }
473 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
474 gpr_free(a->ops);
475 gpr_free(a);
476}
477
478static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
Craig Tiller57726ca2016-09-12 11:59:45 -0700479 if (calld->waiting_ops_count == 0) {
480 return;
481 }
482
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700483 retry_ops_args *a = gpr_malloc(sizeof(*a));
484 a->ops = calld->waiting_ops;
485 a->nops = calld->waiting_ops_count;
486 a->call = GET_CALL(calld);
487 if (a->call == CANCELLED_CALL) {
488 gpr_free(a);
489 fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
490 return;
491 }
492 calld->waiting_ops = NULL;
493 calld->waiting_ops_count = 0;
494 calld->waiting_ops_capacity = 0;
495 GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
496 grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
497 GRPC_ERROR_NONE, NULL);
498}
499
500static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
501 grpc_error *error) {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700502 grpc_call_element *elem = arg;
503 call_data *calld = elem->call_data;
504 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700505 gpr_mu_lock(&calld->mu);
506 GPR_ASSERT(calld->creation_phase ==
507 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
508 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
509 if (calld->connected_subchannel == NULL) {
510 gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
511 fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
512 "Failed to create subchannel", &error, 1));
513 } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
514 /* already cancelled before subchannel became ready */
515 fail_locked(exec_ctx, calld,
516 GRPC_ERROR_CREATE_REFERENCING(
517 "Cancelled before creating subchannel", &error, 1));
518 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700519 /* Get method config. */
520// FIXME: need to actually use the config data!
Mark D. Roth4c3a4682016-09-23 15:19:26 -0700521// FIXME: think about refcounting vs. atomicity here
Mark D. Roth046cf762016-09-26 11:13:51 -0700522 if (chand->method_config_table != NULL) {
Mark D. Roth4c3a4682016-09-23 15:19:26 -0700523 calld->method_config = grpc_method_config_table_get_method_config(
Mark D. Roth046cf762016-09-26 11:13:51 -0700524 chand->method_config_table, calld->path);
Mark D. Roth4c3a4682016-09-23 15:19:26 -0700525 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700526 /* Create call on subchannel. */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700527 grpc_subchannel_call *subchannel_call = NULL;
528 grpc_error *new_error = grpc_connected_subchannel_create_call(
529 exec_ctx, calld->connected_subchannel, calld->pollent,
530 &subchannel_call);
531 if (new_error != GRPC_ERROR_NONE) {
532 new_error = grpc_error_add_child(new_error, error);
533 subchannel_call = CANCELLED_CALL;
534 fail_locked(exec_ctx, calld, new_error);
535 }
536 gpr_atm_rel_store(&calld->subchannel_call,
537 (gpr_atm)(uintptr_t)subchannel_call);
538 retry_waiting_locked(exec_ctx, calld);
539 }
540 gpr_mu_unlock(&calld->mu);
541 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
542}
543
544static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
545 call_data *calld = elem->call_data;
546 grpc_subchannel_call *subchannel_call = GET_CALL(calld);
547 if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
548 return NULL;
549 } else {
550 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
551 }
552}
553
Craig Tiller577c9b22015-11-02 14:11:15 -0800554typedef struct {
555 grpc_metadata_batch *initial_metadata;
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800556 uint32_t initial_metadata_flags;
Craig Tillerb5585d42015-11-17 07:18:31 -0800557 grpc_connected_subchannel **connected_subchannel;
Craig Tiller577c9b22015-11-02 14:11:15 -0800558 grpc_closure *on_ready;
559 grpc_call_element *elem;
560 grpc_closure closure;
561} continue_picking_args;
562
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700563static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
564 grpc_metadata_batch *initial_metadata,
565 uint32_t initial_metadata_flags,
566 grpc_connected_subchannel **connected_subchannel,
567 grpc_closure *on_ready);
Craig Tiller577c9b22015-11-02 14:11:15 -0800568
Craig Tiller804ff712016-05-05 16:25:40 -0700569static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
570 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800571 continue_picking_args *cpa = arg;
Craig Tiller0ede5452016-04-23 12:21:45 -0700572 if (cpa->connected_subchannel == NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800573 /* cancelled, do nothing */
Craig Tiller804ff712016-05-05 16:25:40 -0700574 } else if (error != GRPC_ERROR_NONE) {
Craig Tiller332f1b32016-05-24 13:21:21 -0700575 grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700576 } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
577 cpa->initial_metadata_flags,
578 cpa->connected_subchannel, cpa->on_ready)) {
Craig Tiller332f1b32016-05-24 13:21:21 -0700579 grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
Craig Tiller577c9b22015-11-02 14:11:15 -0800580 }
581 gpr_free(cpa);
582}
583
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700584static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
585 grpc_metadata_batch *initial_metadata,
586 uint32_t initial_metadata_flags,
587 grpc_connected_subchannel **connected_subchannel,
588 grpc_closure *on_ready) {
589 GPR_TIMER_BEGIN("pick_subchannel", 0);
Craig Tillerbfc9adc2016-06-27 13:16:22 -0700590
Craig Tiller577c9b22015-11-02 14:11:15 -0800591 channel_data *chand = elem->channel_data;
592 call_data *calld = elem->call_data;
593 continue_picking_args *cpa;
594 grpc_closure *closure;
595
Craig Tillerb5585d42015-11-17 07:18:31 -0800596 GPR_ASSERT(connected_subchannel);
Craig Tiller577c9b22015-11-02 14:11:15 -0800597
Mark D. Rothff4df062016-08-22 15:02:49 -0700598 gpr_mu_lock(&chand->mu);
Craig Tiller577c9b22015-11-02 14:11:15 -0800599 if (initial_metadata == NULL) {
600 if (chand->lb_policy != NULL) {
Craig Tillerab33b482015-11-21 08:11:04 -0800601 grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
602 connected_subchannel);
Craig Tiller577c9b22015-11-02 14:11:15 -0800603 }
604 for (closure = chand->waiting_for_config_closures.head; closure != NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700605 closure = closure->next_data.next) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800606 cpa = closure->cb_arg;
Craig Tillerb5585d42015-11-17 07:18:31 -0800607 if (cpa->connected_subchannel == connected_subchannel) {
608 cpa->connected_subchannel = NULL;
Craig Tiller332f1b32016-05-24 13:21:21 -0700609 grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
610 GRPC_ERROR_CREATE("Pick cancelled"), NULL);
Craig Tiller577c9b22015-11-02 14:11:15 -0800611 }
612 }
Mark D. Rothff4df062016-08-22 15:02:49 -0700613 gpr_mu_unlock(&chand->mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700614 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700615 return true;
Craig Tiller577c9b22015-11-02 14:11:15 -0800616 }
617 if (chand->lb_policy != NULL) {
Craig Tiller86c0f8a2015-12-01 20:05:40 -0800618 grpc_lb_policy *lb_policy = chand->lb_policy;
619 int r;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700620 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
Mark D. Rothff4df062016-08-22 15:02:49 -0700621 gpr_mu_unlock(&chand->mu);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700622 const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700623 initial_metadata_flags,
624 &calld->lb_token_mdelem};
David Garcia Quintas8aace512016-08-15 14:55:12 -0700625 r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700626 NULL, on_ready);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700627 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
628 GPR_TIMER_END("pick_subchannel", 0);
Craig Tiller577c9b22015-11-02 14:11:15 -0800629 return r;
630 }
631 if (chand->resolver != NULL && !chand->started_resolving) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700632 chand->started_resolving = true;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800633 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Mark D. Roth046cf762016-09-26 11:13:51 -0700634 grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
Mark D. Rothff4df062016-08-22 15:02:49 -0700635 &chand->on_resolver_result_changed);
Craig Tiller577c9b22015-11-02 14:11:15 -0800636 }
Craig Tiller0eab6972016-04-23 12:59:57 -0700637 if (chand->resolver != NULL) {
638 cpa = gpr_malloc(sizeof(*cpa));
639 cpa->initial_metadata = initial_metadata;
640 cpa->initial_metadata_flags = initial_metadata_flags;
641 cpa->connected_subchannel = connected_subchannel;
642 cpa->on_ready = on_ready;
643 cpa->elem = elem;
644 grpc_closure_init(&cpa->closure, continue_picking, cpa);
Craig Tiller804ff712016-05-05 16:25:40 -0700645 grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
646 GRPC_ERROR_NONE);
Craig Tiller0eab6972016-04-23 12:59:57 -0700647 } else {
Craig Tiller332f1b32016-05-24 13:21:21 -0700648 grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
649 NULL);
Craig Tiller0eab6972016-04-23 12:59:57 -0700650 }
Mark D. Rothff4df062016-08-22 15:02:49 -0700651 gpr_mu_unlock(&chand->mu);
Craig Tillerbfc9adc2016-06-27 13:16:22 -0700652
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700653 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700654 return false;
Craig Tiller577c9b22015-11-02 14:11:15 -0800655}
656
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700657// The logic here is fairly complicated, due to (a) the fact that we
658// need to handle the case where we receive the send op before the
659// initial metadata op, and (b) the need for efficiency, especially in
660// the streaming case.
661// TODO(ctiller): Explain this more thoroughly.
662static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
663 grpc_call_element *elem,
664 grpc_transport_stream_op *op) {
665 call_data *calld = elem->call_data;
666 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
667 /* try to (atomically) get the call */
668 grpc_subchannel_call *call = GET_CALL(calld);
669 GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
670 if (call == CANCELLED_CALL) {
671 grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
672 GRPC_ERROR_CANCELLED);
673 GPR_TIMER_END("cc_start_transport_stream_op", 0);
674 return;
675 }
676 if (call != NULL) {
677 grpc_subchannel_call_process_op(exec_ctx, call, op);
678 GPR_TIMER_END("cc_start_transport_stream_op", 0);
679 return;
680 }
681 /* we failed; lock and figure out what to do */
682 gpr_mu_lock(&calld->mu);
683retry:
684 /* need to recheck that another thread hasn't set the call */
685 call = GET_CALL(calld);
686 if (call == CANCELLED_CALL) {
687 gpr_mu_unlock(&calld->mu);
688 grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
689 GRPC_ERROR_CANCELLED);
690 GPR_TIMER_END("cc_start_transport_stream_op", 0);
691 return;
692 }
693 if (call != NULL) {
694 gpr_mu_unlock(&calld->mu);
695 grpc_subchannel_call_process_op(exec_ctx, call, op);
696 GPR_TIMER_END("cc_start_transport_stream_op", 0);
697 return;
698 }
699 /* if this is a cancellation, then we can raise our cancelled flag */
700 if (op->cancel_error != GRPC_ERROR_NONE) {
701 if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
702 (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
703 goto retry;
704 } else {
705 switch (calld->creation_phase) {
706 case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
707 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
708 break;
709 case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
Mark D. Rothd4c0f552016-09-01 09:25:32 -0700710 pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
711 NULL);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700712 break;
713 }
714 gpr_mu_unlock(&calld->mu);
715 grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
716 GRPC_ERROR_CANCELLED);
717 GPR_TIMER_END("cc_start_transport_stream_op", 0);
718 return;
719 }
720 }
721 /* if we don't have a subchannel, try to get one */
722 if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
723 calld->connected_subchannel == NULL &&
724 op->send_initial_metadata != NULL) {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700725 for (grpc_linked_mdelem *mdelem = op->send_initial_metadata->list.head;
726 mdelem != NULL; mdelem = mdelem->next) {
727 if (mdelem->md->key == GRPC_MDSTR_PATH) {
728 calld->path = GRPC_MDSTR_REF(mdelem->md->value);
729 break;
730 }
731 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700732 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700733 grpc_closure_init(&calld->next_step, subchannel_ready, elem);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700734 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
Mark D. Rothd4c0f552016-09-01 09:25:32 -0700735 if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
736 op->send_initial_metadata_flags,
737 &calld->connected_subchannel, &calld->next_step)) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700738 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
739 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
740 }
741 }
742 /* if we've got a subchannel, then let's ask it to create a call */
743 if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
744 calld->connected_subchannel != NULL) {
745 grpc_subchannel_call *subchannel_call = NULL;
746 grpc_error *error = grpc_connected_subchannel_create_call(
747 exec_ctx, calld->connected_subchannel, calld->pollent,
748 &subchannel_call);
749 if (error != GRPC_ERROR_NONE) {
750 subchannel_call = CANCELLED_CALL;
751 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
752 grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
753 }
754 gpr_atm_rel_store(&calld->subchannel_call,
755 (gpr_atm)(uintptr_t)subchannel_call);
756 retry_waiting_locked(exec_ctx, calld);
757 goto retry;
758 }
759 /* nothing to be done but wait */
760 add_waiting_locked(calld, op);
761 gpr_mu_unlock(&calld->mu);
762 GPR_TIMER_END("cc_start_transport_stream_op", 0);
763}
764
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800765/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700766static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
767 grpc_call_element *elem,
768 grpc_call_element_args *args) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700769 call_data *calld = elem->call_data;
770 gpr_atm_rel_store(&calld->subchannel_call, 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700771 gpr_mu_init(&calld->mu);
772 calld->connected_subchannel = NULL;
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700773 calld->path = NULL;
774 calld->method_config = NULL;
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700775 calld->waiting_ops = NULL;
776 calld->waiting_ops_count = 0;
777 calld->waiting_ops_capacity = 0;
778 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
779 calld->owning_call = args->call_stack;
780 calld->pollent = NULL;
Mark D. Roth0badbe82016-06-23 10:15:12 -0700781 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800782}
783
784/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700785static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
786 grpc_call_element *elem,
787 const grpc_call_final_info *final_info,
788 void *and_free_memory) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700789 call_data *calld = elem->call_data;
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700790 if (calld->path != NULL) GRPC_MDSTR_UNREF(calld->path);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700791 grpc_subchannel_call *call = GET_CALL(calld);
792 if (call != NULL && call != CANCELLED_CALL) {
793 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
794 }
795 GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
796 gpr_mu_destroy(&calld->mu);
797 GPR_ASSERT(calld->waiting_ops_count == 0);
798 gpr_free(calld->waiting_ops);
Craig Tiller2c8063c2016-03-22 22:12:15 -0700799 gpr_free(and_free_memory);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800800}
801
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700802static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
803 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700804 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800805 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700806 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -0800807}
808
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700809/*************************************************************************
810 * EXPORTED SYMBOLS
811 */
812
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800813const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillerf40df232016-03-25 13:38:14 -0700814 cc_start_transport_stream_op,
815 cc_start_transport_op,
816 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700817 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -0700818 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700819 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -0700820 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700821 cc_init_channel_elem,
822 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -0700823 cc_get_peer,
824 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -0700825};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800826
Mark D. Roth60534972016-09-20 08:37:12 -0700827void grpc_client_channel_finish_initialization(
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700828 grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
829 grpc_resolver *resolver,
830 grpc_client_channel_factory *client_channel_factory) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800831 /* post construction initialization: set the transport setup pointer */
Mark D. Roth7f7d1652016-09-20 10:46:15 -0700832 GPR_ASSERT(client_channel_factory != NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700833 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800834 channel_data *chand = elem->channel_data;
Mark D. Rothff4df062016-08-22 15:02:49 -0700835 gpr_mu_lock(&chand->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700836 GPR_ASSERT(!chand->resolver);
Craig Tillerf5f17122015-06-25 08:47:26 -0700837 chand->resolver = resolver;
Craig Tillera82950e2015-09-22 12:33:20 -0700838 GRPC_RESOLVER_REF(resolver, "channel");
839 if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
840 chand->exit_idle_when_lb_policy_arrives) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700841 chand->started_resolving = true;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800842 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Mark D. Roth046cf762016-09-26 11:13:51 -0700843 grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
Mark D. Rothff4df062016-08-22 15:02:49 -0700844 &chand->on_resolver_result_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700845 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700846 chand->client_channel_factory = client_channel_factory;
847 grpc_client_channel_factory_ref(client_channel_factory);
Mark D. Rothff4df062016-08-22 15:02:49 -0700848 gpr_mu_unlock(&chand->mu);
Craig Tiller190d3602015-02-18 09:23:38 -0800849}
Craig Tiller48cb07c2015-07-15 16:16:15 -0700850
Craig Tillera82950e2015-09-22 12:33:20 -0700851grpc_connectivity_state grpc_client_channel_check_connectivity_state(
852 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700853 channel_data *chand = elem->channel_data;
854 grpc_connectivity_state out;
Mark D. Rothff4df062016-08-22 15:02:49 -0700855 gpr_mu_lock(&chand->mu);
Craig Tiller804ff712016-05-05 16:25:40 -0700856 out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700857 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
858 if (chand->lb_policy != NULL) {
859 grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
860 } else {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700861 chand->exit_idle_when_lb_policy_arrives = true;
Craig Tillera82950e2015-09-22 12:33:20 -0700862 if (!chand->started_resolving && chand->resolver != NULL) {
Craig Tiller906e3bc2015-11-24 07:31:31 -0800863 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700864 chand->started_resolving = true;
Mark D. Roth046cf762016-09-26 11:13:51 -0700865 grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
Mark D. Rothff4df062016-08-22 15:02:49 -0700866 &chand->on_resolver_result_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700867 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700868 }
Craig Tillera82950e2015-09-22 12:33:20 -0700869 }
Mark D. Rothff4df062016-08-22 15:02:49 -0700870 gpr_mu_unlock(&chand->mu);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700871 return out;
872}
873
Craig Tiller86c99582015-11-25 15:22:26 -0800874typedef struct {
875 channel_data *chand;
876 grpc_pollset *pollset;
877 grpc_closure *on_complete;
878 grpc_closure my_closure;
879} external_connectivity_watcher;
880
Craig Tiller1d881fb2015-12-01 07:39:04 -0800881static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -0700882 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -0800883 external_connectivity_watcher *w = arg;
884 grpc_closure *follow_up = w->on_complete;
Craig Tiller69b093b2016-02-25 19:04:07 -0800885 grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
Craig Tiller1d881fb2015-12-01 07:39:04 -0800886 w->pollset);
887 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
888 "external_connectivity_watcher");
Craig Tiller86c99582015-11-25 15:22:26 -0800889 gpr_free(w);
Craig Tiller804ff712016-05-05 16:25:40 -0700890 follow_up->cb(exec_ctx, follow_up->cb_arg, error);
Craig Tiller86c99582015-11-25 15:22:26 -0800891}
892
Craig Tillera82950e2015-09-22 12:33:20 -0700893void grpc_client_channel_watch_connectivity_state(
Craig Tiller906e3bc2015-11-24 07:31:31 -0800894 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
Craig Tillera82950e2015-09-22 12:33:20 -0700895 grpc_connectivity_state *state, grpc_closure *on_complete) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700896 channel_data *chand = elem->channel_data;
Craig Tiller86c99582015-11-25 15:22:26 -0800897 external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
898 w->chand = chand;
899 w->pollset = pollset;
900 w->on_complete = on_complete;
Craig Tiller69b093b2016-02-25 19:04:07 -0800901 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
Craig Tiller86c99582015-11-25 15:22:26 -0800902 grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
Craig Tiller1d881fb2015-12-01 07:39:04 -0800903 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
904 "external_connectivity_watcher");
Mark D. Rothff4df062016-08-22 15:02:49 -0700905 gpr_mu_lock(&chand->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700906 grpc_connectivity_state_notify_on_state_change(
Craig Tiller86c99582015-11-25 15:22:26 -0800907 exec_ctx, &chand->state_tracker, state, &w->my_closure);
Mark D. Rothff4df062016-08-22 15:02:49 -0700908 gpr_mu_unlock(&chand->mu);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700909}