blob: ec6ca4288965c52d1342e97d2a9da189315d61a2 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070037#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
39#include "src/core/channel/channel_args.h"
40#include "src/core/channel/connected_channel.h"
Craig Tiller98465032015-06-29 14:36:42 -070041#include "src/core/surface/channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080042#include "src/core/iomgr/iomgr.h"
Craig Tillerfe4ba362015-05-08 09:47:18 -070043#include "src/core/iomgr/pollset_set.h"
Craig Tiller485d7762015-01-23 12:54:05 -080044#include "src/core/support/string.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070045#include "src/core/transport/connectivity_state.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080046#include <grpc/support/alloc.h>
47#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080048#include <grpc/support/sync.h>
49#include <grpc/support/useful.h>
50
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051/* Client channel implementation */
52
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053typedef struct call_data call_data;
54
55typedef struct {
Craig Tillerf5f17122015-06-25 08:47:26 -070056 /** metadata context for this channel */
Craig Tillere70413c2015-04-24 10:12:34 -070057 grpc_mdctx *mdctx;
Craig Tillerf5f17122015-06-25 08:47:26 -070058 /** resolver for this channel */
59 grpc_resolver *resolver;
Craig Tiller50bc6092015-07-01 14:25:19 -070060 /** master channel - the grpc_channel instance that ultimately owns
61 this channel_data via its channel stack.
62 We occasionally use this to bump the refcount on the master channel
63 to keep ourselves alive through an asynchronous operation. */
Craig Tiller98465032015-06-29 14:36:42 -070064 grpc_channel *master;
Craig Tillerf5f17122015-06-25 08:47:26 -070065
Craig Tiller9d94b602015-07-01 14:23:18 -070066 /** mutex protecting client configuration, including all
67 variables below in this data structure */
Craig Tillerf5f17122015-06-25 08:47:26 -070068 gpr_mu mu_config;
Craig Tillerf5f17122015-06-25 08:47:26 -070069 /** currently active load balancer - guarded by mu_config */
70 grpc_lb_policy *lb_policy;
Craig Tillerf5f17122015-06-25 08:47:26 -070071 /** incoming configuration - set by resolver.next
72 guarded by mu_config */
73 grpc_client_config *incoming_configuration;
Craig Tiller3f475422015-06-25 10:43:05 -070074 /** a list of closures that are all waiting for config to come in */
75 grpc_iomgr_closure *waiting_for_config_closures;
76 /** resolver callback */
77 grpc_iomgr_closure on_config_changed;
78 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -070079 grpc_connectivity_state_tracker state_tracker;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080080} channel_data;
81
/* Lifecycle of a call as seen by the client channel filter. */
typedef enum {
  /* freshly constructed; no stream op has been seen yet */
  CALL_CREATED,
  /* ops have arrived but none carried send_ops, which are required
     before a load balancing target can be selected */
  CALL_WAITING_FOR_SEND,
  /* no lb policy is installed yet; the call is queued on the channel
     until the resolver delivers a configuration */
  CALL_WAITING_FOR_CONFIG,
  /* a pick was requested from the lb policy; picked_target is pending */
  CALL_WAITING_FOR_PICK,
  /* subchannel call creation was requested; started_call is pending */
  CALL_WAITING_FOR_CALL,
  /* ops are forwarded directly to the subchannel call */
  CALL_ACTIVE,
  /* every op is completed locally by handle_op_after_cancellation */
  CALL_CANCELLED
} call_state;
91
92struct call_data {
93 /* owning element */
94 grpc_call_element *elem;
95
Craig Tillerf5f17122015-06-25 08:47:26 -070096 gpr_mu mu_state;
97
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080098 call_state state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080099 gpr_timespec deadline;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700100 grpc_subchannel *picked_channel;
101 grpc_iomgr_closure async_setup_task;
102 grpc_transport_stream_op waiting_op;
103 /* our child call stack */
104 grpc_subchannel_call *subchannel_call;
105 grpc_linked_mdelem status;
106 grpc_linked_mdelem details;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800107};
108
Craig Tillerb3671532015-07-01 10:37:40 -0700109static grpc_iomgr_closure *merge_into_waiting_op(
110 grpc_call_element *elem,
111 grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
Craig Tiller5d44c062015-07-01 08:55:28 -0700112
Craig Tiller1a727fd2015-04-24 13:21:22 -0700113static void handle_op_after_cancellation(grpc_call_element *elem,
Craig Tillerb7959a02015-06-25 08:50:54 -0700114 grpc_transport_stream_op *op) {
Craig Tillere70413c2015-04-24 10:12:34 -0700115 call_data *calld = elem->call_data;
116 channel_data *chand = elem->channel_data;
117 if (op->send_ops) {
Yang Gaodbf8fdc2015-05-28 00:52:31 -0700118 grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700119 op->on_done_send->cb(op->on_done_send->cb_arg, 0);
Craig Tillere70413c2015-04-24 10:12:34 -0700120 }
121 if (op->recv_ops) {
122 char status[GPR_LTOA_MIN_BUFSIZE];
123 grpc_metadata_batch mdb;
124 gpr_ltoa(GRPC_STATUS_CANCELLED, status);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700125 calld->status.md =
Craig Tiller1a727fd2015-04-24 13:21:22 -0700126 grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700127 calld->details.md =
Craig Tiller1a727fd2015-04-24 13:21:22 -0700128 grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700129 calld->status.prev = calld->details.next = NULL;
130 calld->status.next = &calld->details;
131 calld->details.prev = &calld->status;
132 mdb.list.head = &calld->status;
133 mdb.list.tail = &calld->details;
Craig Tillere70413c2015-04-24 10:12:34 -0700134 mdb.garbage.head = mdb.garbage.tail = NULL;
Craig Tiller143e7bf2015-07-13 08:41:49 -0700135 mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Craig Tillere70413c2015-04-24 10:12:34 -0700136 grpc_sopb_add_metadata(op->recv_ops, mdb);
137 *op->recv_state = GRPC_STREAM_CLOSED;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700138 op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
Craig Tillere70413c2015-04-24 10:12:34 -0700139 }
Craig Tiller5dde66e2015-06-02 09:05:23 -0700140 if (op->on_consumed) {
Craig Tiller1e6facb2015-06-11 22:47:11 -0700141 op->on_consumed->cb(op->on_consumed->cb_arg, 0);
Craig Tiller5dde66e2015-06-02 09:05:23 -0700142 }
Craig Tillere70413c2015-04-24 10:12:34 -0700143}
Craig Tiller8b976d02015-02-05 21:41:23 -0800144
Craig Tillereb3b12e2015-06-26 14:42:49 -0700145typedef struct {
146 grpc_iomgr_closure closure;
147 grpc_call_element *elem;
148} waiting_call;
149
Craig Tiller079a11b2015-06-30 10:07:15 -0700150static void perform_transport_stream_op(grpc_call_element *elem,
151 grpc_transport_stream_op *op,
152 int continuation);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700153
154static void continue_with_pick(void *arg, int iomgr_success) {
155 waiting_call *wc = arg;
156 call_data *calld = wc->elem->call_data;
157 perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
158 gpr_free(wc);
159}
160
Craig Tiller079a11b2015-06-30 10:07:15 -0700161static void add_to_lb_policy_wait_queue_locked_state_config(
162 grpc_call_element *elem) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700163 channel_data *chand = elem->channel_data;
164 waiting_call *wc = gpr_malloc(sizeof(*wc));
165 grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
166 wc->elem = elem;
167 wc->closure.next = chand->waiting_for_config_closures;
168 chand->waiting_for_config_closures = &wc->closure;
169}
170
/* Returns 1 when the first `len` bytes at `p` are all zero (also when
   `len` is zero or negative), and 0 as soon as a non-zero byte is found. */
