/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070037#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039#include <grpc/support/alloc.h>
40#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc/support/sync.h>
42#include <grpc/support/useful.h>
43
Craig Tiller8910ac62015-10-08 16:49:15 -070044#include "src/core/channel/channel_args.h"
45#include "src/core/channel/connected_channel.h"
46#include "src/core/iomgr/iomgr.h"
47#include "src/core/profiling/timers.h"
48#include "src/core/support/string.h"
49#include "src/core/surface/channel.h"
50#include "src/core/transport/connectivity_state.h"
51
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052/* Client channel implementation */
53
/* Forward declaration; the struct body is defined further down this file. */
typedef struct call_data call_data;
55
Craig Tiller800dacb2015-10-06 09:10:26 -070056typedef struct client_channel_channel_data {
Craig Tillerf5f17122015-06-25 08:47:26 -070057 /** metadata context for this channel */
Craig Tillere70413c2015-04-24 10:12:34 -070058 grpc_mdctx *mdctx;
Craig Tillerf5f17122015-06-25 08:47:26 -070059 /** resolver for this channel */
60 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -070061 /** have we started resolving this channel */
62 int started_resolving;
Craig Tiller50bc6092015-07-01 14:25:19 -070063 /** master channel - the grpc_channel instance that ultimately owns
64 this channel_data via its channel stack.
65 We occasionally use this to bump the refcount on the master channel
66 to keep ourselves alive through an asynchronous operation. */
Craig Tiller98465032015-06-29 14:36:42 -070067 grpc_channel *master;
Craig Tillerf5f17122015-06-25 08:47:26 -070068
Craig Tiller9d94b602015-07-01 14:23:18 -070069 /** mutex protecting client configuration, including all
70 variables below in this data structure */
Craig Tillerf5f17122015-06-25 08:47:26 -070071 gpr_mu mu_config;
Craig Tillerf5f17122015-06-25 08:47:26 -070072 /** currently active load balancer - guarded by mu_config */
73 grpc_lb_policy *lb_policy;
Craig Tillerf5f17122015-06-25 08:47:26 -070074 /** incoming configuration - set by resolver.next
75 guarded by mu_config */
76 grpc_client_config *incoming_configuration;
Craig Tiller3f475422015-06-25 10:43:05 -070077 /** a list of closures that are all waiting for config to come in */
Craig Tillerd9ccbbf2015-09-22 09:30:00 -070078 grpc_closure_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -070079 /** resolver callback */
Craig Tiller33825112015-09-18 07:44:19 -070080 grpc_closure on_config_changed;
Craig Tiller3f475422015-06-25 10:43:05 -070081 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -070082 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -070083 /** when an lb_policy arrives, should we try to exit idle */
84 int exit_idle_when_lb_policy_arrives;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070085 /** pollset_set of interested parties in a new connection */
86 grpc_pollset_set pollset_set;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080087} channel_data;
88
Craig Tillerd6c98df2015-08-18 09:33:44 -070089/** We create one watcher for each new lb_policy that is returned from a
90 resolver,
91 to watch for state changes from the lb_policy. When a state change is seen,
92 we
Craig Tiller1ada6ad2015-07-16 16:19:14 -070093 update the channel, and create a new watcher */
Craig Tillera82950e2015-09-22 12:33:20 -070094typedef struct {
Craig Tiller1ada6ad2015-07-16 16:19:14 -070095 channel_data *chand;
Craig Tiller33825112015-09-18 07:44:19 -070096 grpc_closure on_changed;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070097 grpc_connectivity_state state;
98 grpc_lb_policy *lb_policy;
99} lb_policy_connectivity_watcher;
100
/** Lifecycle of a client-channel call; stored in call_data.state and
    accessed under call_data.mu_state. */
