blob: 21ba4301ff02993d46a297390e9a4f4839c80151 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Mark D. Roth2137cd82016-09-14 09:04:00 -070034#include "src/core/ext/client_channel/client_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080035
Mark D. Roth4c0fe492016-08-31 13:51:55 -070036#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070038#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
Mark D. Rothb2d24882016-10-27 15:44:07 -070042#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/sync.h>
44#include <grpc/support/useful.h>
45
Mark D. Rothd58a9852017-01-18 08:28:57 -080046#include "src/core/ext/client_channel/http_connect_handshaker.h"
Mark D. Roth15195742016-10-07 09:02:28 -070047#include "src/core/ext/client_channel/lb_policy_registry.h"
Mark D. Rothdc9bee72017-02-07 12:29:14 -080048#include "src/core/ext/client_channel/proxy_mapper_registry.h"
Mark D. Roth21d4b2d2016-11-18 09:53:41 -080049#include "src/core/ext/client_channel/resolver_registry.h"
Mark D. Roth2137cd82016-09-14 09:04:00 -070050#include "src/core/ext/client_channel/subchannel.h"
Craig Tiller9533d042016-03-25 17:11:06 -070051#include "src/core/lib/channel/channel_args.h"
52#include "src/core/lib/channel/connected_channel.h"
Mark D. Roth72f6da82016-09-02 13:42:38 -070053#include "src/core/lib/channel/deadline_filter.h"
Craig Tillerbefafe62017-02-09 11:30:54 -080054#include "src/core/lib/iomgr/combiner.h"
Craig Tiller9533d042016-03-25 17:11:06 -070055#include "src/core/lib/iomgr/iomgr.h"
Mark D. Roth4c0fe492016-08-31 13:51:55 -070056#include "src/core/lib/iomgr/polling_entity.h"
Craig Tiller9533d042016-03-25 17:11:06 -070057#include "src/core/lib/profiling/timers.h"
Craig Tiller7c70b6c2017-01-23 07:48:42 -080058#include "src/core/lib/slice/slice_internal.h"
Craig Tiller9533d042016-03-25 17:11:06 -070059#include "src/core/lib/support/string.h"
60#include "src/core/lib/surface/channel.h"
61#include "src/core/lib/transport/connectivity_state.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070062#include "src/core/lib/transport/metadata.h"
63#include "src/core/lib/transport/metadata_batch.h"
Mark D. Rothea846a02016-11-03 11:32:54 -070064#include "src/core/lib/transport/service_config.h"
Mark D. Roth9fe284e2016-09-12 11:22:27 -070065#include "src/core/lib/transport/static_metadata.h"
Craig Tiller8910ac62015-10-08 16:49:15 -070066
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080067/* Client channel implementation */
68
Mark D. Roth26b7be42016-10-24 10:08:07 -070069/*************************************************************************
70 * METHOD-CONFIG TABLE
71 */
72
/* Tri-state value of the "waitForReady" setting of a method config: the
   setting can be absent, explicitly false, or explicitly true. */