static int is_empty(void *p, int len) {
  char *cursor = p;
  int remaining = len;

  while (remaining > 0) {
    if (*cursor != 0) {
      return 0;
    }
    cursor++;
    remaining--;
  }
  return 1;
}
179
180static void started_call(void *arg, int iomgr_success) {
181 call_data *calld = arg;
182 grpc_transport_stream_op op;
183 int have_waiting;
184
185 gpr_mu_lock(&calld->mu_state);
186 if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
187 memset(&op, 0, sizeof(op));
188 op.cancel_with_status = GRPC_STATUS_CANCELLED;
189 gpr_mu_unlock(&calld->mu_state);
190 grpc_subchannel_call_process_op(calld->subchannel_call, &op);
191 } else if (calld->state == CALL_WAITING_FOR_CALL) {
192 have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
193 if (calld->subchannel_call != NULL) {
194 calld->state = CALL_ACTIVE;
195 gpr_mu_unlock(&calld->mu_state);
196 if (have_waiting) {
Craig Tiller079a11b2015-06-30 10:07:15 -0700197 grpc_subchannel_call_process_op(calld->subchannel_call,
198 &calld->waiting_op);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700199 }
200 } else {
201 calld->state = CALL_CANCELLED;
202 gpr_mu_unlock(&calld->mu_state);
203 if (have_waiting) {
204 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
205 }
206 }
207 } else {
208 GPR_ASSERT(calld->state == CALL_CANCELLED);
Craig Tiller62b14732015-07-01 11:28:41 -0700209 gpr_mu_unlock(&calld->mu_state);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700210 }
211}
212
213static void picked_target(void *arg, int iomgr_success) {
214 call_data *calld = arg;
Craig Tillerabf36382015-06-29 16:13:27 -0700215 grpc_pollset *pollset;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700216
217 if (calld->picked_channel == NULL) {
218 /* treat this like a cancellation */
219 calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
220 perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
221 } else {
222 gpr_mu_lock(&calld->mu_state);
223 if (calld->state == CALL_CANCELLED) {
224 gpr_mu_unlock(&calld->mu_state);
225 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
226 } else {
227 GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
228 calld->state = CALL_WAITING_FOR_CALL;
Craig Tillerabf36382015-06-29 16:13:27 -0700229 pollset = calld->waiting_op.bind_pollset;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700230 gpr_mu_unlock(&calld->mu_state);
231 grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
Craig Tillerabf36382015-06-29 16:13:27 -0700232 grpc_subchannel_create_call(calld->picked_channel, pollset,
Craig Tiller5f84c842015-06-26 16:08:21 -0700233 &calld->subchannel_call,
234 &calld->async_setup_task);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700235 }
236 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700237}
238
Craig Tillerb3671532015-07-01 10:37:40 -0700239static grpc_iomgr_closure *merge_into_waiting_op(
240 grpc_call_element *elem, grpc_transport_stream_op *new_op) {
Craig Tillerbb32ba32015-06-30 09:31:48 -0700241 call_data *calld = elem->call_data;
Craig Tiller5d44c062015-07-01 08:55:28 -0700242 grpc_iomgr_closure *consumed_op = NULL;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700243 grpc_transport_stream_op *waiting_op = &calld->waiting_op;
Craig Tiller2e9dc502015-07-01 11:31:07 -0700244 GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
245 GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
Craig Tillerbb32ba32015-06-30 09:31:48 -0700246 if (new_op->send_ops != NULL) {
247 waiting_op->send_ops = new_op->send_ops;
248 waiting_op->is_last_send = new_op->is_last_send;
249 waiting_op->on_done_send = new_op->on_done_send;
250 }
251 if (new_op->recv_ops != NULL) {
252 waiting_op->recv_ops = new_op->recv_ops;
253 waiting_op->recv_state = new_op->recv_state;
254 waiting_op->on_done_recv = new_op->on_done_recv;
255 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700256 if (new_op->on_consumed != NULL) {
257 if (waiting_op->on_consumed != NULL) {
258 consumed_op = waiting_op->on_consumed;
259 }
Craig Tillerbb32ba32015-06-30 09:31:48 -0700260 waiting_op->on_consumed = new_op->on_consumed;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700261 }
262 if (new_op->cancel_with_status != GRPC_STATUS_OK) {
263 waiting_op->cancel_with_status = new_op->cancel_with_status;
264 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700265 return consumed_op;
Craig Tillerbb32ba32015-06-30 09:31:48 -0700266}
267
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700268static char *cc_get_peer(grpc_call_element *elem) {
269 call_data *calld = elem->call_data;
270 channel_data *chand = elem->channel_data;
271 grpc_subchannel_call *subchannel_call;
272 char *result;
273
274 gpr_mu_lock(&calld->mu_state);
275 if (calld->state == CALL_ACTIVE) {
276 subchannel_call = calld->subchannel_call;
277 GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
278 gpr_mu_unlock(&calld->mu_state);
279 result = grpc_subchannel_call_get_peer(subchannel_call);
280 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer");
281 return result;
282 } else {
283 gpr_mu_unlock(&calld->mu_state);
284 return grpc_channel_get_target(chand->master);
285 }
286}
287
Craig Tiller079a11b2015-06-30 10:07:15 -0700288static void perform_transport_stream_op(grpc_call_element *elem,
289 grpc_transport_stream_op *op,
290 int continuation) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800291 call_data *calld = elem->call_data;
292 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700293 grpc_subchannel_call *subchannel_call;
294 grpc_lb_policy *lb_policy;
Craig Tiller49924e02015-06-29 22:42:33 -0700295 grpc_transport_stream_op op2;
Craig Tiller5d44c062015-07-01 08:55:28 -0700296 grpc_iomgr_closure *consumed_op = NULL;
Craig Tillera9407522015-04-24 10:37:57 -0700297 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
298 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800299
Craig Tillerf5f17122015-06-25 08:47:26 -0700300 gpr_mu_lock(&calld->mu_state);
301 switch (calld->state) {
302 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700303 GPR_ASSERT(!continuation);
304 subchannel_call = calld->subchannel_call;
Craig Tillerf5f17122015-06-25 08:47:26 -0700305 gpr_mu_unlock(&calld->mu_state);
306 grpc_subchannel_call_process_op(subchannel_call, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700307 break;
308 case CALL_CANCELLED:
309 gpr_mu_unlock(&calld->mu_state);
310 handle_op_after_cancellation(elem, op);
311 break;
Craig Tiller5d44c062015-07-01 08:55:28 -0700312 case CALL_WAITING_FOR_SEND:
313 GPR_ASSERT(!continuation);
314 consumed_op = merge_into_waiting_op(elem, op);
Craig Tillerb3671532015-07-01 10:37:40 -0700315 if (!calld->waiting_op.send_ops &&
316 calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
Craig Tiller5d44c062015-07-01 08:55:28 -0700317 gpr_mu_unlock(&calld->mu_state);
318 break;
319 }
320 *op = calld->waiting_op;
321 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
322 continuation = 1;
Craig Tillerb3671532015-07-01 10:37:40 -0700323 /* fall through */
Craig Tillereb3b12e2015-06-26 14:42:49 -0700324 case CALL_WAITING_FOR_CONFIG:
325 case CALL_WAITING_FOR_PICK:
326 case CALL_WAITING_FOR_CALL:
327 if (!continuation) {
328 if (op->cancel_with_status != GRPC_STATUS_OK) {
329 calld->state = CALL_CANCELLED;
Craig Tiller49924e02015-06-29 22:42:33 -0700330 op2 = calld->waiting_op;
331 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
Craig Tillerbb32ba32015-06-30 09:31:48 -0700332 if (op->on_consumed) {
333 calld->waiting_op.on_consumed = op->on_consumed;
334 op->on_consumed = NULL;
335 } else if (op2.on_consumed) {
336 calld->waiting_op.on_consumed = op2.on_consumed;
337 op2.on_consumed = NULL;
338 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700339 gpr_mu_unlock(&calld->mu_state);
340 handle_op_after_cancellation(elem, op);
Craig Tiller49924e02015-06-29 22:42:33 -0700341 handle_op_after_cancellation(elem, &op2);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700342 } else {
Craig Tiller5d44c062015-07-01 08:55:28 -0700343 consumed_op = merge_into_waiting_op(elem, op);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700344 gpr_mu_unlock(&calld->mu_state);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700345 }
346 break;
347 }
Craig Tiller079a11b2015-06-30 10:07:15 -0700348 /* fall through */
Craig Tillerf5f17122015-06-25 08:47:26 -0700349 case CALL_CREATED:
350 if (op->cancel_with_status != GRPC_STATUS_OK) {
351 calld->state = CALL_CANCELLED;
352 gpr_mu_unlock(&calld->mu_state);
353 handle_op_after_cancellation(elem, op);
354 } else {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700355 calld->waiting_op = *op;
Craig Tillerf5f17122015-06-25 08:47:26 -0700356
Craig Tiller5d44c062015-07-01 08:55:28 -0700357 if (op->send_ops == NULL) {
358 /* need to have some send ops before we can select the
359 lb target */
360 calld->state = CALL_WAITING_FOR_SEND;
Craig Tillerf5f17122015-06-25 08:47:26 -0700361 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf5f17122015-06-25 08:47:26 -0700362 } else {
Craig Tiller5d44c062015-07-01 08:55:28 -0700363 gpr_mu_lock(&chand->mu_config);
364 lb_policy = chand->lb_policy;
365 if (lb_policy) {
Craig Tiller990f6422015-07-20 22:07:13 -0700366 grpc_transport_stream_op *op = &calld->waiting_op;
367 grpc_pollset *bind_pollset = op->bind_pollset;
368 grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
Craig Tiller5d44c062015-07-01 08:55:28 -0700369 GRPC_LB_POLICY_REF(lb_policy, "pick");
370 gpr_mu_unlock(&chand->mu_config);
371 calld->state = CALL_WAITING_FOR_PICK;
Craig Tiller3d578712015-07-20 22:00:24 -0700372
Craig Tiller990f6422015-07-20 22:07:13 -0700373 GPR_ASSERT(op->bind_pollset);
374 GPR_ASSERT(op->send_ops);
375 GPR_ASSERT(op->send_ops->nops >= 1);
Craig Tiller3d578712015-07-20 22:00:24 -0700376 GPR_ASSERT(
Craig Tiller990f6422015-07-20 22:07:13 -0700377 op->send_ops->ops[0].type == GRPC_OP_METADATA);
Craig Tiller5d44c062015-07-01 08:55:28 -0700378 gpr_mu_unlock(&calld->mu_state);
379
Craig Tiller990f6422015-07-20 22:07:13 -0700380 grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
381 grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
382 &calld->picked_channel, &calld->async_setup_task);
Craig Tiller5d44c062015-07-01 08:55:28 -0700383
384 GRPC_LB_POLICY_UNREF(lb_policy, "pick");
Craig Tillerb9a46ae2015-07-01 09:45:21 -0700385 } else if (chand->resolver != NULL) {
Craig Tiller5d44c062015-07-01 08:55:28 -0700386 calld->state = CALL_WAITING_FOR_CONFIG;
387 add_to_lb_policy_wait_queue_locked_state_config(elem);
388 gpr_mu_unlock(&chand->mu_config);
389 gpr_mu_unlock(&calld->mu_state);
Craig Tillerb9a46ae2015-07-01 09:45:21 -0700390 } else {
391 calld->state = CALL_CANCELLED;
392 gpr_mu_unlock(&chand->mu_config);
393 gpr_mu_unlock(&calld->mu_state);
394 handle_op_after_cancellation(elem, op);
Craig Tiller5d44c062015-07-01 08:55:28 -0700395 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700396 }
397 }
398 break;
Craig Tillerf5f17122015-06-25 08:47:26 -0700399 }
Craig Tiller5d44c062015-07-01 08:55:28 -0700400
401 if (consumed_op != NULL) {
402 consumed_op->cb(consumed_op->cb_arg, 1);
403 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700404}
Craig Tillerf5f17122015-06-25 08:47:26 -0700405
Craig Tillereb3b12e2015-06-26 14:42:49 -0700406static void cc_start_transport_stream_op(grpc_call_element *elem,
Craig Tiller079a11b2015-06-30 10:07:15 -0700407 grpc_transport_stream_op *op) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700408 perform_transport_stream_op(elem, op, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800409}
410
Craig Tiller3f475422015-06-25 10:43:05 -0700411static void cc_on_config_changed(void *arg, int iomgr_success) {
412 channel_data *chand = arg;
413 grpc_lb_policy *lb_policy = NULL;
414 grpc_lb_policy *old_lb_policy;
415 grpc_resolver *old_resolver;
416 grpc_iomgr_closure *wakeup_closures = NULL;
417
Craig Tillerd7b68e72015-06-28 11:41:09 -0700418 if (chand->incoming_configuration != NULL) {
Craig Tiller3f475422015-06-25 10:43:05 -0700419 lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700420 GRPC_LB_POLICY_REF(lb_policy, "channel");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700421
422 grpc_client_config_unref(chand->incoming_configuration);
Craig Tiller3f475422015-06-25 10:43:05 -0700423 }
424
Craig Tiller3f475422015-06-25 10:43:05 -0700425 chand->incoming_configuration = NULL;
426
427 gpr_mu_lock(&chand->mu_config);
428 old_lb_policy = chand->lb_policy;
429 chand->lb_policy = lb_policy;
Craig Tillerb9a46ae2015-07-01 09:45:21 -0700430 if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
Craig Tiller3f475422015-06-25 10:43:05 -0700431 wakeup_closures = chand->waiting_for_config_closures;
432 chand->waiting_for_config_closures = NULL;
433 }
434 gpr_mu_unlock(&chand->mu_config);
435
Craig Tiller98465032015-06-29 14:36:42 -0700436 if (old_lb_policy) {
437 GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
438 }
439
440 gpr_mu_lock(&chand->mu_config);
441 if (iomgr_success && chand->resolver) {
442 grpc_resolver *resolver = chand->resolver;
443 GRPC_RESOLVER_REF(resolver, "channel-next");
444 gpr_mu_unlock(&chand->mu_config);
445 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
Craig Tillerab9f1922015-07-07 14:58:55 -0700446 grpc_resolver_next(resolver, &chand->incoming_configuration,
Craig Tiller079a11b2015-06-30 10:07:15 -0700447 &chand->on_config_changed);
Craig Tiller98465032015-06-29 14:36:42 -0700448 GRPC_RESOLVER_UNREF(resolver, "channel-next");
449 } else {
450 old_resolver = chand->resolver;
451 chand->resolver = NULL;
Craig Tiller079a11b2015-06-30 10:07:15 -0700452 grpc_connectivity_state_set(&chand->state_tracker,
453 GRPC_CHANNEL_FATAL_FAILURE);
Craig Tiller98465032015-06-29 14:36:42 -0700454 gpr_mu_unlock(&chand->mu_config);
455 if (old_resolver != NULL) {
456 grpc_resolver_shutdown(old_resolver);
457 GRPC_RESOLVER_UNREF(old_resolver, "channel");
458 }
459 }
460
Craig Tiller3f475422015-06-25 10:43:05 -0700461 while (wakeup_closures) {
462 grpc_iomgr_closure *next = wakeup_closures->next;
Craig Tiller994c2622015-07-23 14:00:58 -0700463 wakeup_closures->cb(wakeup_closures->cb_arg, 1);
Craig Tiller3f475422015-06-25 10:43:05 -0700464 wakeup_closures = next;
465 }
466
Craig Tiller98465032015-06-29 14:36:42 -0700467 GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
Craig Tiller3f475422015-06-25 10:43:05 -0700468}
469
Craig Tiller079a11b2015-06-30 10:07:15 -0700470static void cc_start_transport_op(grpc_channel_element *elem,
471 grpc_transport_op *op) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700472 grpc_lb_policy *lb_policy = NULL;
473 channel_data *chand = elem->channel_data;
Craig Tiller98465032015-06-29 14:36:42 -0700474 grpc_resolver *destroy_resolver = NULL;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700475 grpc_iomgr_closure *on_consumed = op->on_consumed;
476 op->on_consumed = NULL;
477
478 GPR_ASSERT(op->set_accept_stream == NULL);
479 GPR_ASSERT(op->bind_pollset == NULL);
480
481 gpr_mu_lock(&chand->mu_config);
482 if (op->on_connectivity_state_change != NULL) {
Craig Tiller079a11b2015-06-30 10:07:15 -0700483 grpc_connectivity_state_notify_on_state_change(
484 &chand->state_tracker, op->connectivity_state,
485 op->on_connectivity_state_change);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700486 op->on_connectivity_state_change = NULL;
487 op->connectivity_state = NULL;
488 }
489
Craig Tiller98465032015-06-29 14:36:42 -0700490 if (op->disconnect && chand->resolver != NULL) {
Craig Tiller079a11b2015-06-30 10:07:15 -0700491 grpc_connectivity_state_set(&chand->state_tracker,
492 GRPC_CHANNEL_FATAL_FAILURE);
Craig Tiller98465032015-06-29 14:36:42 -0700493 destroy_resolver = chand->resolver;
494 chand->resolver = NULL;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700495 if (chand->lb_policy != NULL) {
496 grpc_lb_policy_shutdown(chand->lb_policy);
497 }
Craig Tiller98465032015-06-29 14:36:42 -0700498 }
499
Craig Tillerca3e9d32015-06-27 18:37:27 -0700500 if (!is_empty(op, sizeof(*op))) {
501 lb_policy = chand->lb_policy;
502 if (lb_policy) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700503 GRPC_LB_POLICY_REF(lb_policy, "broadcast");
Craig Tillerca3e9d32015-06-27 18:37:27 -0700504 }
505 }
506 gpr_mu_unlock(&chand->mu_config);
507
Craig Tiller98465032015-06-29 14:36:42 -0700508 if (destroy_resolver) {
509 grpc_resolver_shutdown(destroy_resolver);
510 GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
511 }
512
Craig Tillerca3e9d32015-06-27 18:37:27 -0700513 if (lb_policy) {
514 grpc_lb_policy_broadcast(lb_policy, op);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700515 GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
Craig Tillerca3e9d32015-06-27 18:37:27 -0700516 }
517
518 if (on_consumed) {
519 grpc_iomgr_add_callback(on_consumed);
520 }
521}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800522
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800523/* Constructor for call_data */
524static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700525 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700526 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800527 call_data *calld = elem->call_data;
528
Craig Tiller50d9db52015-04-23 10:52:14 -0700529 /* TODO(ctiller): is there something useful we can do here? */
530 GPR_ASSERT(initial_op == NULL);
531
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800532 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
533 GPR_ASSERT(server_transport_data == NULL);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700534 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800535 calld->elem = elem;
536 calld->state = CALL_CREATED;
Craig Tiller143e7bf2015-07-13 08:41:49 -0700537 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800538}
539
540/* Destructor for call_data */
541static void destroy_call_elem(grpc_call_element *elem) {
542 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700543 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800544
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800545 /* if the call got activated, we need to destroy the child stack also, and
546 remove it from the in-flight requests tracked by the child_entry we
547 picked */
Craig Tiller3f475422015-06-25 10:43:05 -0700548 gpr_mu_lock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700549 switch (calld->state) {
550 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700551 subchannel_call = calld->subchannel_call;
Craig Tiller3f475422015-06-25 10:43:05 -0700552 gpr_mu_unlock(&calld->mu_state);
Craig Tillerc3967532015-06-29 14:59:38 -0700553 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
Craig Tillerf93fd052015-06-02 08:15:33 -0700554 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700555 case CALL_CREATED:
556 case CALL_CANCELLED:
557 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700558 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700559 case CALL_WAITING_FOR_PICK:
560 case CALL_WAITING_FOR_CONFIG:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700561 case CALL_WAITING_FOR_CALL:
Craig Tiller5d44c062015-07-01 08:55:28 -0700562 case CALL_WAITING_FOR_SEND:
Craig Tiller3f475422015-06-25 10:43:05 -0700563 gpr_log(GPR_ERROR, "should never reach here");
564 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700565 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800566 }
567}
568
569/* Constructor for channel_data */
Craig Tiller079a11b2015-06-30 10:07:15 -0700570static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800571 const grpc_channel_args *args,
572 grpc_mdctx *metadata_context, int is_first,
573 int is_last) {
574 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800575
Craig Tillereb3b12e2015-06-26 14:42:49 -0700576 memset(chand, 0, sizeof(*chand));
577
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800578 GPR_ASSERT(is_last);
579 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
580
Craig Tiller3f475422015-06-25 10:43:05 -0700581 gpr_mu_init(&chand->mu_config);
Craig Tillere70413c2015-04-24 10:12:34 -0700582 chand->mdctx = metadata_context;
Craig Tiller98465032015-06-29 14:36:42 -0700583 chand->master = master;
Craig Tiller079a11b2015-06-30 10:07:15 -0700584 grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
585 chand);
Craig Tiller98465032015-06-29 14:36:42 -0700586
587 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800588}
589
590/* Destructor for channel_data */
591static void destroy_channel_elem(grpc_channel_element *elem) {
592 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800593
Craig Tillerd7b68e72015-06-28 11:41:09 -0700594 if (chand->resolver != NULL) {
Craig Tiller98465032015-06-29 14:36:42 -0700595 grpc_resolver_shutdown(chand->resolver);
596 GRPC_RESOLVER_UNREF(chand->resolver, "channel");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800597 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700598 if (chand->lb_policy != NULL) {
599 GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
Craig Tiller3f475422015-06-25 10:43:05 -0700600 }
601 gpr_mu_destroy(&chand->mu_config);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800602}
603
604const grpc_channel_filter grpc_client_channel_filter = {
Craig Tiller079a11b2015-06-30 10:07:15 -0700605 cc_start_transport_stream_op,
606 cc_start_transport_op,
607 sizeof(call_data),
608 init_call_elem,
609 destroy_call_elem,
610 sizeof(channel_data),
611 init_channel_elem,
612 destroy_channel_elem,
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700613 cc_get_peer,
Craig Tiller079a11b2015-06-30 10:07:15 -0700614 "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -0700615};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800616
Craig Tillerf5f17122015-06-25 08:47:26 -0700617void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
618 grpc_resolver *resolver) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800619 /* post construction initialization: set the transport setup pointer */
620 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
621 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700622 GPR_ASSERT(!chand->resolver);
623 chand->resolver = resolver;
Craig Tiller98465032015-06-29 14:36:42 -0700624 GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
625 GRPC_RESOLVER_REF(resolver, "channel");
Craig Tiller079a11b2015-06-30 10:07:15 -0700626 grpc_resolver_next(resolver, &chand->incoming_configuration,
627 &chand->on_config_changed);
Craig Tiller190d3602015-02-18 09:23:38 -0800628}