typedef enum {
  /** initial state, set by init_call_elem */
  CALL_CREATED,
  /** an op was received but it carried no send_ops; waiting for some before
      an lb target can be selected */
  CALL_WAITING_FOR_SEND,
  /** no lb_policy yet; queued on the channel until the resolver delivers a
      configuration */
  CALL_WAITING_FOR_CONFIG,
  /** grpc_lb_policy_pick has been started */
  CALL_WAITING_FOR_PICK,
  /** grpc_subchannel_create_call has been started */
  CALL_WAITING_FOR_CALL,
  /** a subchannel call exists; ops are forwarded directly to it */
  CALL_ACTIVE,
  /** the call was cancelled (or could not be set up) */
  CALL_CANCELLED
} call_state;
110
Craig Tillera82950e2015-09-22 12:33:20 -0700111struct call_data {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800112 /* owning element */
113 grpc_call_element *elem;
114
Craig Tillerf5f17122015-06-25 08:47:26 -0700115 gpr_mu mu_state;
116
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800117 call_state state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800118 gpr_timespec deadline;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700119 grpc_subchannel *picked_channel;
Craig Tiller33825112015-09-18 07:44:19 -0700120 grpc_closure async_setup_task;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700121 grpc_transport_stream_op waiting_op;
122 /* our child call stack */
123 grpc_subchannel_call *subchannel_call;
124 grpc_linked_mdelem status;
125 grpc_linked_mdelem details;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800126};
127
Craig Tillera82950e2015-09-22 12:33:20 -0700128static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
129 grpc_transport_stream_op *new_op)
130 GRPC_MUST_USE_RESULT;
Craig Tiller5d44c062015-07-01 08:55:28 -0700131
Craig Tillera82950e2015-09-22 12:33:20 -0700132static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
133 grpc_call_element *elem,
134 grpc_transport_stream_op *op) {
Craig Tillere70413c2015-04-24 10:12:34 -0700135 call_data *calld = elem->call_data;
136 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700137 if (op->send_ops) {
138 grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
139 op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
140 }
141 if (op->recv_ops) {
142 char status[GPR_LTOA_MIN_BUFSIZE];
143 grpc_metadata_batch mdb;
144 gpr_ltoa(GRPC_STATUS_CANCELLED, status);
145 calld->status.md =
146 grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
147 calld->details.md =
148 grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
149 calld->status.prev = calld->details.next = NULL;
150 calld->status.next = &calld->details;
151 calld->details.prev = &calld->status;
152 mdb.list.head = &calld->status;
153 mdb.list.tail = &calld->details;
154 mdb.garbage.head = mdb.garbage.tail = NULL;
155 mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
156 grpc_sopb_add_metadata(op->recv_ops, mdb);
157 *op->recv_state = GRPC_STREAM_CLOSED;
158 op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
159 }
160 if (op->on_consumed) {
161 op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
162 }
Craig Tillere70413c2015-04-24 10:12:34 -0700163}
Craig Tiller8b976d02015-02-05 21:41:23 -0800164
Craig Tillera82950e2015-09-22 12:33:20 -0700165typedef struct {
Craig Tiller33825112015-09-18 07:44:19 -0700166 grpc_closure closure;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700167 grpc_call_element *elem;
168} waiting_call;
169
Craig Tillera82950e2015-09-22 12:33:20 -0700170static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
171 grpc_call_element *elem,
172 grpc_transport_stream_op *op,
173 int continuation);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700174
Craig Tillera82950e2015-09-22 12:33:20 -0700175static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
176 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700177 waiting_call *wc = arg;
178 call_data *calld = wc->elem->call_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700179 perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
180 gpr_free(wc);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700181}
182
Craig Tillera82950e2015-09-22 12:33:20 -0700183static void add_to_lb_policy_wait_queue_locked_state_config(
184 grpc_call_element *elem) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700185 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700186 waiting_call *wc = gpr_malloc(sizeof(*wc));
187 grpc_closure_init(&wc->closure, continue_with_pick, wc);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700188 wc->elem = elem;
Craig Tillera82950e2015-09-22 12:33:20 -0700189 grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700190}
191
/* Returns 1 if the first `len` bytes at `p` are all zero, 0 otherwise.
   A `len` of zero (or less) yields 1 because no byte is inspected.
   The memory is only read, never written. */