typedef enum {
  WAIT_FOR_READY_UNSET = 0, /* "waitForReady" has not been seen */
  WAIT_FOR_READY_FALSE = 1, /* "waitForReady" was JSON false */
  WAIT_FOR_READY_TRUE = 2   /* "waitForReady" was JSON true */
} wait_for_ready_value;
78
79typedef struct method_parameters {
80 gpr_timespec timeout;
81 wait_for_ready_value wait_for_ready;
82} method_parameters;
83
84static void *method_parameters_copy(void *value) {
85 void *new_value = gpr_malloc(sizeof(method_parameters));
86 memcpy(new_value, value, sizeof(method_parameters));
87 return new_value;
88}
89
Craig Tillerb28c7e82016-11-18 10:29:04 -080090static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) {
Craig Tiller87a7e1f2016-11-09 09:42:19 -080091 gpr_free(p);
92}
93
Craig Tiller7c70b6c2017-01-23 07:48:42 -080094static const grpc_slice_hash_table_vtable method_parameters_vtable = {
Craig Tillerb28c7e82016-11-18 10:29:04 -080095 method_parameters_free, method_parameters_copy};
Mark D. Roth9d480942016-10-19 14:18:05 -070096
Mark D. Rothe30baeb2016-11-03 08:16:19 -070097static void *method_parameters_create_from_json(const grpc_json *json) {
Mark D. Rothc968e602016-11-02 14:07:36 -070098 wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
Mark D. Roth47f10842016-11-03 08:45:27 -070099 gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
100 for (grpc_json *field = json->child; field != NULL; field = field->next) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700101 if (field->key == NULL) continue;
Mark D. Roth84c8a022016-11-10 09:39:34 -0800102 if (strcmp(field->key, "waitForReady") == 0) {
Mark D. Rothc968e602016-11-02 14:07:36 -0700103 if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
104 if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
105 return NULL;
106 }
Mark D. Roth47f10842016-11-03 08:45:27 -0700107 wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
108 : WAIT_FOR_READY_FALSE;
Mark D. Rothc968e602016-11-02 14:07:36 -0700109 } else if (strcmp(field->key, "timeout") == 0) {
110 if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
Mark D. Roth84c8a022016-11-10 09:39:34 -0800111 if (field->type != GRPC_JSON_STRING) return NULL;
112 size_t len = strlen(field->value);
113 if (field->value[len - 1] != 's') return NULL;
Mark D. Rothc19049c2016-11-10 09:43:06 -0800114 char *buf = gpr_strdup(field->value);
Mark D. Roth84c8a022016-11-10 09:39:34 -0800115 buf[len - 1] = '\0'; // Remove trailing 's'.
Mark D. Rothc19049c2016-11-10 09:43:06 -0800116 char *decimal_point = strchr(buf, '.');
Mark D. Roth84c8a022016-11-10 09:39:34 -0800117 if (decimal_point != NULL) {
118 *decimal_point = '\0';
119 timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
120 if (timeout.tv_nsec == -1) {
121 gpr_free(buf);
Mark D. Rothc968e602016-11-02 14:07:36 -0700122 return NULL;
123 }
Mark D. Roth84c8a022016-11-10 09:39:34 -0800124 // There should always be exactly 3, 6, or 9 fractional digits.
125 int multiplier = 1;
126 switch (strlen(decimal_point + 1)) {
127 case 9:
128 break;
129 case 6:
130 multiplier *= 1000;
131 break;
132 case 3:
133 multiplier *= 1000000;
134 break;
135 default: // Unsupported number of digits.
136 gpr_free(buf);
137 return NULL;
138 }
139 timeout.tv_nsec *= multiplier;
Mark D. Rothc968e602016-11-02 14:07:36 -0700140 }
Mark D. Roth84c8a022016-11-10 09:39:34 -0800141 timeout.tv_sec = gpr_parse_nonnegative_int(buf);
142 if (timeout.tv_sec == -1) return NULL;
143 gpr_free(buf);
Mark D. Rothc968e602016-11-02 14:07:36 -0700144 }
145 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700146 method_parameters *value = gpr_malloc(sizeof(method_parameters));
Mark D. Rothc968e602016-11-02 14:07:36 -0700147 value->timeout = timeout;
148 value->wait_for_ready = wait_for_ready;
Mark D. Roth9d480942016-10-19 14:18:05 -0700149 return value;
150}
151
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700152/*************************************************************************
153 * CHANNEL-WIDE FUNCTIONS
154 */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800155
Craig Tiller800dacb2015-10-06 09:10:26 -0700156typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -0700157 /** resolver for this channel */
158 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -0700159 /** have we started resolving this channel */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700160 bool started_resolving;
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700161 /** client channel factory */
162 grpc_client_channel_factory *client_channel_factory;
Craig Tillerf5f17122015-06-25 08:47:26 -0700163
Craig Tillerbefafe62017-02-09 11:30:54 -0800164 /** combiner protecting all variables below in this data structure */
165 grpc_combiner *combiner;
Mark D. Roth046cf762016-09-26 11:13:51 -0700166 /** currently active load balancer */
Craig Tillerf5f17122015-06-25 08:47:26 -0700167 grpc_lb_policy *lb_policy;
Mark D. Roth9d480942016-10-19 14:18:05 -0700168 /** maps method names to method_parameters structs */
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800169 grpc_slice_hash_table *method_params_table;
Mark D. Roth046cf762016-09-26 11:13:51 -0700170 /** incoming resolver result - set by resolver.next() */
Mark D. Rothaf842452016-10-21 15:05:15 -0700171 grpc_channel_args *resolver_result;
Craig Tiller3f475422015-06-25 10:43:05 -0700172 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -0700173 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -0700174 /** resolver callback */
Mark D. Rothff4df062016-08-22 15:02:49 -0700175 grpc_closure on_resolver_result_changed;
Craig Tiller3f475422015-06-25 10:43:05 -0700176 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -0700177 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700178 /** when an lb_policy arrives, should we try to exit idle */
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700179 bool exit_idle_when_lb_policy_arrives;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800180 /** owning stack */
181 grpc_channel_stack *owning_stack;
Craig Tiller69b093b2016-02-25 19:04:07 -0800182 /** interested parties (owned) */
183 grpc_pollset_set *interested_parties;
Craig Tiller613dafa2017-02-09 12:00:43 -0800184
185 /* the following properties are guarded by a mutex since API's require them
Craig Tiller46dd7902017-02-23 09:42:16 -0800186 to be instantaneously available */
Craig Tiller613dafa2017-02-09 12:00:43 -0800187 gpr_mu info_mu;
188 char *info_lb_policy_name;
189 /** service config in JSON form */
190 char *info_service_config_json;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800191} channel_data;
192
Craig Tillerd6c98df2015-08-18 09:33:44 -0700193/** We create one watcher for each new lb_policy that is returned from a
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700194 resolver, to watch for state changes from the lb_policy. When a state
195 change is seen, we update the channel, and create a new watcher. */
Craig Tillera82950e2015-09-22 12:33:20 -0700196typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700197 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -0700198 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700199 grpc_connectivity_state state;
200 grpc_lb_policy *lb_policy;
201} lb_policy_connectivity_watcher;
202
Craig Tiller2400bf52017-02-09 16:25:19 -0800203static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
204 grpc_lb_policy *lb_policy,
205 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700206
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800207static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
208 channel_data *chand,
209 grpc_connectivity_state state,
Craig Tiller804ff712016-05-05 16:25:40 -0700210 grpc_error *error,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800211 const char *reason) {
212 if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
Craig Tiller48ed92e2016-06-02 11:07:12 -0700213 state == GRPC_CHANNEL_SHUTDOWN) &&
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800214 chand->lb_policy != NULL) {
Mark D. Roth59c9f902016-09-28 13:33:21 -0700215 /* cancel picks with wait_for_ready=false */
Craig Tiller2400bf52017-02-09 16:25:19 -0800216 grpc_lb_policy_cancel_picks_locked(
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800217 exec_ctx, chand->lb_policy,
Mark D. Roth59c9f902016-09-28 13:33:21 -0700218 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
Mark D. Roth58f52b72016-09-09 13:55:18 -0700219 /* check= */ 0, GRPC_ERROR_REF(error));
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800220 }
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700221 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
222 reason);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800223}
224
Craig Tiller804ff712016-05-05 16:25:40 -0700225static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
Craig Tillerbefafe62017-02-09 11:30:54 -0800226 void *arg, grpc_error *error) {
227 lb_policy_connectivity_watcher *w = arg;
Craig Tillercb2609f2015-11-24 17:19:19 -0800228 grpc_connectivity_state publish_state = w->state;
Craig Tillerc5de8352017-02-09 14:08:05 -0800229 /* check if the notification is for the latest policy */
230 if (w->lb_policy == w->chand->lb_policy) {
231 if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
232 publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller972470b2017-02-09 15:05:36 -0800233 grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
Craig Tillerc5de8352017-02-09 14:08:05 -0800234 GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
235 w->chand->lb_policy = NULL;
236 }
237 set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
238 GRPC_ERROR_REF(error), "lb_changed");
239 if (w->state != GRPC_CHANNEL_SHUTDOWN) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800240 watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
Craig Tillerc5de8352017-02-09 14:08:05 -0800241 }
Craig Tillera82950e2015-09-22 12:33:20 -0700242 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700243
Craig Tiller906e3bc2015-11-24 07:31:31 -0800244 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
Craig Tillera82950e2015-09-22 12:33:20 -0700245 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700246}
247
Craig Tiller2400bf52017-02-09 16:25:19 -0800248static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
249 grpc_lb_policy *lb_policy,
250 grpc_connectivity_state current_state) {
Craig Tillera82950e2015-09-22 12:33:20 -0700251 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
Craig Tiller906e3bc2015-11-24 07:31:31 -0800252 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700253
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700254 w->chand = chand;
Craig Tillerbefafe62017-02-09 11:30:54 -0800255 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
256 grpc_combiner_scheduler(chand->combiner, false));
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700257 w->state = current_state;
258 w->lb_policy = lb_policy;
Craig Tiller2400bf52017-02-09 16:25:19 -0800259 grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
260 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700261}
262
Craig Tillerbefafe62017-02-09 11:30:54 -0800263static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
264 void *arg, grpc_error *error) {
Craig Tiller3f475422015-06-25 10:43:05 -0700265 channel_data *chand = arg;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700266 char *lb_policy_name = NULL;
Craig Tiller3f475422015-06-25 10:43:05 -0700267 grpc_lb_policy *lb_policy = NULL;
268 grpc_lb_policy *old_lb_policy;
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800269 grpc_slice_hash_table *method_params_table = NULL;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700270 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700271 bool exit_idle = false;
Craig Tiller804ff712016-05-05 16:25:40 -0700272 grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800273 char *service_config_json = NULL;
Craig Tiller3f475422015-06-25 10:43:05 -0700274
Mark D. Roth046cf762016-09-26 11:13:51 -0700275 if (chand->resolver_result != NULL) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700276 // Find LB policy name.
Mark D. Rothaf842452016-10-21 15:05:15 -0700277 const grpc_arg *channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700278 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
Mark D. Rothaf842452016-10-21 15:05:15 -0700279 if (channel_arg != NULL) {
280 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
281 lb_policy_name = channel_arg->value.string;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700282 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700283 // Special case: If all of the addresses are balancer addresses,
284 // assume that we should use the grpclb policy, regardless of what the
285 // resolver actually specified.
Mark D. Rothaf842452016-10-21 15:05:15 -0700286 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700287 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
Mark D. Rothaf842452016-10-21 15:05:15 -0700288 if (channel_arg != NULL) {
289 GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
Mark D. Roth557c9902016-10-24 11:12:05 -0700290 grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
Mark D. Rothaf842452016-10-21 15:05:15 -0700291 bool found_backend_address = false;
292 for (size_t i = 0; i < addresses->num_addresses; ++i) {
293 if (!addresses->addresses[i].is_balancer) {
294 found_backend_address = true;
295 break;
296 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700297 }
Mark D. Rothaf842452016-10-21 15:05:15 -0700298 if (!found_backend_address) {
299 if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
300 gpr_log(GPR_INFO,
301 "resolver requested LB policy %s but provided only balancer "
302 "addresses, no backend addresses -- forcing use of grpclb LB "
303 "policy",
Mark D. Roth5f40e5d2016-10-24 13:09:05 -0700304 lb_policy_name);
Mark D. Rothaf842452016-10-21 15:05:15 -0700305 }
306 lb_policy_name = "grpclb";
Mark D. Roth88405f72016-10-03 08:24:52 -0700307 }
Mark D. Roth88405f72016-10-03 08:24:52 -0700308 }
309 // Use pick_first if nothing was specified and we didn't select grpclb
310 // above.
311 if (lb_policy_name == NULL) lb_policy_name = "pick_first";
Mark D. Roth41124992016-11-03 11:22:20 -0700312 // Instantiate LB policy.
313 grpc_lb_policy_args lb_policy_args;
314 lb_policy_args.args = chand->resolver_result;
315 lb_policy_args.client_channel_factory = chand->client_channel_factory;
Craig Tiller2400bf52017-02-09 16:25:19 -0800316 lb_policy_args.combiner = chand->combiner;
Mark D. Roth88405f72016-10-03 08:24:52 -0700317 lb_policy =
318 grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
Craig Tillera82950e2015-09-22 12:33:20 -0700319 if (lb_policy != NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -0700320 GRPC_LB_POLICY_REF(lb_policy, "config_change");
Craig Tillerf707d622016-05-06 14:26:12 -0700321 GRPC_ERROR_UNREF(state_error);
Craig Tiller2400bf52017-02-09 16:25:19 -0800322 state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
323 &state_error);
Craig Tiller45724b32015-09-22 10:42:19 -0700324 }
Mark D. Roth41124992016-11-03 11:22:20 -0700325 // Find service config.
Mark D. Rothaf842452016-10-21 15:05:15 -0700326 channel_arg =
Mark D. Roth41124992016-11-03 11:22:20 -0700327 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
Mark D. Roth046cf762016-09-26 11:13:51 -0700328 if (channel_arg != NULL) {
Mark D. Roth9ec28af2016-11-03 12:32:39 -0700329 GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800330 service_config_json = gpr_strdup(channel_arg->value.string);
Mark D. Roth70a1abd2016-11-04 09:26:37 -0700331 grpc_service_config *service_config =
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800332 grpc_service_config_create(service_config_json);
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700333 if (service_config != NULL) {
334 method_params_table = grpc_service_config_create_method_config_table(
Craig Tillerb28c7e82016-11-18 10:29:04 -0800335 exec_ctx, service_config, method_parameters_create_from_json,
Mark D. Rothbdc58b22016-11-04 09:25:57 -0700336 &method_parameters_vtable);
337 grpc_service_config_destroy(service_config);
338 }
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700339 }
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700340 // Before we clean up, save a copy of lb_policy_name, since it might
341 // be pointing to data inside chand->resolver_result.
342 // The copy will be saved in chand->lb_policy_name below.
343 lb_policy_name = gpr_strdup(lb_policy_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800344 grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
Mark D. Roth046cf762016-09-26 11:13:51 -0700345 chand->resolver_result = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700346 }
347
Craig Tiller86c99582015-11-25 15:22:26 -0800348 if (lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800349 grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
350 chand->interested_parties);
Craig Tiller86c99582015-11-25 15:22:26 -0800351 }
352
Craig Tiller613dafa2017-02-09 12:00:43 -0800353 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700354 if (lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800355 gpr_free(chand->info_lb_policy_name);
356 chand->info_lb_policy_name = lb_policy_name;
Mark D. Rothb2d24882016-10-27 15:44:07 -0700357 }
Craig Tiller3f475422015-06-25 10:43:05 -0700358 old_lb_policy = chand->lb_policy;
359 chand->lb_policy = lb_policy;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800360 if (service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800361 gpr_free(chand->info_service_config_json);
362 chand->info_service_config_json = service_config_json;
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800363 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800364 gpr_mu_unlock(&chand->info_mu);
Mark D. Roth9d480942016-10-19 14:18:05 -0700365 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800366 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth046cf762016-09-26 11:13:51 -0700367 }
Mark D. Roth9d480942016-10-19 14:18:05 -0700368 chand->method_params_table = method_params_table;
Craig Tiller0ede5452016-04-23 12:21:45 -0700369 if (lb_policy != NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800370 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller0ede5452016-04-23 12:21:45 -0700371 } else if (chand->resolver == NULL /* disconnected */) {
Craig Tiller804ff712016-05-05 16:25:40 -0700372 grpc_closure_list_fail_all(
373 &chand->waiting_for_config_closures,
374 GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
Craig Tiller91031da2016-12-28 15:44:25 -0800375 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tillera82950e2015-09-22 12:33:20 -0700376 }
377 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
378 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700379 exit_idle = true;
380 chand->exit_idle_when_lb_policy_arrives = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700381 }
Craig Tiller98465032015-06-29 14:36:42 -0700382
Craig Tiller804ff712016-05-05 16:25:40 -0700383 if (error == GRPC_ERROR_NONE && chand->resolver) {
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700384 set_channel_connectivity_state_locked(
385 exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
Craig Tillera82950e2015-09-22 12:33:20 -0700386 if (lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800387 watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
Craig Tiller45724b32015-09-22 10:42:19 -0700388 }
Craig Tiller906e3bc2015-11-24 07:31:31 -0800389 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Craig Tiller972470b2017-02-09 15:05:36 -0800390 grpc_resolver_next_locked(exec_ctx, chand->resolver,
391 &chand->resolver_result,
392 &chand->on_resolver_result_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700393 } else {
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800394 if (chand->resolver != NULL) {
Craig Tiller972470b2017-02-09 15:05:36 -0800395 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller76a5c0e2016-03-09 09:05:30 -0800396 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
397 chand->resolver = NULL;
398 }
Craig Tiller804ff712016-05-05 16:25:40 -0700399 grpc_error *refs[] = {error, state_error};
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800400 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700401 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller804ff712016-05-05 16:25:40 -0700402 GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
403 GPR_ARRAY_SIZE(refs)),
404 "resolver_gone");
Craig Tillera82950e2015-09-22 12:33:20 -0700405 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700406
Craig Tillera82950e2015-09-22 12:33:20 -0700407 if (exit_idle) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800408 grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
Craig Tillera82950e2015-09-22 12:33:20 -0700409 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
410 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700411
Craig Tillera82950e2015-09-22 12:33:20 -0700412 if (old_lb_policy != NULL) {
Craig Tiller69b093b2016-02-25 19:04:07 -0800413 grpc_pollset_set_del_pollset_set(
414 exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
Craig Tillera82950e2015-09-22 12:33:20 -0700415 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
416 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700417
Craig Tillera82950e2015-09-22 12:33:20 -0700418 if (lb_policy != NULL) {
419 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
420 }
Craig Tiller45724b32015-09-22 10:42:19 -0700421
Craig Tiller906e3bc2015-11-24 07:31:31 -0800422 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
Craig Tiller9ccf5f12016-05-07 21:41:01 -0700423 GRPC_ERROR_UNREF(state_error);
Craig Tiller3f475422015-06-25 10:43:05 -0700424}
425
Craig Tillera8610c02017-02-14 10:05:11 -0800426static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
427 grpc_error *error_ignored) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800428 grpc_transport_op *op = arg;
429 grpc_channel_element *elem = op->transport_private.args[0];
Craig Tillerca3e9d32015-06-27 18:37:27 -0700430 channel_data *chand = elem->channel_data;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700431
Craig Tillera82950e2015-09-22 12:33:20 -0700432 if (op->on_connectivity_state_change != NULL) {
433 grpc_connectivity_state_notify_on_state_change(
434 exec_ctx, &chand->state_tracker, op->connectivity_state,
435 op->on_connectivity_state_change);
436 op->on_connectivity_state_change = NULL;
437 op->connectivity_state = NULL;
438 }
439
Craig Tiller26dab312015-12-07 14:43:47 -0800440 if (op->send_ping != NULL) {
Craig Tiller87b71e22015-12-07 15:14:14 -0800441 if (chand->lb_policy == NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800442 grpc_closure_sched(exec_ctx, op->send_ping,
443 GRPC_ERROR_CREATE("Ping with no load balancing"));
Craig Tiller26dab312015-12-07 14:43:47 -0800444 } else {
Craig Tiller2400bf52017-02-09 16:25:19 -0800445 grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
Craig Tiller26dab312015-12-07 14:43:47 -0800446 op->bind_pollset = NULL;
447 }
448 op->send_ping = NULL;
449 }
450
Craig Tiller1c51edc2016-05-07 16:18:43 -0700451 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
452 if (chand->resolver != NULL) {
453 set_channel_connectivity_state_locked(
Craig Tillerd925c932016-06-06 08:38:50 -0700454 exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
Craig Tiller1c51edc2016-05-07 16:18:43 -0700455 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
Craig Tiller972470b2017-02-09 15:05:36 -0800456 grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700457 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
458 chand->resolver = NULL;
459 if (!chand->started_resolving) {
460 grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
461 GRPC_ERROR_REF(op->disconnect_with_error));
Craig Tiller91031da2016-12-28 15:44:25 -0800462 grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
Craig Tiller1c51edc2016-05-07 16:18:43 -0700463 }
464 if (chand->lb_policy != NULL) {
465 grpc_pollset_set_del_pollset_set(exec_ctx,
466 chand->lb_policy->interested_parties,
467 chand->interested_parties);
468 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
469 chand->lb_policy = NULL;
470 }
Craig Tillerb12d22a2016-04-23 12:50:21 -0700471 }
Craig Tiller1c51edc2016-05-07 16:18:43 -0700472 GRPC_ERROR_UNREF(op->disconnect_with_error);
Craig Tillera82950e2015-09-22 12:33:20 -0700473 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800474 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
475
476 grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
Craig Tillerbefafe62017-02-09 11:30:54 -0800477}
478
479static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
480 grpc_channel_element *elem,
481 grpc_transport_op *op) {
482 channel_data *chand = elem->channel_data;
483
Craig Tillerbefafe62017-02-09 11:30:54 -0800484 GPR_ASSERT(op->set_accept_stream == false);
485 if (op->bind_pollset != NULL) {
486 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
487 op->bind_pollset);
488 }
489
490 op->transport_private.args[0] = elem;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800491 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
Craig Tillerbefafe62017-02-09 11:30:54 -0800492 grpc_closure_sched(
Craig Tillera8610c02017-02-14 10:05:11 -0800493 exec_ctx, grpc_closure_init(
494 &op->transport_private.closure, start_transport_op_locked,
495 op, grpc_combiner_scheduler(chand->combiner, false)),
Craig Tillerbefafe62017-02-09 11:30:54 -0800496 GRPC_ERROR_NONE);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700497}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800498
Mark D. Rothb2d24882016-10-27 15:44:07 -0700499static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
500 grpc_channel_element *elem,
Mark D. Rothf79ce7d2016-11-04 08:43:36 -0700501 const grpc_channel_info *info) {
Mark D. Rothb2d24882016-10-27 15:44:07 -0700502 channel_data *chand = elem->channel_data;
Craig Tiller613dafa2017-02-09 12:00:43 -0800503 gpr_mu_lock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700504 if (info->lb_policy_name != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800505 *info->lb_policy_name = chand->info_lb_policy_name == NULL
Mark D. Roth78afd772016-11-04 12:49:49 -0700506 ? NULL
Craig Tiller613dafa2017-02-09 12:00:43 -0800507 : gpr_strdup(chand->info_lb_policy_name);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700508 }
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800509 if (info->service_config_json != NULL) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800510 *info->service_config_json =
511 chand->info_service_config_json == NULL
512 ? NULL
513 : gpr_strdup(chand->info_service_config_json);
Mark D. Rothc625c7a2016-11-09 14:12:37 -0800514 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800515 gpr_mu_unlock(&chand->info_mu);
Mark D. Rothb2d24882016-10-27 15:44:07 -0700516}
517
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700518/* Constructor for channel_data */
Mark D. Rothc1087882016-11-18 10:54:45 -0800519static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800520 grpc_channel_element *elem,
521 grpc_channel_element_args *args) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700522 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700523 memset(chand, 0, sizeof(*chand));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700524 GPR_ASSERT(args->is_last);
525 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800526 // Initialize data members.
Craig Tillerbefafe62017-02-09 11:30:54 -0800527 chand->combiner = grpc_combiner_create(NULL);
Craig Tillerd85477512017-02-09 12:02:39 -0800528 gpr_mu_init(&chand->info_mu);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800529 chand->owning_stack = args->channel_stack;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700530 grpc_closure_init(&chand->on_resolver_result_changed,
Craig Tillerbefafe62017-02-09 11:30:54 -0800531 on_resolver_result_changed_locked, chand,
532 grpc_combiner_scheduler(chand->combiner, false));
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800533 chand->interested_parties = grpc_pollset_set_create();
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700534 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
535 "client_channel");
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800536 // Record client channel factory.
537 const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
538 GRPC_ARG_CLIENT_CHANNEL_FACTORY);
539 GPR_ASSERT(arg != NULL);
540 GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
541 grpc_client_channel_factory_ref(arg->value.pointer.p);
542 chand->client_channel_factory = arg->value.pointer.p;
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800543 // Get server name to resolve, using proxy mapper if needed.
Mark D. Roth86e90592016-11-18 09:56:40 -0800544 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
Mark D. Roth21d4b2d2016-11-18 09:53:41 -0800545 GPR_ASSERT(arg != NULL);
546 GPR_ASSERT(arg->type == GRPC_ARG_STRING);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800547 char *proxy_name = NULL;
548 grpc_channel_args *new_args = NULL;
549 grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
550 &proxy_name, &new_args);
551 // Instantiate resolver.
Mark D. Roth45ccec52017-01-18 14:04:01 -0800552 chand->resolver = grpc_resolver_create(
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800553 exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
554 new_args != NULL ? new_args : args->channel_args,
Craig Tiller972470b2017-02-09 15:05:36 -0800555 chand->interested_parties, chand->combiner);
Mark D. Rothdc9bee72017-02-07 12:29:14 -0800556 if (proxy_name != NULL) gpr_free(proxy_name);
557 if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth5e2566e2016-11-18 10:53:13 -0800558 if (chand->resolver == NULL) {
559 return GRPC_ERROR_CREATE("resolver creation failed");
560 }
561 return GRPC_ERROR_NONE;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700562}
563
Craig Tiller972470b2017-02-09 15:05:36 -0800564static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
565 grpc_error *error) {
566 grpc_resolver *resolver = arg;
567 grpc_resolver_shutdown_locked(exec_ctx, resolver);
568 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
569}
570
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700571/* Destructor for channel_data */
572static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
573 grpc_channel_element *elem) {
574 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700575 if (chand->resolver != NULL) {
Craig Tiller972470b2017-02-09 15:05:36 -0800576 grpc_closure_sched(
577 exec_ctx,
578 grpc_closure_create(shutdown_resolver_locked, chand->resolver,
579 grpc_combiner_scheduler(chand->combiner, false)),
580 GRPC_ERROR_NONE);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700581 }
Mark D. Roth0e48a9a2016-09-08 14:14:39 -0700582 if (chand->client_channel_factory != NULL) {
583 grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
584 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700585 if (chand->lb_policy != NULL) {
586 grpc_pollset_set_del_pollset_set(exec_ctx,
587 chand->lb_policy->interested_parties,
588 chand->interested_parties);
589 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
590 }
Craig Tiller613dafa2017-02-09 12:00:43 -0800591 gpr_free(chand->info_lb_policy_name);
592 gpr_free(chand->info_service_config_json);
Mark D. Roth9d480942016-10-19 14:18:05 -0700593 if (chand->method_params_table != NULL) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800594 grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700595 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700596 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
Craig Tiller9e5ac1b2017-02-14 22:25:50 -0800597 grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
Craig Tiller648d6482017-02-09 15:32:39 -0800598 GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
Craig Tillerd85477512017-02-09 12:02:39 -0800599 gpr_mu_destroy(&chand->info_mu);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700600}
601
602/*************************************************************************
603 * PER-CALL FUNCTIONS
604 */
605
/* Acquire-loads call_data->subchannel_call and reinterprets the stored
   value as a grpc_subchannel_call pointer.  The result may be NULL (no call
   yet), CANCELLED_CALL, or a real subchannel call. */
#define GET_CALL(call_data) \
  ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))

