blob: 19700a90a6e03cda8940f9205acd98c3a05685fd [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070037#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
39#include "src/core/channel/channel_args.h"
40#include "src/core/channel/connected_channel.h"
Craig Tillerca3e9d32015-06-27 18:37:27 -070041#include "src/core/channel/connectivity_state.h"
ctillerc6d61c42014-12-15 14:52:08 -080042#include "src/core/iomgr/iomgr.h"
Craig Tillerfe4ba362015-05-08 09:47:18 -070043#include "src/core/iomgr/pollset_set.h"
Craig Tiller485d7762015-01-23 12:54:05 -080044#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <grpc/support/alloc.h>
46#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <grpc/support/sync.h>
48#include <grpc/support/useful.h>
49
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050/* Client channel implementation */
51
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052typedef struct call_data call_data;
53
54typedef struct {
Craig Tillerf5f17122015-06-25 08:47:26 -070055 /** metadata context for this channel */
Craig Tillere70413c2015-04-24 10:12:34 -070056 grpc_mdctx *mdctx;
Craig Tillerf5f17122015-06-25 08:47:26 -070057 /** resolver for this channel */
58 grpc_resolver *resolver;
Craig Tillerf5f17122015-06-25 08:47:26 -070059
Craig Tillerf5f17122015-06-25 08:47:26 -070060 /** mutex protecting client configuration, resolution state */
61 gpr_mu mu_config;
Craig Tillerf5f17122015-06-25 08:47:26 -070062 /** currently active load balancer - guarded by mu_config */
63 grpc_lb_policy *lb_policy;
Craig Tillerf5f17122015-06-25 08:47:26 -070064 /** incoming configuration - set by resolver.next
65 guarded by mu_config */
66 grpc_client_config *incoming_configuration;
Craig Tiller3f475422015-06-25 10:43:05 -070067 /** a list of closures that are all waiting for config to come in */
68 grpc_iomgr_closure *waiting_for_config_closures;
69 /** resolver callback */
70 grpc_iomgr_closure on_config_changed;
71 /** connectivity state being tracked */
Craig Tillerca3e9d32015-06-27 18:37:27 -070072 grpc_connectivity_state_tracker state_tracker;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080073} channel_data;
74
/** Progress of a call through the client channel filter. */
typedef enum {
  /** initial state assigned by init_call_elem */
  CALL_CREATED = 0,
  /** no load balancing policy was available; parked on the channel's
      waiting_for_config_closures list */
  CALL_WAITING_FOR_CONFIG = 1,
  /** a pick has been requested from the load balancing policy */
  CALL_WAITING_FOR_PICK = 2,
  /** a subchannel was picked and call creation on it has been requested */
  CALL_WAITING_FOR_CALL = 3,
  /** a subchannel call exists; ops are forwarded directly to it */
  CALL_ACTIVE = 4,
  /** the call was cancelled (or failed to start) */
  CALL_CANCELLED = 5
} call_state;
83
84struct call_data {
85 /* owning element */
86 grpc_call_element *elem;
87
Craig Tillerf5f17122015-06-25 08:47:26 -070088 gpr_mu mu_state;
89
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080090 call_state state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080091 gpr_timespec deadline;
Craig Tillereb3b12e2015-06-26 14:42:49 -070092 grpc_subchannel *picked_channel;
93 grpc_iomgr_closure async_setup_task;
94 grpc_transport_stream_op waiting_op;
95 /* our child call stack */
96 grpc_subchannel_call *subchannel_call;
97 grpc_linked_mdelem status;
98 grpc_linked_mdelem details;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080099};
100
Craig Tiller1a727fd2015-04-24 13:21:22 -0700101static void handle_op_after_cancellation(grpc_call_element *elem,
Craig Tillerb7959a02015-06-25 08:50:54 -0700102 grpc_transport_stream_op *op) {
Craig Tillere70413c2015-04-24 10:12:34 -0700103 call_data *calld = elem->call_data;
104 channel_data *chand = elem->channel_data;
105 if (op->send_ops) {
Yang Gaodbf8fdc2015-05-28 00:52:31 -0700106 grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700107 op->on_done_send->cb(op->on_done_send->cb_arg, 0);
Craig Tillere70413c2015-04-24 10:12:34 -0700108 }
109 if (op->recv_ops) {
110 char status[GPR_LTOA_MIN_BUFSIZE];
111 grpc_metadata_batch mdb;
112 gpr_ltoa(GRPC_STATUS_CANCELLED, status);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700113 calld->status.md =
Craig Tiller1a727fd2015-04-24 13:21:22 -0700114 grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700115 calld->details.md =
Craig Tiller1a727fd2015-04-24 13:21:22 -0700116 grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700117 calld->status.prev = calld->details.next = NULL;
118 calld->status.next = &calld->details;
119 calld->details.prev = &calld->status;
120 mdb.list.head = &calld->status;
121 mdb.list.tail = &calld->details;
Craig Tillere70413c2015-04-24 10:12:34 -0700122 mdb.garbage.head = mdb.garbage.tail = NULL;
123 mdb.deadline = gpr_inf_future;
124 grpc_sopb_add_metadata(op->recv_ops, mdb);
125 *op->recv_state = GRPC_STREAM_CLOSED;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700126 op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
Craig Tillere70413c2015-04-24 10:12:34 -0700127 }
Craig Tiller5dde66e2015-06-02 09:05:23 -0700128 if (op->on_consumed) {
Craig Tiller1e6facb2015-06-11 22:47:11 -0700129 op->on_consumed->cb(op->on_consumed->cb_arg, 0);
Craig Tiller5dde66e2015-06-02 09:05:23 -0700130 }
Craig Tillere70413c2015-04-24 10:12:34 -0700131}
Craig Tiller8b976d02015-02-05 21:41:23 -0800132
Craig Tillereb3b12e2015-06-26 14:42:49 -0700133typedef struct {
134 grpc_iomgr_closure closure;
135 grpc_call_element *elem;
136} waiting_call;
137
138static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation);
139
140static void continue_with_pick(void *arg, int iomgr_success) {
141 waiting_call *wc = arg;
142 call_data *calld = wc->elem->call_data;
143 perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
144 gpr_free(wc);
145}
146
147static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) {
148 channel_data *chand = elem->channel_data;
149 waiting_call *wc = gpr_malloc(sizeof(*wc));
150 grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
151 wc->elem = elem;
152 wc->closure.next = chand->waiting_for_config_closures;
153 chand->waiting_for_config_closures = &wc->closure;
154}
155
/* Returns 1 when the first len bytes at p are all zero (or len <= 0),
   and 0 as soon as a non-zero byte is found. */
