blob: a2346503d901770fd143c5a99aa002ebaa2eac5d [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070037#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
39#include "src/core/channel/channel_args.h"
40#include "src/core/channel/connected_channel.h"
Craig Tiller98465032015-06-29 14:36:42 -070041#include "src/core/surface/channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070044#include "src/core/transport/connectivity_state.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <grpc/support/alloc.h>
46#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <grpc/support/sync.h>
48#include <grpc/support/useful.h>
49
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050/* Client channel implementation */
51
/* Forward declaration of the per-call state so that helpers declared before
   the full struct definition can refer to it by name. */
typedef struct call_data call_data;
53
54typedef struct {
Craig Tillerf5f17122015-06-25 08:47:26 -070055 /** metadata context for this channel */
Craig Tillere70413c2015-04-24 10:12:34 -070056 grpc_mdctx *mdctx;
Craig Tillerf5f17122015-06-25 08:47:26 -070057 /** resolver for this channel */
58 grpc_resolver *resolver;
Craig Tiller20a3c352015-08-05 08:39:50 -070059 /** have we started resolving this channel */
60 int started_resolving;
Craig Tiller50bc6092015-07-01 14:25:19 -070061 /** master channel - the grpc_channel instance that ultimately owns
62 this channel_data via its channel stack.
63 We occasionally use this to bump the refcount on the master channel
64 to keep ourselves alive through an asynchronous operation. */
Craig Tiller98465032015-06-29 14:36:42 -070065 grpc_channel *master;
Craig Tillerf5f17122015-06-25 08:47:26 -070066
Craig Tiller9d94b602015-07-01 14:23:18 -070067 /** mutex protecting client configuration, including all
68 variables below in this data structure */
Craig Tillerf5f17122015-06-25 08:47:26 -070069 gpr_mu mu_config;
Craig Tillerf5f17122015-06-25 08:47:26 -070070 /** currently active load balancer - guarded by mu_config */
71 grpc_lb_policy *lb_policy;
Craig Tillerf5f17122015-06-25 08:47:26 -070072 /** incoming configuration - set by resolver.next
73 guarded by mu_config */
74 grpc_client_config *incoming_configuration;
Craig Tiller3f475422015-06-25 10:43:05 -070075 /** a list of closures that are all waiting for config to come in */
Craig Tiller000cd8f2015-09-18 07:20:29 -070076 grpc_iomgr_call_list waiting_for_config_closures;
Craig Tiller3f475422015-06-25 10:43:05 -070077 /** resolver callback */
78 grpc_iomgr_closure on_config_changed;
79 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -070080 grpc_connectivity_state_tracker state_tracker;
Craig Tiller48cb07c2015-07-15 16:16:15 -070081 /** when an lb_policy arrives, should we try to exit idle */
82 int exit_idle_when_lb_policy_arrives;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070083 /** pollset_set of interested parties in a new connection */
84 grpc_pollset_set pollset_set;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080085} channel_data;
86
Craig Tillerd6c98df2015-08-18 09:33:44 -070087/** We create one watcher for each new lb_policy that is returned from a
88 resolver,
89 to watch for state changes from the lb_policy. When a state change is seen,
90 we
Craig Tiller1ada6ad2015-07-16 16:19:14 -070091 update the channel, and create a new watcher */
92typedef struct {
93 channel_data *chand;
94 grpc_iomgr_closure on_changed;
95 grpc_connectivity_state state;
96 grpc_lb_policy *lb_policy;
97} lb_policy_connectivity_watcher;
98
/* Lifecycle of a call as driven by perform_transport_stream_op. */
typedef enum {
  /* initial state set by init_call_elem; no op has been seen yet */
  CALL_CREATED,
  /* an op was received but it carried no send_ops, so no pick can start */
  CALL_WAITING_FOR_SEND,
  /* no lb_policy yet; queued on the channel until a config arrives */
  CALL_WAITING_FOR_CONFIG,
  /* grpc_lb_policy_pick has been requested */
  CALL_WAITING_FOR_PICK,
  /* a subchannel was picked; the subchannel call is being created */
  CALL_WAITING_FOR_CALL,
  /* subchannel call exists; ops are forwarded straight to it */
  CALL_ACTIVE,
  /* call was cancelled; further ops are completed locally */
  CALL_CANCELLED
} call_state;
108
109struct call_data {
110 /* owning element */
111 grpc_call_element *elem;
112
Craig Tillerf5f17122015-06-25 08:47:26 -0700113 gpr_mu mu_state;
114
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115 call_state state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800116 gpr_timespec deadline;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700117 grpc_subchannel *picked_channel;
118 grpc_iomgr_closure async_setup_task;
119 grpc_transport_stream_op waiting_op;
120 /* our child call stack */
121 grpc_subchannel_call *subchannel_call;
122 grpc_linked_mdelem status;
123 grpc_linked_mdelem details;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800124};
125
Craig Tillerb3671532015-07-01 10:37:40 -0700126static grpc_iomgr_closure *merge_into_waiting_op(
127 grpc_call_element *elem,
128 grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
Craig Tiller5d44c062015-07-01 08:55:28 -0700129
Craig Tiller1a727fd2015-04-24 13:21:22 -0700130static void handle_op_after_cancellation(grpc_call_element *elem,
Craig Tillerb7959a02015-06-25 08:50:54 -0700131 grpc_transport_stream_op *op) {
Craig Tillere70413c2015-04-24 10:12:34 -0700132 call_data *calld = elem->call_data;
133 channel_data *chand = elem->channel_data;
134 if (op->send_ops) {
Yang Gaodbf8fdc2015-05-28 00:52:31 -0700135 grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700136 op->on_done_send->cb(op->on_done_send->cb_arg, 0);
Craig Tillere70413c2015-04-24 10:12:34 -0700137 }
138 if (op->recv_ops) {
139 char status[GPR_LTOA_MIN_BUFSIZE];
140 grpc_metadata_batch mdb;
141 gpr_ltoa(GRPC_STATUS_CANCELLED, status);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700142 calld->status.md =
Craig Tiller1a727fd2015-04-24 13:21:22 -0700143 grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700144 calld->details.md =
Craig Tiller1a727fd2015-04-24 13:21:22 -0700145 grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700146 calld->status.prev = calld->details.next = NULL;
147 calld->status.next = &calld->details;
148 calld->details.prev = &calld->status;
149 mdb.list.head = &calld->status;
150 mdb.list.tail = &calld->details;
Craig Tillere70413c2015-04-24 10:12:34 -0700151 mdb.garbage.head = mdb.garbage.tail = NULL;
Craig Tiller143e7bf2015-07-13 08:41:49 -0700152 mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Craig Tillere70413c2015-04-24 10:12:34 -0700153 grpc_sopb_add_metadata(op->recv_ops, mdb);
154 *op->recv_state = GRPC_STREAM_CLOSED;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700155 op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
Craig Tillere70413c2015-04-24 10:12:34 -0700156 }
Craig Tiller5dde66e2015-06-02 09:05:23 -0700157 if (op->on_consumed) {
Craig Tiller1e6facb2015-06-11 22:47:11 -0700158 op->on_consumed->cb(op->on_consumed->cb_arg, 0);
Craig Tiller5dde66e2015-06-02 09:05:23 -0700159 }
Craig Tillere70413c2015-04-24 10:12:34 -0700160}
Craig Tiller8b976d02015-02-05 21:41:23 -0800161
Craig Tillereb3b12e2015-06-26 14:42:49 -0700162typedef struct {
163 grpc_iomgr_closure closure;
164 grpc_call_element *elem;
165} waiting_call;
166
Craig Tiller079a11b2015-06-30 10:07:15 -0700167static void perform_transport_stream_op(grpc_call_element *elem,
168 grpc_transport_stream_op *op,
169 int continuation);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700170
171static void continue_with_pick(void *arg, int iomgr_success) {
172 waiting_call *wc = arg;
173 call_data *calld = wc->elem->call_data;
174 perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
175 gpr_free(wc);
176}
177
Craig Tiller079a11b2015-06-30 10:07:15 -0700178static void add_to_lb_policy_wait_queue_locked_state_config(
179 grpc_call_element *elem) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700180 channel_data *chand = elem->channel_data;
181 waiting_call *wc = gpr_malloc(sizeof(*wc));
182 grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
183 wc->elem = elem;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700184 grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
185 1);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700186}
187
/* Reports whether a memory region consists solely of zero bytes.
   p:   start of the region; only read, never written.
   len: number of bytes to inspect.
   Returns 1 if all `len` bytes are zero (also when len <= 0, since nothing
   is inspected), and 0 as soon as a non-zero byte is found. */
