blob: ce03ad5eb0607ff2bb987dbb3e38bd2e7d71ef16 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070037#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
39#include "src/core/channel/channel_args.h"
40#include "src/core/channel/connected_channel.h"
Craig Tiller98465032015-06-29 14:36:42 -070041#include "src/core/surface/channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070044#include "src/core/transport/connectivity_state.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <grpc/support/alloc.h>
46#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <grpc/support/sync.h>
48#include <grpc/support/useful.h>
49
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050/* Client channel implementation */
51
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052typedef struct call_data call_data;
53
Craig Tillera82950e2015-09-22 12:33:20 -070054typedef struct {
Craig Tillerf5f17122015-06-25 08:47:26 -070055 /** metadata context for this channel */
Craig Tillere70413c2015-04-24 10:12:34 -070056 grpc_mdctx *mdctx;
Craig Tillerf5f17122015-06-25 08:47:26 -070057 /** resolver for this channel */
58 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -070059 /** have we started resolving this channel */
60 int started_resolving;
Craig Tiller50bc6092015-07-01 14:25:19 -070061 /** master channel - the grpc_channel instance that ultimately owns
62 this channel_data via its channel stack.
63 We occasionally use this to bump the refcount on the master channel
64 to keep ourselves alive through an asynchronous operation. */
Craig Tiller98465032015-06-29 14:36:42 -070065 grpc_channel *master;
Craig Tillerf5f17122015-06-25 08:47:26 -070066
Craig Tiller9d94b602015-07-01 14:23:18 -070067 /** mutex protecting client configuration, including all
68 variables below in this data structure */
Craig Tillerf5f17122015-06-25 08:47:26 -070069 gpr_mu mu_config;
Craig Tillerf5f17122015-06-25 08:47:26 -070070 /** currently active load balancer - guarded by mu_config */
71 grpc_lb_policy *lb_policy;
Craig Tillerf5f17122015-06-25 08:47:26 -070072 /** incoming configuration - set by resolver.next
73 guarded by mu_config */
74 grpc_client_config *incoming_configuration;
Craig Tiller3f475422015-06-25 10:43:05 -070075 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -070076 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -070077 /** resolver callback */
Craig Tiller33825112015-09-18 07:44:19 -070078 grpc_closure on_config_changed;
Craig Tiller3f475422015-06-25 10:43:05 -070079 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -070080 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -070081 /** when an lb_policy arrives, should we try to exit idle */
82 int exit_idle_when_lb_policy_arrives;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070083 /** pollset_set of interested parties in a new connection */
84 grpc_pollset_set pollset_set;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080085} channel_data;
86
Craig Tillerd6c98df2015-08-18 09:33:44 -070087/** We create one watcher for each new lb_policy that is returned from a
88 resolver,
89 to watch for state changes from the lb_policy. When a state change is seen,
90 we
Craig Tiller1ada6ad2015-07-16 16:19:14 -070091 update the channel, and create a new watcher */
Craig Tillera82950e2015-09-22 12:33:20 -070092typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -070093 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -070094 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070095 grpc_connectivity_state state;
96 grpc_lb_policy *lb_policy;
97} lb_policy_connectivity_watcher;
98
/** States of a client channel call (see call_data.state). */
typedef enum {
  /* state assigned by init_call_elem */
  CALL_CREATED,
  /* an op was stored but no send ops have been supplied yet */
  CALL_WAITING_FOR_SEND,
  /* queued until a channel configuration arrives */
  CALL_WAITING_FOR_CONFIG,
  /* an lb_policy pick is in flight */
  CALL_WAITING_FOR_PICK,
  /* subchannel call creation is in flight */
  CALL_WAITING_FOR_CALL,
  /* ops are forwarded to the subchannel call */
  CALL_ACTIVE,
  /* the call was cancelled (or could not be set up) */
  CALL_CANCELLED
} call_state;
108
Craig Tillera82950e2015-09-22 12:33:20 -0700109struct call_data {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800110 /* owning element */
111 grpc_call_element *elem;
112
Craig Tillerf5f17122015-06-25 08:47:26 -0700113 gpr_mu mu_state;
114
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115 call_state state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800116 gpr_timespec deadline;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700117 grpc_subchannel *picked_channel;
Craig Tiller33825112015-09-18 07:44:19 -0700118 grpc_closure async_setup_task;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700119 grpc_transport_stream_op waiting_op;
120 /* our child call stack */
121 grpc_subchannel_call *subchannel_call;
122 grpc_linked_mdelem status;
123 grpc_linked_mdelem details;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800124};
125
Craig Tillera82950e2015-09-22 12:33:20 -0700126static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
127 grpc_transport_stream_op *new_op)
128 GRPC_MUST_USE_RESULT;
Craig Tiller5d44c062015-07-01 08:55:28 -0700129
Craig Tillera82950e2015-09-22 12:33:20 -0700130static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
131 grpc_call_element *elem,
132 grpc_transport_stream_op *op) {
Craig Tillere70413c2015-04-24 10:12:34 -0700133 call_data *calld = elem->call_data;
134 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700135 if (op->send_ops) {
136 grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
137 op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
138 }
139 if (op->recv_ops) {
140 char status[GPR_LTOA_MIN_BUFSIZE];
141 grpc_metadata_batch mdb;
142 gpr_ltoa(GRPC_STATUS_CANCELLED, status);
143 calld->status.md =
144 grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
145 calld->details.md =
146 grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
147 calld->status.prev = calld->details.next = NULL;
148 calld->status.next = &calld->details;
149 calld->details.prev = &calld->status;
150 mdb.list.head = &calld->status;
151 mdb.list.tail = &calld->details;
152 mdb.garbage.head = mdb.garbage.tail = NULL;
153 mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
154 grpc_sopb_add_metadata(op->recv_ops, mdb);
155 *op->recv_state = GRPC_STREAM_CLOSED;
156 op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
157 }
158 if (op->on_consumed) {
159 op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
160 }
Craig Tillere70413c2015-04-24 10:12:34 -0700161}
Craig Tiller8b976d02015-02-05 21:41:23 -0800162
Craig Tillera82950e2015-09-22 12:33:20 -0700163typedef struct {
Craig Tiller33825112015-09-18 07:44:19 -0700164 grpc_closure closure;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700165 grpc_call_element *elem;
166} waiting_call;
167
Craig Tillera82950e2015-09-22 12:33:20 -0700168static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
169 grpc_call_element *elem,
170 grpc_transport_stream_op *op,
171 int continuation);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700172
Craig Tillera82950e2015-09-22 12:33:20 -0700173static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
174 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700175 waiting_call *wc = arg;
176 call_data *calld = wc->elem->call_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700177 perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
178 gpr_free(wc);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700179}
180
Craig Tillera82950e2015-09-22 12:33:20 -0700181static void add_to_lb_policy_wait_queue_locked_state_config(
182 grpc_call_element *elem) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700183 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700184 waiting_call *wc = gpr_malloc(sizeof(*wc));
185 grpc_closure_init(&wc->closure, continue_with_pick, wc);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700186 wc->elem = elem;
Craig Tillera82950e2015-09-22 12:33:20 -0700187 grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700188}
189
/* Returns 1 when the first |len| bytes at |p| are all zero (also when
   |len| <= 0), and 0 as soon as a non-zero byte is found.
   The buffer is only read, so it is taken as a pointer to const and
   inspected as unsigned bytes. */