/* Sentinel stored in call_data->subchannel_call once the call has been
   cancelled or has failed; never dereferenced. */
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
610
/* Tracks whether a subchannel pick is currently outstanding for a call. */
typedef enum {
  /* No pick is in flight. */
  GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
  /* A pick has been started and its completion callback has not run yet. */
  GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} subchannel_creation_phase;
615
/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store a pointer to the call
  // stack and each has its own mutex.  If/when we have time, find a way
  // to avoid this without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  // Call deadline; may be shortened by a per-method timeout from the
  // service config.
  gpr_timespec deadline;
  // wait_for_ready setting taken from the service config, if any; consulted
  // during the pick when the application did not set the flag explicitly.
  wait_for_ready_value wait_for_ready_from_service_config;
  grpc_closure read_service_config;

  // Copy of the cancellation error, stashed when the call is cancelled
  // before a subchannel call exists, and used to fail later ops.
  grpc_error *cancel_error;

  /** either 0 for no call, 1 for cancelled, or a pointer to a
      grpc_subchannel_call */
  gpr_atm subchannel_call;

  subchannel_creation_phase creation_phase;
  // Output slot for the subchannel pick.
  grpc_connected_subchannel *connected_subchannel;
  grpc_polling_entity *pollent;

  // Growable array of ops queued until a subchannel call is available.
  grpc_transport_stream_op **waiting_ops;
  size_t waiting_ops_count;
  size_t waiting_ops_capacity;

  // Closure passed to the pick as its on_ready callback.
  grpc_closure next_step;

  grpc_call_stack *owning_call;

  // Storage handed to the LB policy in the pick arguments.
  grpc_linked_mdelem lb_token_mdelem;
} call_data;
656
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800657grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
658 grpc_call_element *call_elem) {
659 grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
660 return scc == CANCELLED_CALL ? NULL : scc;
661}
662
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700663static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
664 GPR_TIMER_BEGIN("add_waiting_locked", 0);
665 if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
666 calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
667 calld->waiting_ops =
668 gpr_realloc(calld->waiting_ops,
669 calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
670 }
Craig Tiller57726ca2016-09-12 11:59:45 -0700671 calld->waiting_ops[calld->waiting_ops_count++] = op;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700672 GPR_TIMER_END("add_waiting_locked", 0);
673}
674
675static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
676 grpc_error *error) {
677 size_t i;
678 for (i = 0; i < calld->waiting_ops_count; i++) {
679 grpc_transport_stream_op_finish_with_failure(
Craig Tiller57726ca2016-09-12 11:59:45 -0700680 exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700681 }
682 calld->waiting_ops_count = 0;
683 GRPC_ERROR_UNREF(error);
684}
685
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700686static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
Craig Tiller57726ca2016-09-12 11:59:45 -0700687 if (calld->waiting_ops_count == 0) {
688 return;
689 }
690
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800691 grpc_subchannel_call *call = GET_CALL(calld);
692 grpc_transport_stream_op **ops = calld->waiting_ops;
693 size_t nops = calld->waiting_ops_count;
694 if (call == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700695 fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
696 return;
697 }
698 calld->waiting_ops = NULL;
699 calld->waiting_ops_count = 0;
700 calld->waiting_ops_capacity = 0;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -0800701 for (size_t i = 0; i < nops; i++) {
702 grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
703 }
Craig Tiller9efea882017-02-09 13:06:52 -0800704 gpr_free(ops);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700705}
706
Craig Tillerbefafe62017-02-09 11:30:54 -0800707static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
708 grpc_error *error) {
Yuchen Zeng19656b12016-09-01 18:00:45 -0700709 grpc_call_element *elem = arg;
710 call_data *calld = elem->call_data;
711 channel_data *chand = elem->channel_data;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700712 GPR_ASSERT(calld->creation_phase ==
713 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
Yuchen Zeng19656b12016-09-01 18:00:45 -0700714 grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
715 chand->interested_parties);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700716 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
717 if (calld->connected_subchannel == NULL) {
718 gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
719 fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
720 "Failed to create subchannel", &error, 1));
Mark D. Roth72f6da82016-09-02 13:42:38 -0700721 } else if (GET_CALL(calld) == CANCELLED_CALL) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700722 /* already cancelled before subchannel became ready */
David Garcia Quintas68a9e382016-12-13 10:50:40 -0800723 grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING(
724 "Cancelled before creating subchannel", &error, 1);
725 /* if due to deadline, attach the deadline exceeded status to the error */
726 if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
727 cancellation_error =
728 grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
729 GRPC_STATUS_DEADLINE_EXCEEDED);
730 }
731 fail_locked(exec_ctx, calld, cancellation_error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700732 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -0700733 /* Create call on subchannel. */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700734 grpc_subchannel_call *subchannel_call = NULL;
735 grpc_error *new_error = grpc_connected_subchannel_create_call(
Mark D. Rothaa850a72016-09-26 13:38:02 -0700736 exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
Mark D. Roth3d883412016-11-07 13:42:54 -0800737 calld->call_start_time, calld->deadline, &subchannel_call);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700738 if (new_error != GRPC_ERROR_NONE) {
739 new_error = grpc_error_add_child(new_error, error);
740 subchannel_call = CANCELLED_CALL;
741 fail_locked(exec_ctx, calld, new_error);
742 }
743 gpr_atm_rel_store(&calld->subchannel_call,
744 (gpr_atm)(uintptr_t)subchannel_call);
745 retry_waiting_locked(exec_ctx, calld);
746 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700747 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
748}
749
750static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
751 call_data *calld = elem->call_data;
752 grpc_subchannel_call *subchannel_call = GET_CALL(calld);
753 if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
754 return NULL;
755 } else {
756 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
757 }
758}
759
/* Arguments captured for a pick that has to wait for the resolver; the
   embedded closure is queued on the channel's waiting_for_config_closures
   list and runs continue_picking_locked with this struct as its argument. */