static int is_empty(void *p, int len) {
  const char *ptr = p;
  int i;
  for (i = 0; i < len; i++) {
    if (ptr[i] != 0) return 0;
  }
  return 1;
}
200
Craig Tillera82950e2015-09-22 12:33:20 -0700201static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
202 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700203 call_data *calld = arg;
204 grpc_transport_stream_op op;
205 int have_waiting;
206
Craig Tillera82950e2015-09-22 12:33:20 -0700207 gpr_mu_lock(&calld->mu_state);
208 if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
209 memset(&op, 0, sizeof(op));
210 op.cancel_with_status = GRPC_STATUS_CANCELLED;
211 gpr_mu_unlock(&calld->mu_state);
212 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
213 } else if (calld->state == CALL_WAITING_FOR_CALL) {
214 have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
215 if (calld->subchannel_call != NULL) {
216 calld->state = CALL_ACTIVE;
217 gpr_mu_unlock(&calld->mu_state);
218 if (have_waiting) {
219 grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
220 &calld->waiting_op);
221 }
222 } else {
223 calld->state = CALL_CANCELLED;
224 gpr_mu_unlock(&calld->mu_state);
225 if (have_waiting) {
226 handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
227 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700228 }
Craig Tillera82950e2015-09-22 12:33:20 -0700229 } else {
230 GPR_ASSERT(calld->state == CALL_CANCELLED);
231 gpr_mu_unlock(&calld->mu_state);
232 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700233}
234
Craig Tillera82950e2015-09-22 12:33:20 -0700235static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
236 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700237 call_data *calld = arg;
Craig Tillerabf36382015-06-29 16:13:27 -0700238 grpc_pollset *pollset;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700239
Craig Tiller8910ac62015-10-08 16:49:15 -0700240 GRPC_TIMER_BEGIN(GRPC_PTAG_CHANNEL_PICKED_TARGET, 0);
241
Craig Tillera82950e2015-09-22 12:33:20 -0700242 if (calld->picked_channel == NULL) {
243 /* treat this like a cancellation */
244 calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
245 perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
246 } else {
247 gpr_mu_lock(&calld->mu_state);
248 if (calld->state == CALL_CANCELLED) {
249 gpr_mu_unlock(&calld->mu_state);
250 handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
251 } else {
252 GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
253 calld->state = CALL_WAITING_FOR_CALL;
254 pollset = calld->waiting_op.bind_pollset;
255 gpr_mu_unlock(&calld->mu_state);
256 grpc_closure_init(&calld->async_setup_task, started_call, calld);
257 grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
258 &calld->subchannel_call,
259 &calld->async_setup_task);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700260 }
Craig Tillera82950e2015-09-22 12:33:20 -0700261 }
Craig Tiller8910ac62015-10-08 16:49:15 -0700262
263 GRPC_TIMER_END(GRPC_PTAG_CHANNEL_PICKED_TARGET, 0);
Craig Tillerf5f17122015-06-25 08:47:26 -0700264}
265
Craig Tillera82950e2015-09-22 12:33:20 -0700266static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
267 grpc_transport_stream_op *new_op) {
Craig Tillerbb32ba32015-06-30 09:31:48 -0700268 call_data *calld = elem->call_data;
Craig Tiller33825112015-09-18 07:44:19 -0700269 grpc_closure *consumed_op = NULL;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700270 grpc_transport_stream_op *waiting_op = &calld->waiting_op;
Craig Tillera82950e2015-09-22 12:33:20 -0700271 GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
272 GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
273 if (new_op->send_ops != NULL) {
274 waiting_op->send_ops = new_op->send_ops;
275 waiting_op->is_last_send = new_op->is_last_send;
276 waiting_op->on_done_send = new_op->on_done_send;
277 }
278 if (new_op->recv_ops != NULL) {
279 waiting_op->recv_ops = new_op->recv_ops;
280 waiting_op->recv_state = new_op->recv_state;
281 waiting_op->on_done_recv = new_op->on_done_recv;
282 }
283 if (new_op->on_consumed != NULL) {
284 if (waiting_op->on_consumed != NULL) {
285 consumed_op = waiting_op->on_consumed;
Craig Tiller5d44c062015-07-01 08:55:28 -0700286 }
Craig Tillera82950e2015-09-22 12:33:20 -0700287 waiting_op->on_consumed = new_op->on_consumed;
288 }
289 if (new_op->cancel_with_status != GRPC_STATUS_OK) {
290 waiting_op->cancel_with_status = new_op->cancel_with_status;
291 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700292 return consumed_op;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700293}
294
Craig Tillera82950e2015-09-22 12:33:20 -0700295static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700296 call_data *calld = elem->call_data;
297 channel_data *chand = elem->channel_data;
298 grpc_subchannel_call *subchannel_call;
299 char *result;
300
Craig Tillera82950e2015-09-22 12:33:20 -0700301 gpr_mu_lock(&calld->mu_state);
302 if (calld->state == CALL_ACTIVE) {
303 subchannel_call = calld->subchannel_call;
304 GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
305 gpr_mu_unlock(&calld->mu_state);
306 result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
307 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
308 return result;
309 } else {
310 gpr_mu_unlock(&calld->mu_state);
311 return grpc_channel_get_target(chand->master);
312 }
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700313}
314
Craig Tillera82950e2015-09-22 12:33:20 -0700315static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
316 grpc_call_element *elem,
317 grpc_transport_stream_op *op,
318 int continuation) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800319 call_data *calld = elem->call_data;
320 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700321 grpc_subchannel_call *subchannel_call;
322 grpc_lb_policy *lb_policy;
Craig Tiller49924e02015-06-29 22:42:33 -0700323 grpc_transport_stream_op op2;
Craig Tillera82950e2015-09-22 12:33:20 -0700324 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
325 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800326
Craig Tillera82950e2015-09-22 12:33:20 -0700327 gpr_mu_lock(&calld->mu_state);
328 switch (calld->state) {
Craig Tillerf5f17122015-06-25 08:47:26 -0700329 case CALL_ACTIVE:
Craig Tillera82950e2015-09-22 12:33:20 -0700330 GPR_ASSERT(!continuation);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700331 subchannel_call = calld->subchannel_call;
Craig Tillera82950e2015-09-22 12:33:20 -0700332 gpr_mu_unlock(&calld->mu_state);
333 grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700334 break;
335 case CALL_CANCELLED:
Craig Tillera82950e2015-09-22 12:33:20 -0700336 gpr_mu_unlock(&calld->mu_state);
337 handle_op_after_cancellation(exec_ctx, elem, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700338 break;
Craig Tiller5d44c062015-07-01 08:55:28 -0700339 case CALL_WAITING_FOR_SEND:
Craig Tillera82950e2015-09-22 12:33:20 -0700340 GPR_ASSERT(!continuation);
341 grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
342 if (!calld->waiting_op.send_ops &&
343 calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
344 gpr_mu_unlock(&calld->mu_state);
345 break;
346 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700347 *op = calld->waiting_op;
Craig Tillera82950e2015-09-22 12:33:20 -0700348 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
Craig Tiller5d44c062015-07-01 08:55:28 -0700349 continuation = 1;
Craig Tillera82950e2015-09-22 12:33:20 -0700350 /* fall through */
Craig Tillereb3b12e2015-06-26 14:42:49 -0700351 case CALL_WAITING_FOR_CONFIG:
352 case CALL_WAITING_FOR_PICK:
353 case CALL_WAITING_FOR_CALL:
Craig Tillera82950e2015-09-22 12:33:20 -0700354 if (!continuation) {
355 if (op->cancel_with_status != GRPC_STATUS_OK) {
356 calld->state = CALL_CANCELLED;
357 op2 = calld->waiting_op;
358 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
359 if (op->on_consumed) {
360 calld->waiting_op.on_consumed = op->on_consumed;
361 op->on_consumed = NULL;
362 } else if (op2.on_consumed) {
363 calld->waiting_op.on_consumed = op2.on_consumed;
364 op2.on_consumed = NULL;
365 }
366 gpr_mu_unlock(&calld->mu_state);
367 handle_op_after_cancellation(exec_ctx, elem, op);
368 handle_op_after_cancellation(exec_ctx, elem, &op2);
369 } else {
370 grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
371 gpr_mu_unlock(&calld->mu_state);
372 }
373 break;
374 }
375 /* fall through */
Craig Tillerf5f17122015-06-25 08:47:26 -0700376 case CALL_CREATED:
Craig Tillera82950e2015-09-22 12:33:20 -0700377 if (op->cancel_with_status != GRPC_STATUS_OK) {
378 calld->state = CALL_CANCELLED;
379 gpr_mu_unlock(&calld->mu_state);
380 handle_op_after_cancellation(exec_ctx, elem, op);
381 } else {
382 calld->waiting_op = *op;
Craig Tillerf5f17122015-06-25 08:47:26 -0700383
Craig Tillera82950e2015-09-22 12:33:20 -0700384 if (op->send_ops == NULL) {
385 /* need to have some send ops before we can select the
386 lb target */
387 calld->state = CALL_WAITING_FOR_SEND;
388 gpr_mu_unlock(&calld->mu_state);
389 } else {
390 gpr_mu_lock(&chand->mu_config);
391 lb_policy = chand->lb_policy;
392 if (lb_policy) {
Craig Tillerb9d35962015-09-11 13:31:16 -0700393 grpc_transport_stream_op *waiting_op = &calld->waiting_op;
394 grpc_pollset *bind_pollset = waiting_op->bind_pollset;
Craig Tillera82950e2015-09-22 12:33:20 -0700395 grpc_metadata_batch *initial_metadata =
Craig Tillerb9d35962015-09-11 13:31:16 -0700396 &waiting_op->send_ops->ops[0].data.metadata;
Craig Tillera82950e2015-09-22 12:33:20 -0700397 GRPC_LB_POLICY_REF(lb_policy, "pick");
398 gpr_mu_unlock(&chand->mu_config);
399 calld->state = CALL_WAITING_FOR_PICK;
Craig Tiller3d578712015-07-20 22:00:24 -0700400
Craig Tillerb9d35962015-09-11 13:31:16 -0700401 GPR_ASSERT(waiting_op->bind_pollset);
402 GPR_ASSERT(waiting_op->send_ops);
403 GPR_ASSERT(waiting_op->send_ops->nops >= 1);
404 GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
Craig Tillera82950e2015-09-22 12:33:20 -0700405 gpr_mu_unlock(&calld->mu_state);
Craig Tiller5d44c062015-07-01 08:55:28 -0700406
Craig Tillera82950e2015-09-22 12:33:20 -0700407 grpc_closure_init(&calld->async_setup_task, picked_target, calld);
408 grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
409 initial_metadata, &calld->picked_channel,
410 &calld->async_setup_task);
Craig Tiller5d44c062015-07-01 08:55:28 -0700411
Craig Tillera82950e2015-09-22 12:33:20 -0700412 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
413 } else if (chand->resolver != NULL) {
414 calld->state = CALL_WAITING_FOR_CONFIG;
415 add_to_lb_policy_wait_queue_locked_state_config(elem);
416 if (!chand->started_resolving && chand->resolver != NULL) {
417 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
418 chand->started_resolving = 1;
419 grpc_resolver_next(exec_ctx, chand->resolver,
420 &chand->incoming_configuration,
421 &chand->on_config_changed);
422 }
423 gpr_mu_unlock(&chand->mu_config);
424 gpr_mu_unlock(&calld->mu_state);
425 } else {
426 calld->state = CALL_CANCELLED;
427 gpr_mu_unlock(&chand->mu_config);
428 gpr_mu_unlock(&calld->mu_state);
429 handle_op_after_cancellation(exec_ctx, elem, op);
430 }
431 }
432 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700433 break;
Craig Tillera82950e2015-09-22 12:33:20 -0700434 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700435}
Craig Tillerf5f17122015-06-25 08:47:26 -0700436
Craig Tillera82950e2015-09-22 12:33:20 -0700437static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
438 grpc_call_element *elem,
439 grpc_transport_stream_op *op) {
440 perform_transport_stream_op(exec_ctx, elem, op, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800441}
442
Craig Tillera82950e2015-09-22 12:33:20 -0700443static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
444 grpc_lb_policy *lb_policy,
445 grpc_connectivity_state current_state);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700446
Craig Tillera82950e2015-09-22 12:33:20 -0700447static void on_lb_policy_state_changed_locked(
448 grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
Craig Tiller5795da72015-09-17 15:27:13 -0700449 /* check if the notification is for a stale policy */
Craig Tillera82950e2015-09-22 12:33:20 -0700450 if (w->lb_policy != w->chand->lb_policy) return;
Craig Tiller5795da72015-09-17 15:27:13 -0700451
Craig Tillera82950e2015-09-22 12:33:20 -0700452 grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
453 "lb_changed");
454 if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
455 watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
456 }
Craig Tiller5795da72015-09-17 15:27:13 -0700457}
458
Craig Tillera82950e2015-09-22 12:33:20 -0700459static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
460 int iomgr_success) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700461 lb_policy_connectivity_watcher *w = arg;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700462
Craig Tillera82950e2015-09-22 12:33:20 -0700463 gpr_mu_lock(&w->chand->mu_config);
464 on_lb_policy_state_changed_locked(exec_ctx, w);
465 gpr_mu_unlock(&w->chand->mu_config);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700466
Craig Tillera82950e2015-09-22 12:33:20 -0700467 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
468 gpr_free(w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700469}
470
Craig Tillera82950e2015-09-22 12:33:20 -0700471static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
472 grpc_lb_policy *lb_policy,
473 grpc_connectivity_state current_state) {
474 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
475 GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700476
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700477 w->chand = chand;
Craig Tillera82950e2015-09-22 12:33:20 -0700478 grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700479 w->state = current_state;
480 w->lb_policy = lb_policy;
Craig Tillera82950e2015-09-22 12:33:20 -0700481 grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
482 &w->on_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700483}
484
Craig Tillera82950e2015-09-22 12:33:20 -0700485static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
486 int iomgr_success) {
Craig Tiller3f475422015-06-25 10:43:05 -0700487 channel_data *chand = arg;
488 grpc_lb_policy *lb_policy = NULL;
489 grpc_lb_policy *old_lb_policy;
490 grpc_resolver *old_resolver;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700491 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700492 int exit_idle = 0;
Craig Tiller3f475422015-06-25 10:43:05 -0700493
Craig Tillera82950e2015-09-22 12:33:20 -0700494 if (chand->incoming_configuration != NULL) {
495 lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
496 if (lb_policy != NULL) {
497 GRPC_LB_POLICY_REF(lb_policy, "channel");
498 GRPC_LB_POLICY_REF(lb_policy, "config_change");
499 state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
Craig Tiller45724b32015-09-22 10:42:19 -0700500 }
Craig Tiller3f475422015-06-25 10:43:05 -0700501
Craig Tillera82950e2015-09-22 12:33:20 -0700502 grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
503 }
504
Craig Tiller3f475422015-06-25 10:43:05 -0700505 chand->incoming_configuration = NULL;
506
Craig Tillera82950e2015-09-22 12:33:20 -0700507 gpr_mu_lock(&chand->mu_config);
Craig Tiller3f475422015-06-25 10:43:05 -0700508 old_lb_policy = chand->lb_policy;
509 chand->lb_policy = lb_policy;
Craig Tillera82950e2015-09-22 12:33:20 -0700510 if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
511 grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
512 }
513 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
514 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
515 exit_idle = 1;
516 chand->exit_idle_when_lb_policy_arrives = 0;
517 }
Craig Tiller98465032015-06-29 14:36:42 -0700518
Craig Tillera82950e2015-09-22 12:33:20 -0700519 if (iomgr_success && chand->resolver) {
520 grpc_resolver *resolver = chand->resolver;
521 GRPC_RESOLVER_REF(resolver, "channel-next");
522 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
523 "new_lb+resolver");
524 if (lb_policy != NULL) {
525 watch_lb_policy(exec_ctx, chand, lb_policy, state);
Craig Tiller45724b32015-09-22 10:42:19 -0700526 }
Craig Tillera82950e2015-09-22 12:33:20 -0700527 gpr_mu_unlock(&chand->mu_config);
528 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
529 grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
530 &chand->on_config_changed);
531 GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
532 } else {
533 old_resolver = chand->resolver;
534 chand->resolver = NULL;
535 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
536 GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
537 gpr_mu_unlock(&chand->mu_config);
538 if (old_resolver != NULL) {
539 grpc_resolver_shutdown(exec_ctx, old_resolver);
540 GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
Craig Tiller45724b32015-09-22 10:42:19 -0700541 }
Craig Tillera82950e2015-09-22 12:33:20 -0700542 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700543
Craig Tillera82950e2015-09-22 12:33:20 -0700544 if (exit_idle) {
545 grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
546 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
547 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700548
Craig Tillera82950e2015-09-22 12:33:20 -0700549 if (old_lb_policy != NULL) {
550 grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
551 GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
552 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700553
Craig Tillera82950e2015-09-22 12:33:20 -0700554 if (lb_policy != NULL) {
555 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
556 }
Craig Tiller45724b32015-09-22 10:42:19 -0700557
Craig Tillera82950e2015-09-22 12:33:20 -0700558 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
Craig Tiller3f475422015-06-25 10:43:05 -0700559}
560
Craig Tillera82950e2015-09-22 12:33:20 -0700561static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
562 grpc_channel_element *elem,
563 grpc_transport_op *op) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700564 grpc_lb_policy *lb_policy = NULL;
565 channel_data *chand = elem->channel_data;
Craig Tiller98465032015-06-29 14:36:42 -0700566 grpc_resolver *destroy_resolver = NULL;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700567
Craig Tillera82950e2015-09-22 12:33:20 -0700568 grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700569
Craig Tillera82950e2015-09-22 12:33:20 -0700570 GPR_ASSERT(op->set_accept_stream == NULL);
571 GPR_ASSERT(op->bind_pollset == NULL);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700572
Craig Tillera82950e2015-09-22 12:33:20 -0700573 gpr_mu_lock(&chand->mu_config);
574 if (op->on_connectivity_state_change != NULL) {
575 grpc_connectivity_state_notify_on_state_change(
576 exec_ctx, &chand->state_tracker, op->connectivity_state,
577 op->on_connectivity_state_change);
578 op->on_connectivity_state_change = NULL;
579 op->connectivity_state = NULL;
580 }
581
582 if (!is_empty(op, sizeof(*op))) {
583 lb_policy = chand->lb_policy;
584 if (lb_policy) {
585 GRPC_LB_POLICY_REF(lb_policy, "broadcast");
Craig Tiller03dc6552015-07-17 23:12:34 -0700586 }
Craig Tillera82950e2015-09-22 12:33:20 -0700587 }
Craig Tiller03dc6552015-07-17 23:12:34 -0700588
Craig Tillera82950e2015-09-22 12:33:20 -0700589 if (op->disconnect && chand->resolver != NULL) {
590 grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
591 GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
592 destroy_resolver = chand->resolver;
593 chand->resolver = NULL;
594 if (chand->lb_policy != NULL) {
595 grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
596 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
597 chand->lb_policy = NULL;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700598 }
Craig Tillera82950e2015-09-22 12:33:20 -0700599 }
600 gpr_mu_unlock(&chand->mu_config);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700601
Craig Tillera82950e2015-09-22 12:33:20 -0700602 if (destroy_resolver) {
603 grpc_resolver_shutdown(exec_ctx, destroy_resolver);
604 GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
605 }
Craig Tiller98465032015-06-29 14:36:42 -0700606
Craig Tillera82950e2015-09-22 12:33:20 -0700607 if (lb_policy) {
608 grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
609 GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
610 }
Craig Tillerca3e9d32015-06-27 18:37:27 -0700611}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800612
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800613/* Constructor for call_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700614static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
615 const void *server_transport_data,
616 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800617 call_data *calld = elem->call_data;
618
Craig Tiller50d9db52015-04-23 10:52:14 -0700619 /* TODO(ctiller): is there something useful we can do here? */
Craig Tillera82950e2015-09-22 12:33:20 -0700620 GPR_ASSERT(initial_op == NULL);
Craig Tiller50d9db52015-04-23 10:52:14 -0700621
Craig Tillera82950e2015-09-22 12:33:20 -0700622 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
623 GPR_ASSERT(server_transport_data == NULL);
624 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800625 calld->elem = elem;
626 calld->state = CALL_CREATED;
Craig Tillera82950e2015-09-22 12:33:20 -0700627 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800628}
629
630/* Destructor for call_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700631static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
632 grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800633 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700634 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800635
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800636 /* if the call got activated, we need to destroy the child stack also, and
637 remove it from the in-flight requests tracked by the child_entry we
638 picked */
Craig Tillera82950e2015-09-22 12:33:20 -0700639 gpr_mu_lock(&calld->mu_state);
640 switch (calld->state) {
Craig Tillerf93fd052015-06-02 08:15:33 -0700641 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700642 subchannel_call = calld->subchannel_call;
Craig Tillera82950e2015-09-22 12:33:20 -0700643 gpr_mu_unlock(&calld->mu_state);
644 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
Craig Tillerf93fd052015-06-02 08:15:33 -0700645 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700646 case CALL_CREATED:
647 case CALL_CANCELLED:
Craig Tillera82950e2015-09-22 12:33:20 -0700648 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700649 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700650 case CALL_WAITING_FOR_PICK:
651 case CALL_WAITING_FOR_CONFIG:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700652 case CALL_WAITING_FOR_CALL:
Craig Tiller5d44c062015-07-01 08:55:28 -0700653 case CALL_WAITING_FOR_SEND:
Craig Tillera82950e2015-09-22 12:33:20 -0700654 gpr_log(GPR_ERROR, "should never reach here");
655 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700656 break;
Craig Tillera82950e2015-09-22 12:33:20 -0700657 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800658}
659
660/* Constructor for channel_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700661static void init_channel_elem(grpc_exec_ctx *exec_ctx,
662 grpc_channel_element *elem, grpc_channel *master,
663 const grpc_channel_args *args,
664 grpc_mdctx *metadata_context, int is_first,
665 int is_last) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800666 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800667
Craig Tillera82950e2015-09-22 12:33:20 -0700668 memset(chand, 0, sizeof(*chand));
Craig Tillereb3b12e2015-06-26 14:42:49 -0700669
Craig Tillera82950e2015-09-22 12:33:20 -0700670 GPR_ASSERT(is_last);
671 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800672
Craig Tillera82950e2015-09-22 12:33:20 -0700673 gpr_mu_init(&chand->mu_config);
Craig Tillere70413c2015-04-24 10:12:34 -0700674 chand->mdctx = metadata_context;
Craig Tiller98465032015-06-29 14:36:42 -0700675 chand->master = master;
Craig Tillera82950e2015-09-22 12:33:20 -0700676 grpc_pollset_set_init(&chand->pollset_set);
677 grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
Craig Tiller98465032015-06-29 14:36:42 -0700678
Craig Tillera82950e2015-09-22 12:33:20 -0700679 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
680 "client_channel");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800681}
682
683/* Destructor for channel_data */
Craig Tillera82950e2015-09-22 12:33:20 -0700684static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
685 grpc_channel_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800686 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800687
Craig Tillera82950e2015-09-22 12:33:20 -0700688 if (chand->resolver != NULL) {
689 grpc_resolver_shutdown(exec_ctx, chand->resolver);
690 GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
691 }
692 if (chand->lb_policy != NULL) {
693 GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
694 }
695 grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
696 grpc_pollset_set_destroy(&chand->pollset_set);
697 gpr_mu_destroy(&chand->mu_config);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800698}
699
700const grpc_channel_filter grpc_client_channel_filter = {
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700701 cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
702 init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
703 destroy_channel_elem, cc_get_peer, "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -0700704};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800705
Craig Tillera82950e2015-09-22 12:33:20 -0700706void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
707 grpc_channel_stack *channel_stack,
708 grpc_resolver *resolver) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800709 /* post construction initialization: set the transport setup pointer */
Craig Tillera82950e2015-09-22 12:33:20 -0700710 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800711 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700712 gpr_mu_lock(&chand->mu_config);
713 GPR_ASSERT(!chand->resolver);
Craig Tillerf5f17122015-06-25 08:47:26 -0700714 chand->resolver = resolver;
Craig Tillera82950e2015-09-22 12:33:20 -0700715 GRPC_RESOLVER_REF(resolver, "channel");
716 if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
717 chand->exit_idle_when_lb_policy_arrives) {
718 chand->started_resolving = 1;
719 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
720 grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
721 &chand->on_config_changed);
722 }
723 gpr_mu_unlock(&chand->mu_config);
Craig Tiller190d3602015-02-18 09:23:38 -0800724}
Craig Tiller48cb07c2015-07-15 16:16:15 -0700725
Craig Tillera82950e2015-09-22 12:33:20 -0700726grpc_connectivity_state grpc_client_channel_check_connectivity_state(
727 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700728 channel_data *chand = elem->channel_data;
729 grpc_connectivity_state out;
Craig Tillera82950e2015-09-22 12:33:20 -0700730 gpr_mu_lock(&chand->mu_config);
731 out = grpc_connectivity_state_check(&chand->state_tracker);
732 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
733 if (chand->lb_policy != NULL) {
734 grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
735 } else {
736 chand->exit_idle_when_lb_policy_arrives = 1;
737 if (!chand->started_resolving && chand->resolver != NULL) {
738 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
739 chand->started_resolving = 1;
740 grpc_resolver_next(exec_ctx, chand->resolver,
741 &chand->incoming_configuration,
742 &chand->on_config_changed);
743 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700744 }
Craig Tillera82950e2015-09-22 12:33:20 -0700745 }
746 gpr_mu_unlock(&chand->mu_config);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700747 return out;
748}
749
Craig Tillera82950e2015-09-22 12:33:20 -0700750void grpc_client_channel_watch_connectivity_state(
751 grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
752 grpc_connectivity_state *state, grpc_closure *on_complete) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700753 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700754 gpr_mu_lock(&chand->mu_config);
755 grpc_connectivity_state_notify_on_state_change(
756 exec_ctx, &chand->state_tracker, state, on_complete);
757 gpr_mu_unlock(&chand->mu_config);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700758}
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700759
Craig Tillera82950e2015-09-22 12:33:20 -0700760grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
761 grpc_channel_element *elem) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700762 channel_data *chand = elem->channel_data;
763 return &chand->pollset_set;
764}
765
Craig Tillera82950e2015-09-22 12:33:20 -0700766void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
767 grpc_channel_element *elem,
768 grpc_pollset *pollset) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700769 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700770 grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700771}
772
Craig Tillera82950e2015-09-22 12:33:20 -0700773void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
774 grpc_channel_element *elem,
775 grpc_pollset *pollset) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700776 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700777 grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700778}