static int is_empty(const void *p, int len) {
  /* unsigned char is the type C guarantees may inspect any object's bytes */
  const unsigned char *bytes = p;
  int i;
  for (i = 0; i < len; i++) {
    if (bytes[i] != 0) return 0;
  }
  return 1;
}
196
197static void started_call(void *arg, int iomgr_success) {
198 call_data *calld = arg;
199 grpc_transport_stream_op op;
200 int have_waiting;
201
202 gpr_mu_lock(&calld->mu_state);
203 if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
204 memset(&op, 0, sizeof(op));
205 op.cancel_with_status = GRPC_STATUS_CANCELLED;
206 gpr_mu_unlock(&calld->mu_state);
207 grpc_subchannel_call_process_op(calld->subchannel_call, &op);
208 } else if (calld->state == CALL_WAITING_FOR_CALL) {
209 have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
210 if (calld->subchannel_call != NULL) {
211 calld->state = CALL_ACTIVE;
212 gpr_mu_unlock(&calld->mu_state);
213 if (have_waiting) {
Craig Tiller079a11b2015-06-30 10:07:15 -0700214 grpc_subchannel_call_process_op(calld->subchannel_call,
215 &calld->waiting_op);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700216 }
217 } else {
218 calld->state = CALL_CANCELLED;
219 gpr_mu_unlock(&calld->mu_state);
220 if (have_waiting) {
221 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
222 }
223 }
224 } else {
225 GPR_ASSERT(calld->state == CALL_CANCELLED);
Craig Tiller62b14732015-07-01 11:28:41 -0700226 gpr_mu_unlock(&calld->mu_state);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700227 }
228}
229
230static void picked_target(void *arg, int iomgr_success) {
231 call_data *calld = arg;
Craig Tillerabf36382015-06-29 16:13:27 -0700232 grpc_pollset *pollset;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700233 grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700234
235 if (calld->picked_channel == NULL) {
236 /* treat this like a cancellation */
237 calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
238 perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
239 } else {
240 gpr_mu_lock(&calld->mu_state);
241 if (calld->state == CALL_CANCELLED) {
242 gpr_mu_unlock(&calld->mu_state);
243 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
244 } else {
245 GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
246 calld->state = CALL_WAITING_FOR_CALL;
Craig Tillerabf36382015-06-29 16:13:27 -0700247 pollset = calld->waiting_op.bind_pollset;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700248 gpr_mu_unlock(&calld->mu_state);
249 grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
Craig Tillerabf36382015-06-29 16:13:27 -0700250 grpc_subchannel_create_call(calld->picked_channel, pollset,
Craig Tiller5f84c842015-06-26 16:08:21 -0700251 &calld->subchannel_call,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700252 &calld->async_setup_task, &call_list);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700253 }
254 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700255 grpc_iomgr_call_list_run(call_list);
Craig Tillerf5f17122015-06-25 08:47:26 -0700256}
257
Craig Tillerb3671532015-07-01 10:37:40 -0700258static grpc_iomgr_closure *merge_into_waiting_op(
259 grpc_call_element *elem, grpc_transport_stream_op *new_op) {
Craig Tillerbb32ba32015-06-30 09:31:48 -0700260 call_data *calld = elem->call_data;
Craig Tiller5d44c062015-07-01 08:55:28 -0700261 grpc_iomgr_closure *consumed_op = NULL;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700262 grpc_transport_stream_op *waiting_op = &calld->waiting_op;
Craig Tiller2e9dc502015-07-01 11:31:07 -0700263 GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
264 GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
Craig Tillerbb32ba32015-06-30 09:31:48 -0700265 if (new_op->send_ops != NULL) {
266 waiting_op->send_ops = new_op->send_ops;
267 waiting_op->is_last_send = new_op->is_last_send;
268 waiting_op->on_done_send = new_op->on_done_send;
269 }
270 if (new_op->recv_ops != NULL) {
271 waiting_op->recv_ops = new_op->recv_ops;
272 waiting_op->recv_state = new_op->recv_state;
273 waiting_op->on_done_recv = new_op->on_done_recv;
274 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700275 if (new_op->on_consumed != NULL) {
276 if (waiting_op->on_consumed != NULL) {
277 consumed_op = waiting_op->on_consumed;
278 }
Craig Tillerbb32ba32015-06-30 09:31:48 -0700279 waiting_op->on_consumed = new_op->on_consumed;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700280 }
281 if (new_op->cancel_with_status != GRPC_STATUS_OK) {
282 waiting_op->cancel_with_status = new_op->cancel_with_status;
283 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700284 return consumed_op;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700285}
286
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700287static char *cc_get_peer(grpc_call_element *elem) {
288 call_data *calld = elem->call_data;
289 channel_data *chand = elem->channel_data;
290 grpc_subchannel_call *subchannel_call;
291 char *result;
292
293 gpr_mu_lock(&calld->mu_state);
294 if (calld->state == CALL_ACTIVE) {
295 subchannel_call = calld->subchannel_call;
296 GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
297 gpr_mu_unlock(&calld->mu_state);
298 result = grpc_subchannel_call_get_peer(subchannel_call);
299 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer");
300 return result;
301 } else {
302 gpr_mu_unlock(&calld->mu_state);
303 return grpc_channel_get_target(chand->master);
304 }
305}
306
Craig Tiller079a11b2015-06-30 10:07:15 -0700307static void perform_transport_stream_op(grpc_call_element *elem,
308 grpc_transport_stream_op *op,
309 int continuation) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800310 call_data *calld = elem->call_data;
311 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700312 grpc_subchannel_call *subchannel_call;
313 grpc_lb_policy *lb_policy;
Craig Tiller49924e02015-06-29 22:42:33 -0700314 grpc_transport_stream_op op2;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700315 grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
Craig Tillera9407522015-04-24 10:37:57 -0700316 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
317 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800318
Craig Tillerf5f17122015-06-25 08:47:26 -0700319 gpr_mu_lock(&calld->mu_state);
320 switch (calld->state) {
321 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700322 GPR_ASSERT(!continuation);
323 subchannel_call = calld->subchannel_call;
Craig Tillerf5f17122015-06-25 08:47:26 -0700324 gpr_mu_unlock(&calld->mu_state);
325 grpc_subchannel_call_process_op(subchannel_call, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700326 break;
327 case CALL_CANCELLED:
328 gpr_mu_unlock(&calld->mu_state);
329 handle_op_after_cancellation(elem, op);
330 break;
Craig Tiller5d44c062015-07-01 08:55:28 -0700331 case CALL_WAITING_FOR_SEND:
332 GPR_ASSERT(!continuation);
Craig Tiller000cd8f2015-09-18 07:20:29 -0700333 grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
Craig Tillerb3671532015-07-01 10:37:40 -0700334 if (!calld->waiting_op.send_ops &&
335 calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
Craig Tiller5d44c062015-07-01 08:55:28 -0700336 gpr_mu_unlock(&calld->mu_state);
337 break;
338 }
339 *op = calld->waiting_op;
340 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
341 continuation = 1;
Craig Tillerb3671532015-07-01 10:37:40 -0700342 /* fall through */
Craig Tillereb3b12e2015-06-26 14:42:49 -0700343 case CALL_WAITING_FOR_CONFIG:
344 case CALL_WAITING_FOR_PICK:
345 case CALL_WAITING_FOR_CALL:
346 if (!continuation) {
347 if (op->cancel_with_status != GRPC_STATUS_OK) {
348 calld->state = CALL_CANCELLED;
Craig Tiller49924e02015-06-29 22:42:33 -0700349 op2 = calld->waiting_op;
350 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
Craig Tillerbb32ba32015-06-30 09:31:48 -0700351 if (op->on_consumed) {
352 calld->waiting_op.on_consumed = op->on_consumed;
353 op->on_consumed = NULL;
354 } else if (op2.on_consumed) {
355 calld->waiting_op.on_consumed = op2.on_consumed;
356 op2.on_consumed = NULL;
357 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700358 gpr_mu_unlock(&calld->mu_state);
359 handle_op_after_cancellation(elem, op);
Craig Tiller49924e02015-06-29 22:42:33 -0700360 handle_op_after_cancellation(elem, &op2);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700361 } else {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700362 grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
363 1);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700364 gpr_mu_unlock(&calld->mu_state);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700365 }
366 break;
367 }
Craig Tiller079a11b2015-06-30 10:07:15 -0700368 /* fall through */
Craig Tillerf5f17122015-06-25 08:47:26 -0700369 case CALL_CREATED:
370 if (op->cancel_with_status != GRPC_STATUS_OK) {
371 calld->state = CALL_CANCELLED;
372 gpr_mu_unlock(&calld->mu_state);
373 handle_op_after_cancellation(elem, op);
374 } else {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700375 calld->waiting_op = *op;
Craig Tillerf5f17122015-06-25 08:47:26 -0700376
Craig Tiller5d44c062015-07-01 08:55:28 -0700377 if (op->send_ops == NULL) {
378 /* need to have some send ops before we can select the
379 lb target */
380 calld->state = CALL_WAITING_FOR_SEND;
Craig Tillerf5f17122015-06-25 08:47:26 -0700381 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf5f17122015-06-25 08:47:26 -0700382 } else {
Craig Tiller5d44c062015-07-01 08:55:28 -0700383 gpr_mu_lock(&chand->mu_config);
384 lb_policy = chand->lb_policy;
385 if (lb_policy) {
Craig Tiller990f6422015-07-20 22:07:13 -0700386 grpc_transport_stream_op *op = &calld->waiting_op;
387 grpc_pollset *bind_pollset = op->bind_pollset;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700388 grpc_metadata_batch *initial_metadata =
389 &op->send_ops->ops[0].data.metadata;
Craig Tiller5d44c062015-07-01 08:55:28 -0700390 GRPC_LB_POLICY_REF(lb_policy, "pick");
391 gpr_mu_unlock(&chand->mu_config);
392 calld->state = CALL_WAITING_FOR_PICK;
Craig Tiller3d578712015-07-20 22:00:24 -0700393
Craig Tiller990f6422015-07-20 22:07:13 -0700394 GPR_ASSERT(op->bind_pollset);
395 GPR_ASSERT(op->send_ops);
396 GPR_ASSERT(op->send_ops->nops >= 1);
Craig Tillerd6c98df2015-08-18 09:33:44 -0700397 GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
Craig Tiller5d44c062015-07-01 08:55:28 -0700398 gpr_mu_unlock(&calld->mu_state);
399
Craig Tillerd6c98df2015-08-18 09:33:44 -0700400 grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
401 calld);
Craig Tiller990f6422015-07-20 22:07:13 -0700402 grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
Craig Tillerd6c98df2015-08-18 09:33:44 -0700403 &calld->picked_channel,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700404 &calld->async_setup_task, &call_list);
Craig Tiller5d44c062015-07-01 08:55:28 -0700405
406 GRPC_LB_POLICY_UNREF(lb_policy, "pick");
Craig Tillerb9a46ae2015-07-01 09:45:21 -0700407 } else if (chand->resolver != NULL) {
Craig Tiller5d44c062015-07-01 08:55:28 -0700408 calld->state = CALL_WAITING_FOR_CONFIG;
409 add_to_lb_policy_wait_queue_locked_state_config(elem);
Craig Tiller20a3c352015-08-05 08:39:50 -0700410 if (!chand->started_resolving && chand->resolver != NULL) {
Craig Tiller6659d8b2015-08-06 18:02:22 -0700411 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
Craig Tiller20a3c352015-08-05 08:39:50 -0700412 chand->started_resolving = 1;
413 grpc_resolver_next(chand->resolver,
414 &chand->incoming_configuration,
415 &chand->on_config_changed);
416 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700417 gpr_mu_unlock(&chand->mu_config);
418 gpr_mu_unlock(&calld->mu_state);
Craig Tillerb9a46ae2015-07-01 09:45:21 -0700419 } else {
420 calld->state = CALL_CANCELLED;
421 gpr_mu_unlock(&chand->mu_config);
422 gpr_mu_unlock(&calld->mu_state);
423 handle_op_after_cancellation(elem, op);
Craig Tiller5d44c062015-07-01 08:55:28 -0700424 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700425 }
426 }
427 break;
Craig Tillerf5f17122015-06-25 08:47:26 -0700428 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700429
Craig Tiller000cd8f2015-09-18 07:20:29 -0700430 grpc_iomgr_call_list_run(call_list);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700431}
Craig Tillerf5f17122015-06-25 08:47:26 -0700432
Craig Tillereb3b12e2015-06-26 14:42:49 -0700433static void cc_start_transport_stream_op(grpc_call_element *elem,
Craig Tiller079a11b2015-06-30 10:07:15 -0700434 grpc_transport_stream_op *op) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700435 perform_transport_stream_op(elem, op, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800436}
437
Craig Tillerd6c98df2015-08-18 09:33:44 -0700438static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700439 grpc_connectivity_state current_state,
440 grpc_iomgr_call_list *cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700441
Craig Tiller000cd8f2015-09-18 07:20:29 -0700442static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
443 grpc_iomgr_call_list *cl) {
Craig Tiller5795da72015-09-17 15:27:13 -0700444 /* check if the notification is for a stale policy */
445 if (w->lb_policy != w->chand->lb_policy) return;
446
Craig Tiller000cd8f2015-09-18 07:20:29 -0700447 grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
448 cl);
Craig Tiller5795da72015-09-17 15:27:13 -0700449 if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700450 watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
Craig Tiller5795da72015-09-17 15:27:13 -0700451 }
452}
453
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700454static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
455 lb_policy_connectivity_watcher *w = arg;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700456 grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700457
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700458 gpr_mu_lock(&w->chand->mu_config);
Craig Tiller000cd8f2015-09-18 07:20:29 -0700459 on_lb_policy_state_changed_locked(w, &cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700460 gpr_mu_unlock(&w->chand->mu_config);
461
Craig Tiller000cd8f2015-09-18 07:20:29 -0700462 grpc_iomgr_call_list_run(cl);
Craig Tiller5795da72015-09-17 15:27:13 -0700463
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700464 GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
465 gpr_free(w);
466}
467
Craig Tillerd6c98df2015-08-18 09:33:44 -0700468static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700469 grpc_connectivity_state current_state,
470 grpc_iomgr_call_list *call_list) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700471 lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
472 GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
473
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700474 w->chand = chand;
475 grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
476 w->state = current_state;
477 w->lb_policy = lb_policy;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700478 grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
479 call_list);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700480}
481
Craig Tiller3f475422015-06-25 10:43:05 -0700482static void cc_on_config_changed(void *arg, int iomgr_success) {
483 channel_data *chand = arg;
484 grpc_lb_policy *lb_policy = NULL;
485 grpc_lb_policy *old_lb_policy;
486 grpc_resolver *old_resolver;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700487 grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700488 grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
Craig Tiller48cb07c2015-07-15 16:16:15 -0700489 int exit_idle = 0;
Craig Tiller3f475422015-06-25 10:43:05 -0700490
Craig Tillerd7b68e72015-06-28 11:41:09 -0700491 if (chand->incoming_configuration != NULL) {
Craig Tiller3f475422015-06-25 10:43:05 -0700492 lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700493 if (lb_policy != NULL) {
494 GRPC_LB_POLICY_REF(lb_policy, "channel");
495 GRPC_LB_POLICY_REF(lb_policy, "config_change");
Craig Tiller000cd8f2015-09-18 07:20:29 -0700496 state = grpc_lb_policy_check_connectivity(lb_policy, &cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700497 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700498
499 grpc_client_config_unref(chand->incoming_configuration);
Craig Tiller3f475422015-06-25 10:43:05 -0700500 }
501
Craig Tiller3f475422015-06-25 10:43:05 -0700502 chand->incoming_configuration = NULL;
503
504 gpr_mu_lock(&chand->mu_config);
505 old_lb_policy = chand->lb_policy;
506 chand->lb_policy = lb_policy;
Craig Tillerb9a46ae2015-07-01 09:45:21 -0700507 if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700508 grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
Craig Tiller3f475422015-06-25 10:43:05 -0700509 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700510 if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
511 GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
512 exit_idle = 1;
513 chand->exit_idle_when_lb_policy_arrives = 0;
514 }
Craig Tiller3f475422015-06-25 10:43:05 -0700515
Craig Tiller98465032015-06-29 14:36:42 -0700516 if (iomgr_success && chand->resolver) {
517 grpc_resolver *resolver = chand->resolver;
518 GRPC_RESOLVER_REF(resolver, "channel-next");
Craig Tiller000cd8f2015-09-18 07:20:29 -0700519 grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
520 &cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700521 if (lb_policy != NULL) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700522 watch_lb_policy(chand, lb_policy, state, &cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700523 }
Craig Tiller5795da72015-09-17 15:27:13 -0700524 gpr_mu_unlock(&chand->mu_config);
525 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
Craig Tiller5795da72015-09-17 15:27:13 -0700526 grpc_resolver_next(resolver, &chand->incoming_configuration,
527 &chand->on_config_changed);
528 GRPC_RESOLVER_UNREF(resolver, "channel-next");
Craig Tiller98465032015-06-29 14:36:42 -0700529 } else {
530 old_resolver = chand->resolver;
531 chand->resolver = NULL;
Craig Tiller079a11b2015-06-30 10:07:15 -0700532 grpc_connectivity_state_set(&chand->state_tracker,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700533 GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
534 &cl);
Craig Tiller98465032015-06-29 14:36:42 -0700535 gpr_mu_unlock(&chand->mu_config);
536 if (old_resolver != NULL) {
537 grpc_resolver_shutdown(old_resolver);
538 GRPC_RESOLVER_UNREF(old_resolver, "channel");
539 }
540 }
541
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700542 if (exit_idle) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700543 grpc_lb_policy_exit_idle(lb_policy, &cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700544 GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
545 }
546
547 if (old_lb_policy != NULL) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700548 grpc_lb_policy_shutdown(old_lb_policy, &cl);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700549 GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
550 }
551
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700552 if (lb_policy != NULL) {
553 GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
554 }
Craig Tiller000cd8f2015-09-18 07:20:29 -0700555
556 grpc_iomgr_call_list_run(cl);
Craig Tiller98465032015-06-29 14:36:42 -0700557 GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
Craig Tiller3f475422015-06-25 10:43:05 -0700558}
559
Craig Tiller079a11b2015-06-30 10:07:15 -0700560static void cc_start_transport_op(grpc_channel_element *elem,
561 grpc_transport_op *op) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700562 grpc_lb_policy *lb_policy = NULL;
563 channel_data *chand = elem->channel_data;
Craig Tiller98465032015-06-29 14:36:42 -0700564 grpc_resolver *destroy_resolver = NULL;
Craig Tiller000cd8f2015-09-18 07:20:29 -0700565 grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
566
567 if (op->on_consumed) {
568 grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
569 op->on_consumed = NULL;
570 }
Craig Tillerca3e9d32015-06-27 18:37:27 -0700571
572 GPR_ASSERT(op->set_accept_stream == NULL);
573 GPR_ASSERT(op->bind_pollset == NULL);
574
575 gpr_mu_lock(&chand->mu_config);
576 if (op->on_connectivity_state_change != NULL) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700577 grpc_connectivity_state_notify_on_state_change(
578 &chand->state_tracker, op->connectivity_state,
579 op->on_connectivity_state_change, &call_list);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700580 op->on_connectivity_state_change = NULL;
581 op->connectivity_state = NULL;
582 }
583
Craig Tiller03dc6552015-07-17 23:12:34 -0700584 if (!is_empty(op, sizeof(*op))) {
585 lb_policy = chand->lb_policy;
586 if (lb_policy) {
587 GRPC_LB_POLICY_REF(lb_policy, "broadcast");
588 }
589 }
590
Craig Tiller98465032015-06-29 14:36:42 -0700591 if (op->disconnect && chand->resolver != NULL) {
Craig Tiller079a11b2015-06-30 10:07:15 -0700592 grpc_connectivity_state_set(&chand->state_tracker,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700593 GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
594 &call_list);
Craig Tiller98465032015-06-29 14:36:42 -0700595 destroy_resolver = chand->resolver;
596 chand->resolver = NULL;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700597 if (chand->lb_policy != NULL) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700598 grpc_lb_policy_shutdown(chand->lb_policy, &call_list);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700599 GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
600 chand->lb_policy = NULL;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700601 }
Craig Tiller98465032015-06-29 14:36:42 -0700602 }
Craig Tillerca3e9d32015-06-27 18:37:27 -0700603 gpr_mu_unlock(&chand->mu_config);
604
Craig Tiller98465032015-06-29 14:36:42 -0700605 if (destroy_resolver) {
606 grpc_resolver_shutdown(destroy_resolver);
607 GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
608 }
609
Craig Tillerca3e9d32015-06-27 18:37:27 -0700610 if (lb_policy) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700611 grpc_lb_policy_broadcast(lb_policy, op, &call_list);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700612 GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
Craig Tillerca3e9d32015-06-27 18:37:27 -0700613 }
614
Craig Tiller000cd8f2015-09-18 07:20:29 -0700615 grpc_iomgr_call_list_run(call_list);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700616}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800617
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800618/* Constructor for call_data */
619static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700620 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700621 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800622 call_data *calld = elem->call_data;
623
Craig Tiller50d9db52015-04-23 10:52:14 -0700624 /* TODO(ctiller): is there something useful we can do here? */
625 GPR_ASSERT(initial_op == NULL);
626
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800627 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
628 GPR_ASSERT(server_transport_data == NULL);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700629 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800630 calld->elem = elem;
631 calld->state = CALL_CREATED;
Craig Tiller143e7bf2015-07-13 08:41:49 -0700632 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800633}
634
635/* Destructor for call_data */
636static void destroy_call_elem(grpc_call_element *elem) {
637 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700638 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800639
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800640 /* if the call got activated, we need to destroy the child stack also, and
641 remove it from the in-flight requests tracked by the child_entry we
642 picked */
Craig Tiller3f475422015-06-25 10:43:05 -0700643 gpr_mu_lock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700644 switch (calld->state) {
645 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700646 subchannel_call = calld->subchannel_call;
Craig Tiller3f475422015-06-25 10:43:05 -0700647 gpr_mu_unlock(&calld->mu_state);
Craig Tillerc3967532015-06-29 14:59:38 -0700648 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
Craig Tillerf93fd052015-06-02 08:15:33 -0700649 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700650 case CALL_CREATED:
651 case CALL_CANCELLED:
652 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700653 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700654 case CALL_WAITING_FOR_PICK:
655 case CALL_WAITING_FOR_CONFIG:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700656 case CALL_WAITING_FOR_CALL:
Craig Tiller5d44c062015-07-01 08:55:28 -0700657 case CALL_WAITING_FOR_SEND:
Craig Tiller3f475422015-06-25 10:43:05 -0700658 gpr_log(GPR_ERROR, "should never reach here");
659 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700660 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800661 }
662}
663
664/* Constructor for channel_data */
Craig Tiller079a11b2015-06-30 10:07:15 -0700665static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800666 const grpc_channel_args *args,
667 grpc_mdctx *metadata_context, int is_first,
668 int is_last) {
669 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800670
Craig Tillereb3b12e2015-06-26 14:42:49 -0700671 memset(chand, 0, sizeof(*chand));
672
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800673 GPR_ASSERT(is_last);
674 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
675
Craig Tiller3f475422015-06-25 10:43:05 -0700676 gpr_mu_init(&chand->mu_config);
Craig Tillere70413c2015-04-24 10:12:34 -0700677 chand->mdctx = metadata_context;
Craig Tiller98465032015-06-29 14:36:42 -0700678 chand->master = master;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700679 grpc_pollset_set_init(&chand->pollset_set);
Craig Tiller079a11b2015-06-30 10:07:15 -0700680 grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
681 chand);
Craig Tiller98465032015-06-29 14:36:42 -0700682
Craig Tiller47a708e2015-09-15 16:16:06 -0700683 grpc_connectivity_state_init(&chand->state_tracker,
Craig Tiller47a708e2015-09-15 16:16:06 -0700684 GRPC_CHANNEL_IDLE, "client_channel");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800685}
686
687/* Destructor for channel_data */
688static void destroy_channel_elem(grpc_channel_element *elem) {
689 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800690
Craig Tillerd7b68e72015-06-28 11:41:09 -0700691 if (chand->resolver != NULL) {
Craig Tiller98465032015-06-29 14:36:42 -0700692 grpc_resolver_shutdown(chand->resolver);
693 GRPC_RESOLVER_UNREF(chand->resolver, "channel");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800694 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700695 if (chand->lb_policy != NULL) {
696 GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
Craig Tiller3f475422015-06-25 10:43:05 -0700697 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700698 grpc_connectivity_state_destroy(&chand->state_tracker);
699 grpc_pollset_set_destroy(&chand->pollset_set);
Craig Tiller3f475422015-06-25 10:43:05 -0700700 gpr_mu_destroy(&chand->mu_config);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800701}
702
703const grpc_channel_filter grpc_client_channel_filter = {
Craig Tiller079a11b2015-06-30 10:07:15 -0700704 cc_start_transport_stream_op,
705 cc_start_transport_op,
706 sizeof(call_data),
707 init_call_elem,
708 destroy_call_elem,
709 sizeof(channel_data),
710 init_channel_elem,
711 destroy_channel_elem,
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700712 cc_get_peer,
Craig Tiller079a11b2015-06-30 10:07:15 -0700713 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -0700714};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800715
Craig Tillerf5f17122015-06-25 08:47:26 -0700716void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
717 grpc_resolver *resolver) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800718 /* post construction initialization: set the transport setup pointer */
719 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
720 channel_data *chand = elem->channel_data;
Craig Tiller20a3c352015-08-05 08:39:50 -0700721 gpr_mu_lock(&chand->mu_config);
Craig Tillerf5f17122015-06-25 08:47:26 -0700722 GPR_ASSERT(!chand->resolver);
723 chand->resolver = resolver;
Craig Tiller98465032015-06-29 14:36:42 -0700724 GRPC_RESOLVER_REF(resolver, "channel");
Craig Tiller000cd8f2015-09-18 07:20:29 -0700725 if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
Craig Tiller20a3c352015-08-05 08:39:50 -0700726 chand->exit_idle_when_lb_policy_arrives) {
727 chand->started_resolving = 1;
Craig Tiller6659d8b2015-08-06 18:02:22 -0700728 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
Craig Tiller20a3c352015-08-05 08:39:50 -0700729 grpc_resolver_next(resolver, &chand->incoming_configuration,
730 &chand->on_config_changed);
731 }
732 gpr_mu_unlock(&chand->mu_config);
Craig Tiller190d3602015-02-18 09:23:38 -0800733}
Craig Tiller48cb07c2015-07-15 16:16:15 -0700734
735grpc_connectivity_state grpc_client_channel_check_connectivity_state(
Craig Tiller000cd8f2015-09-18 07:20:29 -0700736 grpc_channel_element *elem, int try_to_connect,
737 grpc_iomgr_call_list *call_list) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700738 channel_data *chand = elem->channel_data;
739 grpc_connectivity_state out;
740 gpr_mu_lock(&chand->mu_config);
741 out = grpc_connectivity_state_check(&chand->state_tracker);
742 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
743 if (chand->lb_policy != NULL) {
Craig Tiller000cd8f2015-09-18 07:20:29 -0700744 grpc_lb_policy_exit_idle(chand->lb_policy, call_list);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700745 } else {
746 chand->exit_idle_when_lb_policy_arrives = 1;
Craig Tiller20a3c352015-08-05 08:39:50 -0700747 if (!chand->started_resolving && chand->resolver != NULL) {
Craig Tiller6659d8b2015-08-06 18:02:22 -0700748 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
Craig Tiller20a3c352015-08-05 08:39:50 -0700749 chand->started_resolving = 1;
750 grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
751 &chand->on_config_changed);
752 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700753 }
754 }
755 gpr_mu_unlock(&chand->mu_config);
756 return out;
757}
758
759void grpc_client_channel_watch_connectivity_state(
760 grpc_channel_element *elem, grpc_connectivity_state *state,
Craig Tiller000cd8f2015-09-18 07:20:29 -0700761 grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700762 channel_data *chand = elem->channel_data;
763 gpr_mu_lock(&chand->mu_config);
Craig Tiller000cd8f2015-09-18 07:20:29 -0700764 grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
765 on_complete, call_list);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700766 gpr_mu_unlock(&chand->mu_config);
767}
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700768
Craig Tillerd6c98df2015-08-18 09:33:44 -0700769grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
770 grpc_channel_element *elem) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700771 channel_data *chand = elem->channel_data;
772 return &chand->pollset_set;
773}
774
775void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
Craig Tillerd6c98df2015-08-18 09:33:44 -0700776 grpc_pollset *pollset) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700777 channel_data *chand = elem->channel_data;
778 grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
779}
780
781void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
Craig Tillerd6c98df2015-08-18 09:33:44 -0700782 grpc_pollset *pollset) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700783 channel_data *chand = elem->channel_data;
784 grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
785}