typedef struct {
  grpc_metadata_batch *initial_metadata;
  uint32_t initial_metadata_flags;
  /* Output slot for the pick; set to NULL to mark this entry cancelled. */
  grpc_connected_subchannel **connected_subchannel;
  grpc_closure *on_ready;
  grpc_call_element *elem;
  grpc_closure closure;
} continue_picking_args;
768
/** Return true if subchannel is available immediately (in which case on_ready
    should not be called), or false otherwise (in which case on_ready should be
    called when the subchannel is available).
    Forward declaration: continue_picking_locked below calls this function
    before its definition. */
static bool pick_subchannel_locked(
    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
    grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
    grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready,
    grpc_error *error);
Craig Tiller577c9b22015-11-02 14:11:15 -0800777
Craig Tillerbefafe62017-02-09 11:30:54 -0800778static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
779 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800780 continue_picking_args *cpa = arg;
Craig Tiller0ede5452016-04-23 12:21:45 -0700781 if (cpa->connected_subchannel == NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800782 /* cancelled, do nothing */
Craig Tiller804ff712016-05-05 16:25:40 -0700783 } else if (error != GRPC_ERROR_NONE) {
Craig Tiller91031da2016-12-28 15:44:25 -0800784 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700785 } else {
Craig Tillerbefafe62017-02-09 11:30:54 -0800786 if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
787 cpa->initial_metadata_flags,
788 cpa->connected_subchannel, cpa->on_ready,
789 GRPC_ERROR_NONE)) {
Craig Tiller91031da2016-12-28 15:44:25 -0800790 grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700791 }
Craig Tiller577c9b22015-11-02 14:11:15 -0800792 }
793 gpr_free(cpa);
794}
795
Craig Tillerbefafe62017-02-09 11:30:54 -0800796static bool pick_subchannel_locked(
797 grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
798 grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
799 grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready,
800 grpc_error *error) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700801 GPR_TIMER_BEGIN("pick_subchannel", 0);
Craig Tillerbfc9adc2016-06-27 13:16:22 -0700802
Craig Tiller577c9b22015-11-02 14:11:15 -0800803 channel_data *chand = elem->channel_data;
804 call_data *calld = elem->call_data;
805 continue_picking_args *cpa;
806 grpc_closure *closure;
807
Craig Tillerb5585d42015-11-17 07:18:31 -0800808 GPR_ASSERT(connected_subchannel);
Craig Tiller577c9b22015-11-02 14:11:15 -0800809
Craig Tiller577c9b22015-11-02 14:11:15 -0800810 if (initial_metadata == NULL) {
811 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -0800812 grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
813 connected_subchannel,
814 GRPC_ERROR_REF(error));
Craig Tiller577c9b22015-11-02 14:11:15 -0800815 }
816 for (closure = chand->waiting_for_config_closures.head; closure != NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700817 closure = closure->next_data.next) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800818 cpa = closure->cb_arg;
Craig Tillerb5585d42015-11-17 07:18:31 -0800819 if (cpa->connected_subchannel == connected_subchannel) {
820 cpa->connected_subchannel = NULL;
Craig Tiller91031da2016-12-28 15:44:25 -0800821 grpc_closure_sched(
Mark D. Roth932b10c2016-09-09 08:44:30 -0700822 exec_ctx, cpa->on_ready,
Craig Tiller91031da2016-12-28 15:44:25 -0800823 GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1));
Craig Tiller577c9b22015-11-02 14:11:15 -0800824 }
825 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700826 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth697a1f62016-09-07 13:35:07 -0700827 GRPC_ERROR_UNREF(error);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700828 return true;
Craig Tiller577c9b22015-11-02 14:11:15 -0800829 }
Mark D. Roth697a1f62016-09-07 13:35:07 -0700830 GPR_ASSERT(error == GRPC_ERROR_NONE);
Craig Tiller577c9b22015-11-02 14:11:15 -0800831 if (chand->lb_policy != NULL) {
Craig Tiller86c0f8a2015-12-01 20:05:40 -0800832 grpc_lb_policy *lb_policy = chand->lb_policy;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700833 GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
Mark D. Rothe40dd292016-10-05 14:58:37 -0700834 // If the application explicitly set wait_for_ready, use that.
835 // Otherwise, if the service config specified a value for this
836 // method, use that.
Mark D. Rothc1c38582016-10-11 11:03:27 -0700837 const bool wait_for_ready_set_from_api =
838 initial_metadata_flags &
839 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
840 const bool wait_for_ready_set_from_service_config =
841 calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET;
842 if (!wait_for_ready_set_from_api &&
843 wait_for_ready_set_from_service_config) {
Mark D. Rothe40dd292016-10-05 14:58:37 -0700844 if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) {
845 initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
846 } else {
847 initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
848 }
849 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700850 const grpc_lb_policy_pick_args inputs = {
Yuchen Zengac8bc422016-10-05 14:00:02 -0700851 initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
852 gpr_inf_future(GPR_CLOCK_MONOTONIC)};
Craig Tiller2400bf52017-02-09 16:25:19 -0800853 const bool result = grpc_lb_policy_pick_locked(
Mark D. Roth55f25b62016-10-12 14:55:20 -0700854 exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700855 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
856 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth9dab7d52016-10-07 07:48:03 -0700857 return result;
Craig Tiller577c9b22015-11-02 14:11:15 -0800858 }
859 if (chand->resolver != NULL && !chand->started_resolving) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700860 chand->started_resolving = true;
Craig Tiller906e3bc2015-11-24 07:31:31 -0800861 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
Craig Tiller972470b2017-02-09 15:05:36 -0800862 grpc_resolver_next_locked(exec_ctx, chand->resolver,
863 &chand->resolver_result,
864 &chand->on_resolver_result_changed);
Craig Tiller577c9b22015-11-02 14:11:15 -0800865 }
Craig Tiller0eab6972016-04-23 12:59:57 -0700866 if (chand->resolver != NULL) {
867 cpa = gpr_malloc(sizeof(*cpa));
868 cpa->initial_metadata = initial_metadata;
869 cpa->initial_metadata_flags = initial_metadata_flags;
870 cpa->connected_subchannel = connected_subchannel;
871 cpa->on_ready = on_ready;
872 cpa->elem = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -0800873 grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
874 grpc_combiner_scheduler(chand->combiner, true));
Craig Tiller804ff712016-05-05 16:25:40 -0700875 grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
876 GRPC_ERROR_NONE);
Craig Tiller0eab6972016-04-23 12:59:57 -0700877 } else {
Craig Tiller91031da2016-12-28 15:44:25 -0800878 grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"));
Craig Tiller0eab6972016-04-23 12:59:57 -0700879 }
Craig Tillerbfc9adc2016-06-27 13:16:22 -0700880
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700881 GPR_TIMER_END("pick_subchannel", 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -0700882 return false;
Craig Tiller577c9b22015-11-02 14:11:15 -0800883}
884
Craig Tillera8610c02017-02-14 10:05:11 -0800885static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
886 grpc_transport_stream_op *op,
887 grpc_call_element *elem) {
Craig Tillerbefafe62017-02-09 11:30:54 -0800888 channel_data *chand = elem->channel_data;
Craig Tillera11bfc82017-02-14 09:56:33 -0800889 call_data *calld = elem->call_data;
Craig Tillerbefafe62017-02-09 11:30:54 -0800890 grpc_subchannel_call *call;
891
Craig Tillerbefafe62017-02-09 11:30:54 -0800892 /* need to recheck that another thread hasn't set the call */
893 call = GET_CALL(calld);
894 if (call == CANCELLED_CALL) {
895 grpc_transport_stream_op_finish_with_failure(
896 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -0800897 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700898 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800899 }
900 if (call != NULL) {
901 grpc_subchannel_call_process_op(exec_ctx, call, op);
Craig Tillera11bfc82017-02-14 09:56:33 -0800902 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700903 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800904 }
905 /* if this is a cancellation, then we can raise our cancelled flag */
906 if (op->cancel_error != GRPC_ERROR_NONE) {
907 if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
908 (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
Craig Tillera11bfc82017-02-14 09:56:33 -0800909 /* recurse to retry */
Craig Tillera8610c02017-02-14 10:05:11 -0800910 start_transport_stream_op_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -0800911 /* early out */
912 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800913 } else {
Craig Tillerbe9691a2017-02-14 10:00:42 -0800914 /* Stash a copy of cancel_error in our call data, so that we can use
915 it for subsequent operations. This ensures that if the call is
916 cancelled before any ops are passed down (e.g., if the deadline
917 is in the past when the call starts), we can return the right
918 error to the caller when the first op does get passed down. */
Craig Tillerbefafe62017-02-09 11:30:54 -0800919 calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
920 switch (calld->creation_phase) {
921 case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
922 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
923 break;
924 case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
925 pick_subchannel_locked(exec_ctx, elem, NULL, 0,
926 &calld->connected_subchannel, NULL,
927 GRPC_ERROR_REF(op->cancel_error));
928 break;
929 }
930 grpc_transport_stream_op_finish_with_failure(
931 exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
Craig Tillera11bfc82017-02-14 09:56:33 -0800932 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700933 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800934 }
935 }
936 /* if we don't have a subchannel, try to get one */
937 if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
938 calld->connected_subchannel == NULL &&
939 op->send_initial_metadata != NULL) {
940 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
941 grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
942 grpc_combiner_scheduler(chand->combiner, true));
943 GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
944 /* If a subchannel is not available immediately, the polling entity from
945 call_data should be provided to channel_data's interested_parties, so
946 that IO of the lb_policy and resolver could be done under it. */
947 if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata,
948 op->send_initial_metadata_flags,
949 &calld->connected_subchannel, &calld->next_step,
950 GRPC_ERROR_NONE)) {
951 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
952 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
953 } else {
954 grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
955 chand->interested_parties);
956 }
957 }
958 /* if we've got a subchannel, then let's ask it to create a call */
959 if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
960 calld->connected_subchannel != NULL) {
961 grpc_subchannel_call *subchannel_call = NULL;
962 grpc_error *error = grpc_connected_subchannel_create_call(
963 exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
964 calld->call_start_time, calld->deadline, &subchannel_call);
965 if (error != GRPC_ERROR_NONE) {
966 subchannel_call = CANCELLED_CALL;
967 fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
968 grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
969 }
970 gpr_atm_rel_store(&calld->subchannel_call,
971 (gpr_atm)(uintptr_t)subchannel_call);
972 retry_waiting_locked(exec_ctx, calld);
Craig Tillera11bfc82017-02-14 09:56:33 -0800973 /* recurse to retry */
Craig Tillera8610c02017-02-14 10:05:11 -0800974 start_transport_stream_op_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -0800975 /* early out */
976 return;
Craig Tillerbefafe62017-02-09 11:30:54 -0800977 }
978 /* nothing to be done but wait */
979 add_waiting_locked(calld, op);
Craig Tillerbefafe62017-02-09 11:30:54 -0800980}
981
Craig Tillera11bfc82017-02-14 09:56:33 -0800982static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
983 void *arg,
984 grpc_error *error_ignored) {
985 GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0);
986
987 grpc_transport_stream_op *op = arg;
988 grpc_call_element *elem = op->handler_private.args[0];
989 call_data *calld = elem->call_data;
990
Craig Tillera8610c02017-02-14 10:05:11 -0800991 start_transport_stream_op_locked_inner(exec_ctx, op, elem);
Craig Tillera11bfc82017-02-14 09:56:33 -0800992
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800993 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
Craig Tiller2c8063c2016-03-22 22:12:15 -0700994 "start_transport_stream_op");
Craig Tillera11bfc82017-02-14 09:56:33 -0800995 GPR_TIMER_END("cc_start_transport_stream_op_locked", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800996}
997
Craig Tillerbe9691a2017-02-14 10:00:42 -0800998/* The logic here is fairly complicated, due to (a) the fact that we
999 need to handle the case where we receive the send op before the
1000 initial metadata op, and (b) the need for efficiency, especially in
1001 the streaming case.
1002
1003 We use double-checked locking to initially see if initialization has been
1004 performed. If it has not, we acquire the combiner and perform initialization.
1005 If it has, we proceed on the fast path. */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001006static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
1007 grpc_call_element *elem,
1008 grpc_transport_stream_op *op) {
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001009 call_data *calld = elem->call_data;
1010 channel_data *chand = elem->channel_data;
1011 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
1012 grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
1013 /* try to (atomically) get the call */
1014 grpc_subchannel_call *call = GET_CALL(calld);
1015 GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
1016 if (call == CANCELLED_CALL) {
1017 grpc_transport_stream_op_finish_with_failure(
1018 exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
1019 GPR_TIMER_END("cc_start_transport_stream_op", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001020 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001021 return;
1022 }
1023 if (call != NULL) {
1024 grpc_subchannel_call_process_op(exec_ctx, call, op);
1025 GPR_TIMER_END("cc_start_transport_stream_op", 0);
Craig Tillera11bfc82017-02-14 09:56:33 -08001026 /* early out */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001027 return;
1028 }
1029 /* we failed; lock and figure out what to do */
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001030 GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
Craig Tiller4a84bdd2017-02-14 09:48:41 -08001031 op->handler_private.args[0] = elem;
Craig Tillerbefafe62017-02-09 11:30:54 -08001032 grpc_closure_sched(
1033 exec_ctx,
Craig Tiller4a84bdd2017-02-14 09:48:41 -08001034 grpc_closure_init(&op->handler_private.closure,
Craig Tillerbefafe62017-02-09 11:30:54 -08001035 cc_start_transport_stream_op_locked, op,
1036 grpc_combiner_scheduler(chand->combiner, false)),
1037 GRPC_ERROR_NONE);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001038 GPR_TIMER_END("cc_start_transport_stream_op", 0);
1039}
1040
Mark D. Rothe40dd292016-10-05 14:58:37 -07001041// Gets data from the service config. Invoked when the resolver returns
1042// its initial result.
Craig Tillerbefafe62017-02-09 11:30:54 -08001043static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg,
1044 grpc_error *error) {
Mark D. Rothe40dd292016-10-05 14:58:37 -07001045 grpc_call_element *elem = arg;
1046 channel_data *chand = elem->channel_data;
1047 call_data *calld = elem->call_data;
1048 // If this is an error, there's no point in looking at the service config.
Mark D. Roth196387a2016-10-12 14:53:36 -07001049 if (error == GRPC_ERROR_NONE) {
1050 // Get the method config table from channel data.
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001051 grpc_slice_hash_table *method_params_table = NULL;
Mark D. Roth9d480942016-10-19 14:18:05 -07001052 if (chand->method_params_table != NULL) {
1053 method_params_table =
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001054 grpc_slice_hash_table_ref(chand->method_params_table);
Mark D. Rothe40dd292016-10-05 14:58:37 -07001055 }
Mark D. Roth196387a2016-10-12 14:53:36 -07001056 // If the method config table was present, use it.
Mark D. Roth9d480942016-10-19 14:18:05 -07001057 if (method_params_table != NULL) {
Craig Tiller87a7e1f2016-11-09 09:42:19 -08001058 const method_parameters *method_params = grpc_method_config_table_get(
1059 exec_ctx, method_params_table, calld->path);
Mark D. Roth9d480942016-10-19 14:18:05 -07001060 if (method_params != NULL) {
1061 const bool have_method_timeout =
1062 gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
1063 if (have_method_timeout ||
1064 method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
Mark D. Roth9d480942016-10-19 14:18:05 -07001065 if (have_method_timeout) {
1066 const gpr_timespec per_method_deadline =
1067 gpr_time_add(calld->call_start_time, method_params->timeout);
Mark D. Roth196387a2016-10-12 14:53:36 -07001068 if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
1069 calld->deadline = per_method_deadline;
1070 // Reset deadline timer.
1071 grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
1072 }
1073 }
Mark D. Roth9d480942016-10-19 14:18:05 -07001074 if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
Mark D. Roth196387a2016-10-12 14:53:36 -07001075 calld->wait_for_ready_from_service_config =
Mark D. Roth9d480942016-10-19 14:18:05 -07001076 method_params->wait_for_ready;
Mark D. Roth196387a2016-10-12 14:53:36 -07001077 }
Mark D. Roth196387a2016-10-12 14:53:36 -07001078 }
1079 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001080 grpc_slice_hash_table_unref(exec_ctx, method_params_table);
Mark D. Roth196387a2016-10-12 14:53:36 -07001081 }
Mark D. Rothe40dd292016-10-05 14:58:37 -07001082 }
Mark D. Roth31292f22016-10-12 13:14:07 -07001083 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
Mark D. Rothe40dd292016-10-05 14:58:37 -07001084}
1085
Craig Tillerbefafe62017-02-09 11:30:54 -08001086static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
1087 void *arg,
1088 grpc_error *error_ignored) {
1089 grpc_call_element *elem = arg;
1090 channel_data *chand = elem->channel_data;
1091 call_data *calld = elem->call_data;
1092 // If the resolver has already returned results, then we can access
1093 // the service config parameters immediately. Otherwise, we need to
1094 // defer that work until the resolver returns an initial result.
1095 // TODO(roth): This code is almost but not quite identical to the code
1096 // in read_service_config() above. It would be nice to find a way to
1097 // combine them, to avoid having to maintain it twice.
1098 if (chand->lb_policy != NULL) {
1099 // We already have a resolver result, so check for service config.
1100 if (chand->method_params_table != NULL) {
1101 grpc_slice_hash_table *method_params_table =
1102 grpc_slice_hash_table_ref(chand->method_params_table);
1103 method_parameters *method_params = grpc_method_config_table_get(
1104 exec_ctx, method_params_table, calld->path);
1105 if (method_params != NULL) {
1106 if (gpr_time_cmp(method_params->timeout,
1107 gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) {
1108 gpr_timespec per_method_deadline =
1109 gpr_time_add(calld->call_start_time, method_params->timeout);
1110 calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
1111 }
1112 if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
1113 calld->wait_for_ready_from_service_config =
1114 method_params->wait_for_ready;
1115 }
1116 }
1117 grpc_slice_hash_table_unref(exec_ctx, method_params_table);
1118 }
1119 } else {
1120 // We don't yet have a resolver result, so register a callback to
1121 // get the service config data once the resolver returns.
1122 // Take a reference to the call stack to be owned by the callback.
1123 GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
1124 grpc_closure_init(&calld->read_service_config, read_service_config_locked,
1125 elem, grpc_combiner_scheduler(chand->combiner, false));
1126 grpc_closure_list_append(&chand->waiting_for_config_closures,
1127 &calld->read_service_config, GRPC_ERROR_NONE);
1128 }
1129 // Start the deadline timer with the current deadline value. If we
1130 // do not yet have service config data, then the timer may be reset
1131 // later.
1132 grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001133 GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
1134 "initial_read_service_config");
Craig Tillerbefafe62017-02-09 11:30:54 -08001135}
1136
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001137/* Constructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001138static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
1139 grpc_call_element *elem,
Craig Tillerc52ba3a2017-02-15 22:57:43 -08001140 const grpc_call_element_args *args) {
Mark D. Rothaa850a72016-09-26 13:38:02 -07001141 channel_data *chand = elem->channel_data;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001142 call_data *calld = elem->call_data;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001143 // Initialize data members.
1144 grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001145 calld->path = grpc_slice_ref_internal(args->path);
Mark D. Rothff08f332016-10-14 13:01:01 -07001146 calld->call_start_time = args->start_time;
Mark D. Rothe40dd292016-10-05 14:58:37 -07001147 calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
1148 calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET;
Mark D. Rothf28763c2016-09-14 15:18:40 -07001149 calld->cancel_error = GRPC_ERROR_NONE;
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001150 gpr_atm_rel_store(&calld->subchannel_call, 0);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001151 calld->connected_subchannel = NULL;
1152 calld->waiting_ops = NULL;
1153 calld->waiting_ops_count = 0;
1154 calld->waiting_ops_capacity = 0;
1155 calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
1156 calld->owning_call = args->call_stack;
1157 calld->pollent = NULL;
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001158 GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config");
Craig Tillerbefafe62017-02-09 11:30:54 -08001159 grpc_closure_sched(
1160 exec_ctx,
1161 grpc_closure_init(&calld->read_service_config,
1162 initial_read_service_config_locked, elem,
1163 grpc_combiner_scheduler(chand->combiner, false)),
1164 GRPC_ERROR_NONE);
Mark D. Roth0badbe82016-06-23 10:15:12 -07001165 return GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001166}
1167
1168/* Destructor for call_data */
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001169static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
1170 grpc_call_element *elem,
1171 const grpc_call_final_info *final_info,
1172 void *and_free_memory) {
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001173 call_data *calld = elem->call_data;
Mark D. Rothf28763c2016-09-14 15:18:40 -07001174 grpc_deadline_state_destroy(exec_ctx, elem);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001175 grpc_slice_unref_internal(exec_ctx, calld->path);
Mark D. Rothf28763c2016-09-14 15:18:40 -07001176 GRPC_ERROR_UNREF(calld->cancel_error);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001177 grpc_subchannel_call *call = GET_CALL(calld);
1178 if (call != NULL && call != CANCELLED_CALL) {
1179 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
1180 }
1181 GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001182 GPR_ASSERT(calld->waiting_ops_count == 0);
Craig Tiller693d3942016-10-27 16:51:25 -07001183 if (calld->connected_subchannel != NULL) {
1184 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
1185 "picked");
1186 }
Mark D. Roth4c0fe492016-08-31 13:51:55 -07001187 gpr_free(calld->waiting_ops);
Craig Tiller2c8063c2016-03-22 22:12:15 -07001188 gpr_free(and_free_memory);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001189}
1190
David Garcia Quintasf72eb972016-05-03 18:28:09 -07001191static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
1192 grpc_call_element *elem,
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001193 grpc_polling_entity *pollent) {
Craig Tiller577c9b22015-11-02 14:11:15 -08001194 call_data *calld = elem->call_data;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -07001195 calld->pollent = pollent;
Craig Tiller577c9b22015-11-02 14:11:15 -08001196}
1197
/*************************************************************************
 * EXPORTED SYMBOLS
 */