static int is_empty(const void *p, int len) {
  const unsigned char *bytes = p;
  int i;
  for (i = 0; i < len; i++) {
    if (bytes[i] != 0) return 0;
  }
  return 1;
}
198
Craig Tillera82950e2015-09-22 12:33:20 -0700199static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
200 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700201 call_data *calld = arg;
202 grpc_transport_stream_op op;
203 int have_waiting;
204
Craig Tillera82950e2015-09-22 12:33:20 -0700205 gpr_mu_lock(&calld->mu_state);
206 if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
207 memset(&op, 0, sizeof(op));
208 op.cancel_with_status = GRPC_STATUS_CANCELLED;
209 gpr_mu_unlock(&calld->mu_state);
210 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
211 } else if (calld->state == CALL_WAITING_FOR_CALL) {
212 have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
213 if (calld->subchannel_call != NULL) {
214 calld->state = CALL_ACTIVE;
215 gpr_mu_unlock(&calld->mu_state);
216 if (have_waiting) {
217 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
218 &calld->waiting_op);
219 }
220 } else {
221 calld->state = CALL_CANCELLED;
222 gpr_mu_unlock(&calld->mu_state);
223 if (have_waiting) {
224 handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
225 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700226 }
Craig Tillera82950e2015-09-22 12:33:20 -0700227 } else {
228 GPR_ASSERT(calld->state == CALL_CANCELLED);
229 gpr_mu_unlock(&calld->mu_state);
230 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700231}
232
Craig Tillera82950e2015-09-22 12:33:20 -0700233static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
234 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700235 call_data *calld = arg;
Craig Tillerabf36382015-06-29 16:13:27 -0700236 grpc_pollset *pollset;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700237
Craig Tillera82950e2015-09-22 12:33:20 -0700238 if (calld->picked_channel == NULL) {
239 /* treat this like a cancellation */
240 calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
241 perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
242 } else {
243 gpr_mu_lock(&calld->mu_state);
244 if (calld->state == CALL_CANCELLED) {
245 gpr_mu_unlock(&calld->mu_state);
246 handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
247 } else {
248 GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
249 calld->state = CALL_WAITING_FOR_CALL;
250 pollset = calld->waiting_op.bind_pollset;
251 gpr_mu_unlock(&calld->mu_state);
252 grpc_closure_init(&calld->async_setup_task, started_call, calld);
253 grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
254 &calld->subchannel_call,
255 &calld->async_setup_task);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700256 }
Craig Tillera82950e2015-09-22 12:33:20 -0700257 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700258}
259
Craig Tillera82950e2015-09-22 12:33:20 -0700260static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
261 grpc_transport_stream_op *new_op) {
Craig Tillerbb32ba32015-06-30 09:31:48 -0700262 call_data *calld = elem->call_data;
Craig Tiller33825112015-09-18 07:44:19 -0700263 grpc_closure *consumed_op = NULL;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700264 grpc_transport_stream_op *waiting_op = &calld->waiting_op;
Craig Tillera82950e2015-09-22 12:33:20 -0700265 GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
266 GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
267 if (new_op->send_ops != NULL) {
268 waiting_op->send_ops = new_op->send_ops;
269 waiting_op->is_last_send = new_op->is_last_send;
270 waiting_op->on_done_send = new_op->on_done_send;
271 }
272 if (new_op->recv_ops != NULL) {
273 waiting_op->recv_ops = new_op->recv_ops;
274 waiting_op->recv_state = new_op->recv_state;
275 waiting_op->on_done_recv = new_op->on_done_recv;
276 }
277 if (new_op->on_consumed != NULL) {
278 if (waiting_op->on_consumed != NULL) {
279 consumed_op = waiting_op->on_consumed;
Craig Tiller5d44c062015-07-01 08:55:28 -0700280 }
Craig Tillera82950e2015-09-22 12:33:20 -0700281 waiting_op->on_consumed = new_op->on_consumed;
282 }
283 if (new_op->cancel_with_status != GRPC_STATUS_OK) {
284 waiting_op->cancel_with_status = new_op->cancel_with_status;
285 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700286 return consumed_op;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700287}
288
Craig Tillera82950e2015-09-22 12:33:20 -0700289static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700290 call_data *calld = elem->call_data;
291 channel_data *chand = elem->channel_data;
292 grpc_subchannel_call *subchannel_call;
293 char *result;
294
Craig Tillera82950e2015-09-22 12:33:20 -0700295 gpr_mu_lock(&calld->mu_state);
296 if (calld->state == CALL_ACTIVE) {
297 subchannel_call = calld->subchannel_call;
298 GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
299 gpr_mu_unlock(&calld->mu_state);
300 result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
301 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
302 return result;
303 } else {
304 gpr_mu_unlock(&calld->mu_state);
305 return grpc_channel_get_target(chand->master);
306 }
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700307}
308
Craig Tillera82950e2015-09-22 12:33:20 -0700309static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
310 grpc_call_element *elem,
311 grpc_transport_stream_op *op,
312 int continuation) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313 call_data *calld = elem->call_data;
314 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700315 grpc_subchannel_call *subchannel_call;
316 grpc_lb_policy *lb_policy;
Craig Tiller49924e02015-06-29 22:42:33 -0700317 grpc_transport_stream_op op2;
Craig Tillera82950e2015-09-22 12:33:20 -0700318 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
319 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800320
Craig Tillera82950e2015-09-22 12:33:20 -0700321 gpr_mu_lock(&calld->mu_state);
322 switch (calld->state) {
Craig Tillerf5f17122015-06-25 08:47:26 -0700323 case CALL_ACTIVE:
Craig Tillera82950e2015-09-22 12:33:20 -0700324 GPR_ASSERT(!continuation);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700325 subchannel_call = calld->subchannel_call;
Craig Tillera82950e2015-09-22 12:33:20 -0700326 gpr_mu_unlock(&calld->mu_state);
327 grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700328 break;
329 case CALL_CANCELLED:
Craig Tillera82950e2015-09-22 12:33:20 -0700330 gpr_mu_unlock(&calld->mu_state);
331 handle_op_after_cancellation(exec_ctx, elem, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700332 break;
Craig Tiller5d44c062015-07-01 08:55:28 -0700333 case CALL_WAITING_FOR_SEND:
Craig Tillera82950e2015-09-22 12:33:20 -0700334 GPR_ASSERT(!continuation);
335 grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
336 if (!calld->waiting_op.send_ops &&
337 calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
338 gpr_mu_unlock(&calld->mu_state);
339 break;
340 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700341 *op = calld->waiting_op;
Craig Tillera82950e2015-09-22 12:33:20 -0700342 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
Craig Tiller5d44c062015-07-01 08:55:28 -0700343 continuation = 1;
Craig Tillera82950e2015-09-22 12:33:20 -0700344 /* fall through */
Craig Tillereb3b12e2015-06-26 14:42:49 -0700345 case CALL_WAITING_FOR_CONFIG:
346 case CALL_WAITING_FOR_PICK:
347 case CALL_WAITING_FOR_CALL:
Craig Tillera82950e2015-09-22 12:33:20 -0700348 if (!continuation) {
349 if (op->cancel_with_status != GRPC_STATUS_OK) {
350 calld->state = CALL_CANCELLED;
351 op2 = calld->waiting_op;
352 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
353 if (op->on_consumed) {
354 calld->waiting_op.on_consumed = op->on_consumed;
355 op->on_consumed = NULL;
356 } else if (op2.on_consumed) {
357 calld->waiting_op.on_consumed = op2.on_consumed;
358 op2.on_consumed = NULL;
359 }
360 gpr_mu_unlock(&calld->mu_state);
361 handle_op_after_cancellation(exec_ctx, elem, op);
362 handle_op_after_cancellation(exec_ctx, elem, &op2);
363 } else {
364 grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
365 gpr_mu_unlock(&calld->mu_state);
366 }
367 break;
368 }
369 /* fall through */
Craig Tillerf5f17122015-06-25 08:47:26 -0700370 case CALL_CREATED:
Craig Tillera82950e2015-09-22 12:33:20 -0700371 if (op->cancel_with_status != GRPC_STATUS_OK) {
372 calld->state = CALL_CANCELLED;
373 gpr_mu_unlock(&calld->mu_state);
374 handle_op_after_cancellation(exec_ctx, elem, op);
375 } else {
376 calld->waiting_op = *op;
Craig Tillerf5f17122015-06-25 08:47:26 -0700377
Craig Tillera82950e2015-09-22 12:33:20 -0700378 if (op->send_ops == NULL) {
379 /* need to have some send ops before we can select the
380 lb target */
381 calld->state = CALL_WAITING_FOR_SEND;
382 gpr_mu_unlock(&calld->mu_state);
383 } else {
384 gpr_mu_lock(&chand->mu_config);
385 lb_policy = chand->lb_policy;
386 if (lb_policy) {
387 grpc_transport_stream_op *op = &calld->waiting_op;
388 grpc_pollset *bind_pollset = op->bind_pollset;
389 grpc_metadata_batch *initial_metadata =
390 &op->send_ops->ops[0].data.metadata;
391 GRPC_LB_POLICY_REF(lb_policy, "pick");
392 gpr_mu_unlock(&chand->mu_config);
393 calld->state = CALL_WAITING_FOR_PICK;
Craig Tiller3d578712015-07-20 22:00:24 -0700394
Craig Tillera82950e2015-09-22 12:33:20 -0700395 GPR_ASSERT(op->bind_pollset);
396 GPR_ASSERT(op->send_ops);
397 GPR_ASSERT(op->send_ops->nops >= 1);
398 GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
399 gpr_mu_unlock(&calld->mu_state);
Craig Tiller5d44c062015-07-01 08:55:28 -0700400
Craig Tillera82950e2015-09-22 12:33:20 -0700401 grpc_closure_init(&calld->async_setup_task, picked_target, calld);
402 grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
403 initial_metadata, &calld->picked_channel,
404 &calld->async_setup_task);
Craig Tiller5d44c062015-07-01 08:55:28 -0700405
Craig Tillera82950e2015-09-22 12:33:20 -0700406 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
407 } else if (chand->resolver != NULL) {
408 calld->state = CALL_WAITING_FOR_CONFIG;
409 add_to_lb_policy_wait_queue_locked_state_config(elem);
410 if (!chand->started_resolving && chand->resolver != NULL) {
411 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
412 chand->started_resolving = 1;
413 grpc_resolver_next(exec_ctx, chand->resolver,
414 &chand->incoming_configuration,
415 &chand->on_config_changed);
416 }
417 gpr_mu_unlock(&chand->mu_config);
418 gpr_mu_unlock(&calld->mu_state);
419 } else {
420 calld->state = CALL_CANCELLED;
421 gpr_mu_unlock(&chand->mu_config);
422 gpr_mu_unlock(&calld->mu_state);
423 handle_op_after_cancellation(exec_ctx, elem, op);
424 }
425 }
426 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700427 break;
Craig Tillera82950e2015-09-22 12:33:20 -0700428 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700429}
Craig Tillerf5f17122015-06-25 08:47:26 -0700430
Craig Tillera82950e2015-09-22 12:33:20 -0700431static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
432 grpc_call_element *elem,
433 grpc_transport_stream_op *op) {
434 perform_transport_stream_op(exec_ctx, elem, op, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800435}
436
Craig Tillera82950e2015-09-22 12:33:20 -0700437static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
438 grpc_lb_policy *lb_policy,
439 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700440
Craig Tillera82950e2015-09-22 12:33:20 -0700441static void on_lb_policy_state_changed_locked(
442 grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
Craig Tiller5795da72015-09-17 15:27:13 -0700443 /* check if the notification is for a stale policy */
Craig Tillera82950e2015-09-22 12:33:20 -0700444 if (w->lb_policy != w->chand->lb_policy) return;
Craig Tiller5795da72015-09-17 15:27:13 -0700445
Craig Tillera82950e2015-09-22 12:33:20 -0700446 grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
447 "lb_changed");
448 if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
449 watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
450 }
Craig Tiller5795da72015-09-17 15:27:13 -0700451}
452
Craig Tillera82950e2015-09-22 12:33:20 -0700453static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
454 int iomgr_success) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700455 lb_policy_connectivity_watcher *w = arg;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700456
Craig Tillera82950e2015-09-22 12:33:20 -0700457 gpr_mu_lock(&w->chand->mu_config);
458 on_lb_policy_state_changed_locked(exec_ctx, w);
459 gpr_mu_unlock(&w->chand->mu_config);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700460
Craig Tillera82950e2015-09-22 12:33:20 -0700461 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
462 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700463}
464
Craig Tillera82950e2015-09-22 12:33:20 -0700465static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
466 grpc_lb_policy *lb_policy,
467 grpc_connectivity_state current_state) {
468 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
469 GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700470
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700471 w->chand = chand;
Craig Tillera82950e2015-09-22 12:33:20 -0700472 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700473 w->state = current_state;
474 w->lb_policy = lb_policy;
Craig Tillera82950e2015-09-22 12:33:20 -0700475 grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
476 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700477}
478
Craig Tillera82950e2015-09-22 12:33:20 -0700479static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
480 int iomgr_success) {
Craig Tiller3f475422015-06-25 10:43:05 -0700481 channel_data *chand = arg;
482 grpc_lb_policy *lb_policy = NULL;
483 grpc_lb_policy *old_lb_policy;
484 grpc_resolver *old_resolver;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700485 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700486 int exit_idle = 0;
Craig Tiller3f475422015-06-25 10:43:05 -0700487
Craig Tillera82950e2015-09-22 12:33:20 -0700488 if (chand->incoming_configuration != NULL) {
489 lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
490 if (lb_policy != NULL) {
491 GRPC_LB_POLICY_REF(lb_policy, "channel");
492 GRPC_LB_POLICY_REF(lb_policy, "config_change");
493 state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
Craig Tiller45724b32015-09-22 10:42:19 -0700494 }
Craig Tiller3f475422015-06-25 10:43:05 -0700495
Craig Tillera82950e2015-09-22 12:33:20 -0700496 grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
497 }
498
Craig Tiller3f475422015-06-25 10:43:05 -0700499 chand->incoming_configuration = NULL;
500
Craig Tillera82950e2015-09-22 12:33:20 -0700501 gpr_mu_lock(&chand->mu_config);
Craig Tiller3f475422015-06-25 10:43:05 -0700502 old_lb_policy = chand->lb_policy;
503 chand->lb_policy = lb_policy;
Craig Tillera82950e2015-09-22 12:33:20 -0700504 if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
505 grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
506 }
507 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
508 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
509 exit_idle = 1;
510 chand->exit_idle_when_lb_policy_arrives = 0;
511 }
Craig Tiller98465032015-06-29 14:36:42 -0700512
Craig Tillera82950e2015-09-22 12:33:20 -0700513 if (iomgr_success && chand->resolver) {
514 grpc_resolver *resolver = chand->resolver;
515 GRPC_RESOLVER_REF(resolver, "channel-next");
516 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
517 "new_lb+resolver");
518 if (lb_policy != NULL) {
519 watch_lb_policy(exec_ctx, chand, lb_policy, state);
Craig Tiller45724b32015-09-22 10:42:19 -0700520 }
Craig Tillera82950e2015-09-22 12:33:20 -0700521 gpr_mu_unlock(&chand->mu_config);
522 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
523 grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
524 &chand->on_config_changed);
525 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
526 } else {
527 old_resolver = chand->resolver;
528 chand->resolver = NULL;
529 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
530 GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
531 gpr_mu_unlock(&chand->mu_config);
532 if (old_resolver != NULL) {
533 grpc_resolver_shutdown(exec_ctx, old_resolver);
534 GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
Craig Tiller45724b32015-09-22 10:42:19 -0700535 }
Craig Tillera82950e2015-09-22 12:33:20 -0700536 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700537
Craig Tillera82950e2015-09-22 12:33:20 -0700538 if (exit_idle) {
539 grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
540 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
541 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700542
Craig Tillera82950e2015-09-22 12:33:20 -0700543 if (old_lb_policy != NULL) {
544 grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
545 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
546 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700547
Craig Tillera82950e2015-09-22 12:33:20 -0700548 if (lb_policy != NULL) {
549 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
550 }
Craig Tiller45724b32015-09-22 10:42:19 -0700551
Craig Tillera82950e2015-09-22 12:33:20 -0700552 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
Craig Tiller3f475422015-06-25 10:43:05 -0700553}
554
Craig Tillera82950e2015-09-22 12:33:20 -0700555static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
556 grpc_channel_element *elem,
557 grpc_transport_op *op) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700558 grpc_lb_policy *lb_policy = NULL;
559 channel_data *chand = elem->channel_data;
Craig Tiller98465032015-06-29 14:36:42 -0700560 grpc_resolver *destroy_resolver = NULL;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700561
Craig Tillera82950e2015-09-22 12:33:20 -0700562 grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700563
Craig Tillera82950e2015-09-22 12:33:20 -0700564 GPR_ASSERT(op->set_accept_stream == NULL);
565 GPR_ASSERT(op->bind_pollset == NULL);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700566
Craig Tillera82950e2015-09-22 12:33:20 -0700567 gpr_mu_lock(&chand->mu_config);
568 if (op->on_connectivity_state_change != NULL) {
569 grpc_connectivity_state_notify_on_state_change(
570 exec_ctx, &chand->state_tracker, op->connectivity_state,
571 op->on_connectivity_state_change);
572 op->on_connectivity_state_change = NULL;
573 op->connectivity_state = NULL;
574 }
575
576 if (!is_empty(op, sizeof(*op))) {
577 lb_policy = chand->lb_policy;
578 if (lb_policy) {
579 GRPC_LB_POLICY_REF(lb_policy, "broadcast");
Craig Tiller03dc6552015-07-17 23:12:34 -0700580 }
Craig Tillera82950e2015-09-22 12:33:20 -0700581 }
Craig Tiller03dc6552015-07-17 23:12:34 -0700582
Craig Tillera82950e2015-09-22 12:33:20 -0700583 if (op->disconnect && chand->resolver != NULL) {
584 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
585 GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
586 destroy_resolver = chand->resolver;
587 chand->resolver = NULL;
588 if (chand->lb_policy != NULL) {
589 grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
590 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
591 chand->lb_policy = NULL;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700592 }
Craig Tillera82950e2015-09-22 12:33:20 -0700593 }
594 gpr_mu_unlock(&chand->mu_config);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700595
Craig Tillera82950e2015-09-22 12:33:20 -0700596 if (destroy_resolver) {
597 grpc_resolver_shutdown(exec_ctx, destroy_resolver);
598 GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
599 }
Craig Tiller98465032015-06-29 14:36:42 -0700600
Craig Tillera82950e2015-09-22 12:33:20 -0700601 if (lb_policy) {
602 grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
603 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
604 }
Craig Tillerca3e9d32015-06-27 18:37:27 -0700605}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800606
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800607/* Constructor for call_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700608static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
609 const void *server_transport_data,
610 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800611 call_data *calld = elem->call_data;
612
Craig Tiller50d9db52015-04-23 10:52:14 -0700613 /* TODO(ctiller): is there something useful we can do here? */
Craig Tillera82950e2015-09-22 12:33:20 -0700614 GPR_ASSERT(initial_op == NULL);
Craig Tiller50d9db52015-04-23 10:52:14 -0700615
Craig Tillera82950e2015-09-22 12:33:20 -0700616 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
617 GPR_ASSERT(server_transport_data == NULL);
618 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800619 calld->elem = elem;
620 calld->state = CALL_CREATED;
Craig Tillera82950e2015-09-22 12:33:20 -0700621 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800622}
623
624/* Destructor for call_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700625static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
626 grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800627 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700628 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800629
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800630 /* if the call got activated, we need to destroy the child stack also, and
631 remove it from the in-flight requests tracked by the child_entry we
632 picked */
Craig Tillera82950e2015-09-22 12:33:20 -0700633 gpr_mu_lock(&calld->mu_state);
634 switch (calld->state) {
Craig Tillerf93fd052015-06-02 08:15:33 -0700635 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700636 subchannel_call = calld->subchannel_call;
Craig Tillera82950e2015-09-22 12:33:20 -0700637 gpr_mu_unlock(&calld->mu_state);
638 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
Craig Tillerf93fd052015-06-02 08:15:33 -0700639 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700640 case CALL_CREATED:
641 case CALL_CANCELLED:
Craig Tillera82950e2015-09-22 12:33:20 -0700642 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700643 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700644 case CALL_WAITING_FOR_PICK:
645 case CALL_WAITING_FOR_CONFIG:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700646 case CALL_WAITING_FOR_CALL:
Craig Tiller5d44c062015-07-01 08:55:28 -0700647 case CALL_WAITING_FOR_SEND:
Craig Tillera82950e2015-09-22 12:33:20 -0700648 gpr_log(GPR_ERROR, "should never reach here");
649 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700650 break;
Craig Tillera82950e2015-09-22 12:33:20 -0700651 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800652}
653
654/* Constructor for channel_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700655static void init_channel_elem(grpc_exec_ctx *exec_ctx,
656 grpc_channel_element *elem, grpc_channel *master,
657 const grpc_channel_args *args,
658 grpc_mdctx *metadata_context, int is_first,
659 int is_last) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800660 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800661
Craig Tillera82950e2015-09-22 12:33:20 -0700662 memset(chand, 0, sizeof(*chand));
Craig Tillereb3b12e2015-06-26 14:42:49 -0700663
Craig Tillera82950e2015-09-22 12:33:20 -0700664 GPR_ASSERT(is_last);
665 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800666
Craig Tillera82950e2015-09-22 12:33:20 -0700667 gpr_mu_init(&chand->mu_config);
Craig Tillere70413c2015-04-24 10:12:34 -0700668 chand->mdctx = metadata_context;
Craig Tiller98465032015-06-29 14:36:42 -0700669 chand->master = master;
Craig Tillera82950e2015-09-22 12:33:20 -0700670 grpc_pollset_set_init(&chand->pollset_set);
671 grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
Craig Tiller98465032015-06-29 14:36:42 -0700672
Craig Tillera82950e2015-09-22 12:33:20 -0700673 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
674 "client_channel");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800675}
676
677/* Destructor for channel_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700678static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
679 grpc_channel_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800680 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800681
Craig Tillera82950e2015-09-22 12:33:20 -0700682 if (chand->resolver != NULL) {
683 grpc_resolver_shutdown(exec_ctx, chand->resolver);
684 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
685 }
686 if (chand->lb_policy != NULL) {
687 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
688 }
689 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
690 grpc_pollset_set_destroy(&chand->pollset_set);
691 gpr_mu_destroy(&chand->mu_config);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800692}
693
694const grpc_channel_filter grpc_client_channel_filter = {
Craig Tillera82950e2015-09-22 12:33:20 -0700695 cc_start_transport_stream_op,
696 cc_start_transport_op,
697 sizeof(call_data),
698 init_call_elem,
699 destroy_call_elem,
700 sizeof(channel_data),
701 init_channel_elem,
702 destroy_channel_elem,
703 cc_get_peer,
704 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -0700705};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800706
Craig Tillera82950e2015-09-22 12:33:20 -0700707void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
708 grpc_channel_stack *channel_stack,
709 grpc_resolver *resolver) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800710 /* post construction initialization: set the transport setup pointer */
Craig Tillera82950e2015-09-22 12:33:20 -0700711 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800712 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700713 gpr_mu_lock(&chand->mu_config);
714 GPR_ASSERT(!chand->resolver);
Craig Tillerf5f17122015-06-25 08:47:26 -0700715 chand->resolver = resolver;
Craig Tillera82950e2015-09-22 12:33:20 -0700716 GRPC_RESOLVER_REF(resolver, "channel");
717 if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
718 chand->exit_idle_when_lb_policy_arrives) {
719 chand->started_resolving = 1;
720 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
721 grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
722 &chand->on_config_changed);
723 }
724 gpr_mu_unlock(&chand->mu_config);
Craig Tiller190d3602015-02-18 09:23:38 -0800725}
Craig Tiller48cb07c2015-07-15 16:16:15 -0700726
Craig Tillera82950e2015-09-22 12:33:20 -0700727grpc_connectivity_state grpc_client_channel_check_connectivity_state(
728 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700729 channel_data *chand = elem->channel_data;
730 grpc_connectivity_state out;
Craig Tillera82950e2015-09-22 12:33:20 -0700731 gpr_mu_lock(&chand->mu_config);
732 out = grpc_connectivity_state_check(&chand->state_tracker);
733 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
734 if (chand->lb_policy != NULL) {
735 grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
736 } else {
737 chand->exit_idle_when_lb_policy_arrives = 1;
738 if (!chand->started_resolving && chand->resolver != NULL) {
739 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
740 chand->started_resolving = 1;
741 grpc_resolver_next(exec_ctx, chand->resolver,
742 &chand->incoming_configuration,
743 &chand->on_config_changed);
744 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700745 }
Craig Tillera82950e2015-09-22 12:33:20 -0700746 }
747 gpr_mu_unlock(&chand->mu_config);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700748 return out;
749}
750
Craig Tillera82950e2015-09-22 12:33:20 -0700751void grpc_client_channel_watch_connectivity_state(
752 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
753 grpc_connectivity_state *state, grpc_closure *on_complete) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700754 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700755 gpr_mu_lock(&chand->mu_config);
756 grpc_connectivity_state_notify_on_state_change(
757 exec_ctx, &chand->state_tracker, state, on_complete);
758 gpr_mu_unlock(&chand->mu_config);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700759}
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700760
Craig Tillera82950e2015-09-22 12:33:20 -0700761grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
762 grpc_channel_element *elem) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700763 channel_data *chand = elem->channel_data;
764 return &chand->pollset_set;
765}
766
Craig Tillera82950e2015-09-22 12:33:20 -0700767void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
768 grpc_channel_element *elem,
769 grpc_pollset *pollset) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700770 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700771 grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700772}
773
Craig Tillera82950e2015-09-22 12:33:20 -0700774void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
775 grpc_channel_element *elem,
776 grpc_pollset *pollset) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700777 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700778 grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700779}