static int is_empty(void *p, int len) {
  char *cursor = p;
  int remaining = len;

  while (remaining > 0) {
    if (*cursor != 0) {
      return 0;
    }
    cursor++;
    remaining--;
  }
  return 1;
}
164
165static void started_call(void *arg, int iomgr_success) {
166 call_data *calld = arg;
167 grpc_transport_stream_op op;
168 int have_waiting;
169
170 gpr_mu_lock(&calld->mu_state);
171 if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
172 memset(&op, 0, sizeof(op));
173 op.cancel_with_status = GRPC_STATUS_CANCELLED;
174 gpr_mu_unlock(&calld->mu_state);
175 grpc_subchannel_call_process_op(calld->subchannel_call, &op);
176 } else if (calld->state == CALL_WAITING_FOR_CALL) {
177 have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
178 if (calld->subchannel_call != NULL) {
179 calld->state = CALL_ACTIVE;
180 gpr_mu_unlock(&calld->mu_state);
181 if (have_waiting) {
182 grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op);
183 }
184 } else {
185 calld->state = CALL_CANCELLED;
186 gpr_mu_unlock(&calld->mu_state);
187 if (have_waiting) {
188 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
189 }
190 }
191 } else {
192 GPR_ASSERT(calld->state == CALL_CANCELLED);
193 }
194}
195
196static void picked_target(void *arg, int iomgr_success) {
197 call_data *calld = arg;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700198 grpc_transport_stream_op op;
199
200 if (calld->picked_channel == NULL) {
201 /* treat this like a cancellation */
202 calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
203 perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
204 } else {
205 gpr_mu_lock(&calld->mu_state);
206 if (calld->state == CALL_CANCELLED) {
207 gpr_mu_unlock(&calld->mu_state);
208 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
209 } else {
210 GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
211 calld->state = CALL_WAITING_FOR_CALL;
212 op = calld->waiting_op;
213 memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
214 gpr_mu_unlock(&calld->mu_state);
215 grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
Craig Tiller5f84c842015-06-26 16:08:21 -0700216 grpc_subchannel_create_call(calld->picked_channel, &op,
217 &calld->subchannel_call,
218 &calld->async_setup_task);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700219 }
220 }
Craig Tillerf5f17122015-06-25 08:47:26 -0700221}
222
223static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700224 grpc_metadata_batch *initial_metadata;
225 grpc_transport_stream_op *op = &calld->waiting_op;
226
227 GPR_ASSERT(op->bind_pollset);
228 GPR_ASSERT(op->send_ops);
229 GPR_ASSERT(op->send_ops->nops >= 1);
230 GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
231 initial_metadata = &op->send_ops->ops[0].data.metadata;
232
233 grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
234 grpc_lb_policy_pick(lb_policy, op->bind_pollset,
235 initial_metadata, &calld->picked_channel, &calld->async_setup_task);
Craig Tillerf5f17122015-06-25 08:47:26 -0700236}
237
Craig Tillereb3b12e2015-06-26 14:42:49 -0700238static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800239 call_data *calld = elem->call_data;
240 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700241 grpc_subchannel_call *subchannel_call;
242 grpc_lb_policy *lb_policy;
Craig Tillera9407522015-04-24 10:37:57 -0700243 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
244 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800245
Craig Tillerf5f17122015-06-25 08:47:26 -0700246 gpr_mu_lock(&calld->mu_state);
247 switch (calld->state) {
248 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700249 GPR_ASSERT(!continuation);
250 subchannel_call = calld->subchannel_call;
Craig Tillerf5f17122015-06-25 08:47:26 -0700251 gpr_mu_unlock(&calld->mu_state);
252 grpc_subchannel_call_process_op(subchannel_call, op);
Craig Tillerf5f17122015-06-25 08:47:26 -0700253 break;
254 case CALL_CANCELLED:
255 gpr_mu_unlock(&calld->mu_state);
256 handle_op_after_cancellation(elem, op);
257 break;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700258 case CALL_WAITING_FOR_CONFIG:
259 case CALL_WAITING_FOR_PICK:
260 case CALL_WAITING_FOR_CALL:
261 if (!continuation) {
262 if (op->cancel_with_status != GRPC_STATUS_OK) {
263 calld->state = CALL_CANCELLED;
264 gpr_mu_unlock(&calld->mu_state);
265 handle_op_after_cancellation(elem, op);
266 } else {
267 GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
268 (op->send_ops == NULL));
269 GPR_ASSERT((calld->waiting_op.recv_ops == NULL) !=
270 (op->recv_ops == NULL));
271 if (op->send_ops != NULL) {
272 calld->waiting_op.send_ops = op->send_ops;
273 calld->waiting_op.is_last_send = op->is_last_send;
274 calld->waiting_op.on_done_send = op->on_done_send;
275 }
276 if (op->recv_ops != NULL) {
277 calld->waiting_op.recv_ops = op->recv_ops;
278 calld->waiting_op.recv_state = op->recv_state;
279 calld->waiting_op.on_done_recv = op->on_done_recv;
280 }
281 gpr_mu_unlock(&calld->mu_state);
282 if (op->on_consumed != NULL) {
283 op->on_consumed->cb(op->on_consumed->cb_arg, 0);
284 }
285 }
286 break;
287 }
288 /* fall through */
Craig Tillerf5f17122015-06-25 08:47:26 -0700289 case CALL_CREATED:
290 if (op->cancel_with_status != GRPC_STATUS_OK) {
291 calld->state = CALL_CANCELLED;
292 gpr_mu_unlock(&calld->mu_state);
293 handle_op_after_cancellation(elem, op);
294 } else {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700295 calld->waiting_op = *op;
Craig Tillerf5f17122015-06-25 08:47:26 -0700296
297 gpr_mu_lock(&chand->mu_config);
298 lb_policy = chand->lb_policy;
299 if (lb_policy) {
300 grpc_lb_policy_ref(lb_policy);
301 gpr_mu_unlock(&chand->mu_config);
Craig Tiller3f475422015-06-25 10:43:05 -0700302 calld->state = CALL_WAITING_FOR_PICK;
Craig Tillerf5f17122015-06-25 08:47:26 -0700303 gpr_mu_unlock(&calld->mu_state);
304
305 pick_target(lb_policy, calld);
306
307 grpc_lb_policy_unref(lb_policy);
308 } else {
Craig Tiller3f475422015-06-25 10:43:05 -0700309 calld->state = CALL_WAITING_FOR_CONFIG;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700310 add_to_lb_policy_wait_queue_locked_state_config(elem);
Craig Tillerf5f17122015-06-25 08:47:26 -0700311 gpr_mu_unlock(&chand->mu_config);
312 gpr_mu_unlock(&calld->mu_state);
313 }
314 }
315 break;
Craig Tillerf5f17122015-06-25 08:47:26 -0700316 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700317}
Craig Tillerf5f17122015-06-25 08:47:26 -0700318
Craig Tillereb3b12e2015-06-26 14:42:49 -0700319static void cc_start_transport_stream_op(grpc_call_element *elem,
320 grpc_transport_stream_op *op) {
321 perform_transport_stream_op(elem, op, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800322}
323
Craig Tiller3f475422015-06-25 10:43:05 -0700324static void update_state_locked(channel_data *chand) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700325 gpr_log(GPR_ERROR, "update_state_locked not implemented");
Craig Tiller3f475422015-06-25 10:43:05 -0700326}
327
328static void cc_on_config_changed(void *arg, int iomgr_success) {
329 channel_data *chand = arg;
330 grpc_lb_policy *lb_policy = NULL;
331 grpc_lb_policy *old_lb_policy;
332 grpc_resolver *old_resolver;
333 grpc_iomgr_closure *wakeup_closures = NULL;
334
335 if (chand->incoming_configuration) {
336 lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
337 grpc_lb_policy_ref(lb_policy);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700338
339 grpc_client_config_unref(chand->incoming_configuration);
Craig Tiller3f475422015-06-25 10:43:05 -0700340 }
341
Craig Tiller3f475422015-06-25 10:43:05 -0700342 chand->incoming_configuration = NULL;
343
344 gpr_mu_lock(&chand->mu_config);
345 old_lb_policy = chand->lb_policy;
346 chand->lb_policy = lb_policy;
347 if (lb_policy != NULL) {
348 wakeup_closures = chand->waiting_for_config_closures;
349 chand->waiting_for_config_closures = NULL;
350 }
351 gpr_mu_unlock(&chand->mu_config);
352
353 while (wakeup_closures) {
354 grpc_iomgr_closure *next = wakeup_closures->next;
355 grpc_iomgr_add_callback(wakeup_closures);
356 wakeup_closures = next;
357 }
358
Craig Tillereb3b12e2015-06-26 14:42:49 -0700359 if (old_lb_policy) {
360 grpc_lb_policy_unref(old_lb_policy);
361 }
Craig Tiller3f475422015-06-25 10:43:05 -0700362
363 if (iomgr_success) {
364 grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
365 } else {
366 gpr_mu_lock(&chand->mu_config);
367 old_resolver = chand->resolver;
368 chand->resolver = NULL;
369 update_state_locked(chand);
370 gpr_mu_unlock(&chand->mu_config);
371 grpc_resolver_unref(old_resolver);
372 }
373}
374
Craig Tillerca3e9d32015-06-27 18:37:27 -0700375static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {
376 grpc_lb_policy *lb_policy = NULL;
377 channel_data *chand = elem->channel_data;
378 grpc_iomgr_closure *on_consumed = op->on_consumed;
379 op->on_consumed = NULL;
380
381 GPR_ASSERT(op->set_accept_stream == NULL);
382 GPR_ASSERT(op->bind_pollset == NULL);
383
384 gpr_mu_lock(&chand->mu_config);
385 if (op->on_connectivity_state_change != NULL) {
386 grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
387 op->on_connectivity_state_change = NULL;
388 op->connectivity_state = NULL;
389 }
390
391 if (!is_empty(op, sizeof(*op))) {
392 lb_policy = chand->lb_policy;
393 if (lb_policy) {
394 grpc_lb_policy_ref(lb_policy);
395 }
396 }
397 gpr_mu_unlock(&chand->mu_config);
398
399 if (lb_policy) {
400 grpc_lb_policy_broadcast(lb_policy, op);
401 grpc_lb_policy_unref(lb_policy);
402 }
403
404 if (on_consumed) {
405 grpc_iomgr_add_callback(on_consumed);
406 }
407}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800408
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800409/* Constructor for call_data */
410static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700411 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700412 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413 call_data *calld = elem->call_data;
414
Craig Tiller50d9db52015-04-23 10:52:14 -0700415 /* TODO(ctiller): is there something useful we can do here? */
416 GPR_ASSERT(initial_op == NULL);
417
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800418 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
419 GPR_ASSERT(server_transport_data == NULL);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700420 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800421 calld->elem = elem;
422 calld->state = CALL_CREATED;
423 calld->deadline = gpr_inf_future;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800424}
425
426/* Destructor for call_data */
427static void destroy_call_elem(grpc_call_element *elem) {
428 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700429 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800430
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800431 /* if the call got activated, we need to destroy the child stack also, and
432 remove it from the in-flight requests tracked by the child_entry we
433 picked */
Craig Tiller3f475422015-06-25 10:43:05 -0700434 gpr_mu_lock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700435 switch (calld->state) {
436 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700437 subchannel_call = calld->subchannel_call;
Craig Tiller3f475422015-06-25 10:43:05 -0700438 gpr_mu_unlock(&calld->mu_state);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700439 GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel");
Craig Tillerf93fd052015-06-02 08:15:33 -0700440 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700441 case CALL_CREATED:
442 case CALL_CANCELLED:
443 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700444 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700445 case CALL_WAITING_FOR_PICK:
446 case CALL_WAITING_FOR_CONFIG:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700447 case CALL_WAITING_FOR_CALL:
Craig Tiller3f475422015-06-25 10:43:05 -0700448 gpr_log(GPR_ERROR, "should never reach here");
449 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700450 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800451 }
452}
453
454/* Constructor for channel_data */
455static void init_channel_elem(grpc_channel_element *elem,
456 const grpc_channel_args *args,
457 grpc_mdctx *metadata_context, int is_first,
458 int is_last) {
459 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800460
Craig Tillereb3b12e2015-06-26 14:42:49 -0700461 memset(chand, 0, sizeof(*chand));
462
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800463 GPR_ASSERT(is_last);
464 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
465
Craig Tiller3f475422015-06-25 10:43:05 -0700466 gpr_mu_init(&chand->mu_config);
Craig Tillere70413c2015-04-24 10:12:34 -0700467 chand->mdctx = metadata_context;
Craig Tiller3f475422015-06-25 10:43:05 -0700468 grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800469}
470
471/* Destructor for channel_data */
472static void destroy_channel_elem(grpc_channel_element *elem) {
473 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800474
Craig Tiller3f475422015-06-25 10:43:05 -0700475 if (chand->resolver) {
476 grpc_resolver_unref(chand->resolver);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800477 }
Craig Tiller3f475422015-06-25 10:43:05 -0700478 if (chand->lb_policy) {
479 grpc_lb_policy_unref(chand->lb_policy);
480 }
481 gpr_mu_destroy(&chand->mu_config);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800482}
483
484const grpc_channel_filter grpc_client_channel_filter = {
Craig Tiller3f475422015-06-25 10:43:05 -0700485 cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
Craig Tiller83b826a2015-05-13 13:43:01 -0700486 init_call_elem, destroy_call_elem, sizeof(channel_data),
487 init_channel_elem, destroy_channel_elem, "client-channel",
Craig Tiller87d5b192015-04-16 14:37:57 -0700488};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800489
Craig Tillerf5f17122015-06-25 08:47:26 -0700490void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
491 grpc_resolver *resolver) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800492 /* post construction initialization: set the transport setup pointer */
493 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
494 channel_data *chand = elem->channel_data;
Craig Tillerf5f17122015-06-25 08:47:26 -0700495 GPR_ASSERT(!chand->resolver);
496 chand->resolver = resolver;
Craig Tiller3f475422015-06-25 10:43:05 -0700497 grpc_resolver_ref(resolver);
Craig Tillerf5f17122015-06-25 08:47:26 -0700498 grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed);
Craig Tiller190d3602015-02-18 09:23:38 -0800499}