1201
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001202const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillerf40df232016-03-25 13:38:14 -07001203 cc_start_transport_stream_op,
1204 cc_start_transport_op,
1205 sizeof(call_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001206 cc_init_call_elem,
David Garcia Quintas4afce7e2016-04-18 16:25:17 -07001207 cc_set_pollset_or_pollset_set,
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001208 cc_destroy_call_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001209 sizeof(channel_data),
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001210 cc_init_channel_elem,
1211 cc_destroy_channel_elem,
Craig Tillerf40df232016-03-25 13:38:14 -07001212 cc_get_peer,
Mark D. Rothb2d24882016-10-27 15:44:07 -07001213 cc_get_channel_info,
Craig Tillerf40df232016-03-25 13:38:14 -07001214 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -07001215};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001216
Craig Tiller613dafa2017-02-09 12:00:43 -08001217static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
1218 grpc_error *error_ignored) {
1219 channel_data *chand = arg;
1220 if (chand->lb_policy != NULL) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001221 grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
Craig Tiller613dafa2017-02-09 12:00:43 -08001222 } else {
1223 chand->exit_idle_when_lb_policy_arrives = true;
1224 if (!chand->started_resolving && chand->resolver != NULL) {
1225 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
1226 chand->started_resolving = true;
Craig Tiller972470b2017-02-09 15:05:36 -08001227 grpc_resolver_next_locked(exec_ctx, chand->resolver,
1228 &chand->resolver_result,
1229 &chand->on_resolver_result_changed);
Craig Tiller613dafa2017-02-09 12:00:43 -08001230 }
1231 }
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001232 GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001233}
1234
Craig Tillera82950e2015-09-22 12:33:20 -07001235grpc_connectivity_state grpc_client_channel_check_connectivity_state(
1236 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001237 channel_data *chand = elem->channel_data;
Craig Tillera8610c02017-02-14 10:05:11 -08001238 grpc_connectivity_state out =
1239 grpc_connectivity_state_check(&chand->state_tracker);
Craig Tillera82950e2015-09-22 12:33:20 -07001240 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
Craig Tillerd2e5cfc2017-02-09 13:02:20 -08001241 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
Craig Tiller613dafa2017-02-09 12:00:43 -08001242 grpc_closure_sched(
1243 exec_ctx,
1244 grpc_closure_create(try_to_connect_locked, chand,
1245 grpc_combiner_scheduler(chand->combiner, false)),
1246 GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -07001247 }
Craig Tiller48cb07c2015-07-15 16:16:15 -07001248 return out;
1249}
1250
Craig Tiller86c99582015-11-25 15:22:26 -08001251typedef struct {
1252 channel_data *chand;
1253 grpc_pollset *pollset;
1254 grpc_closure *on_complete;
Craig Tiller613dafa2017-02-09 12:00:43 -08001255 grpc_connectivity_state *state;
Craig Tiller86c99582015-11-25 15:22:26 -08001256 grpc_closure my_closure;
1257} external_connectivity_watcher;
1258
Craig Tiller1d881fb2015-12-01 07:39:04 -08001259static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller804ff712016-05-05 16:25:40 -07001260 grpc_error *error) {
Craig Tiller86c99582015-11-25 15:22:26 -08001261 external_connectivity_watcher *w = arg;
1262 grpc_closure *follow_up = w->on_complete;
Craig Tiller69b093b2016-02-25 19:04:07 -08001263 grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
Craig Tiller1d881fb2015-12-01 07:39:04 -08001264 w->pollset);
1265 GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
1266 "external_connectivity_watcher");
Craig Tiller86c99582015-11-25 15:22:26 -08001267 gpr_free(w);
Craig Tiller613dafa2017-02-09 12:00:43 -08001268 grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
1269}
1270
Craig Tillera8610c02017-02-14 10:05:11 -08001271static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
1272 grpc_error *error_ignored) {
Craig Tiller613dafa2017-02-09 12:00:43 -08001273 external_connectivity_watcher *w = arg;
1274 grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
1275 grpc_schedule_on_exec_ctx);
1276 grpc_connectivity_state_notify_on_state_change(
1277 exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
Craig Tiller86c99582015-11-25 15:22:26 -08001278}
1279
Craig Tillera82950e2015-09-22 12:33:20 -07001280void grpc_client_channel_watch_connectivity_state(
Craig Tiller906e3bc2015-11-24 07:31:31 -08001281 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
Craig Tillera82950e2015-09-22 12:33:20 -07001282 grpc_connectivity_state *state, grpc_closure *on_complete) {
Craig Tiller48cb07c2015-07-15 16:16:15 -07001283 channel_data *chand = elem->channel_data;
Craig Tiller86c99582015-11-25 15:22:26 -08001284 external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
1285 w->chand = chand;
1286 w->pollset = pollset;
1287 w->on_complete = on_complete;
Craig Tiller613dafa2017-02-09 12:00:43 -08001288 w->state = state;
Craig Tiller69b093b2016-02-25 19:04:07 -08001289 grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
Craig Tiller1d881fb2015-12-01 07:39:04 -08001290 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
1291 "external_connectivity_watcher");
Craig Tiller613dafa2017-02-09 12:00:43 -08001292 grpc_closure_sched(
1293 exec_ctx,
Craig Tillera8610c02017-02-14 10:05:11 -08001294 grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
Craig Tiller613dafa2017-02-09 12:00:43 -08001295 grpc_combiner_scheduler(chand->combiner, true)),
1296 GRPC_ERROR_NONE);
Craig Tiller48cb07c2015-07-15 16:16:15 -07001297}