blob: 9630f6898d830cc090e9e5523aef6def9d709dcb [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
37
38#include "src/core/channel/channel_args.h"
39#include "src/core/channel/connected_channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080040#include "src/core/iomgr/iomgr.h"
Craig Tillerfe4ba362015-05-08 09:47:18 -070041#include "src/core/iomgr/pollset_set.h"
Craig Tiller485d7762015-01-23 12:54:05 -080042#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/alloc.h>
44#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <grpc/support/sync.h>
46#include <grpc/support/useful.h>
47
/* Client channel implementation */

typedef struct call_data call_data;

typedef struct {
  /** metadata context for this channel */
  grpc_mdctx *mdctx;
  /** resolver for this channel */
  grpc_resolver *resolver;

  /** mutex protecting client configuration, resolution state */
  gpr_mu mu_config;
  /** currently active load balancer - guarded by mu_config */
  grpc_lb_policy *lb_policy;
  /** incoming configuration - set by resolver.next
      guarded by mu_config */
  grpc_client_config *incoming_configuration;
  /** a list of closures that are all waiting for config to come in */
  grpc_iomgr_closure *waiting_for_config_closures;
  /** resolver callback */
  grpc_iomgr_closure on_config_changed;
  /** connectivity state being tracked */
  /* NOTE(review): these two fields are never written or read by any code
     visible in this file - presumably placeholders for connectivity-state
     watching; confirm before relying on them. */
  grpc_iomgr_closure *on_connectivity_state_change;
  grpc_connectivity_state *connectivity_state;
} channel_data;
73
/** Life-cycle of a call through this filter.
    Transitions are made under call_data.mu_state (see
    cc_start_transport_stream_op). */
typedef enum {
  /** call created; no op seen yet */
  CALL_CREATED,
  /** op received, but no client config was available: queued until the
      resolver delivers one */
  CALL_WAITING_FOR_CONFIG,
  /** config available; waiting for the lb policy to pick a subchannel */
  CALL_WAITING_FOR_PICK,
  /** bound to a subchannel call; ops are forwarded to it */
  CALL_ACTIVE,
  /** cancelled: all further ops are answered locally */
  CALL_CANCELLED
} call_state;
81
struct call_data {
  /* owning element */
  grpc_call_element *elem;

  /** protects state and the union below */
  gpr_mu mu_state;

  call_state state;
  gpr_timespec deadline;
  /* per-state payload; which arm is valid is determined by state */
  union {
    struct {
      /* our child call stack (valid in CALL_ACTIVE) */
      grpc_subchannel_call *subchannel_call;
    } active;
    /* pending op while CALL_WAITING_FOR_CONFIG / CALL_WAITING_FOR_PICK */
    grpc_transport_stream_op waiting_op;
    /* storage for synthesized grpc-status/grpc-message metadata once
       CALL_CANCELLED (see handle_op_after_cancellation) */
    struct {
      grpc_linked_mdelem status;
      grpc_linked_mdelem details;
    } cancelled;
  } s;
};
102
/* NOTE(review): dead code, disabled while this filter is migrated from the
   old transport_setup/child-channel machinery to resolver + lb_policy +
   subchannel. It references fields (transport_setup, waiting_children,
   s.active.child_call) and states (CALL_WAITING) that no longer exist in the
   live definitions above - it will not compile if re-enabled as-is.
   Kept temporarily as a reference for the pending-op replay logic; delete
   once the new path is complete. */
#if 0
static int prepare_activate(grpc_call_element *elem,
                            grpc_child_channel *on_child) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (calld->state == CALL_CANCELLED) return 0;

  /* no more access to calld->s.waiting allowed */
  GPR_ASSERT(calld->state == CALL_WAITING);

  if (calld->s.waiting_op.bind_pollset) {
    grpc_transport_setup_del_interested_party(chand->transport_setup,
                                              calld->s.waiting_op.bind_pollset);
  }

  calld->state = CALL_ACTIVE;

  /* create a child call */
  /* TODO(ctiller): pass the waiting op down here */
  calld->s.active.child_call =
      grpc_child_channel_create_call(on_child, elem, NULL);

  return 1;
}

static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;
  grpc_call_element *child_elem =
      grpc_child_call_get_top_element(calld->s.active.child_call);

  GPR_ASSERT(calld->state == CALL_ACTIVE);

  /* continue the start call down the stack, this needs to happen after
     metadata are flushed */
  child_elem->filter->start_transport_op(child_elem, op);
}

static void remove_waiting_child(channel_data *chand, call_data *calld) {
  size_t new_count;
  size_t i;
  /* compact the waiting_children array in place, dropping calld */
  for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
    if (chand->waiting_children[i] == calld) {
      grpc_transport_setup_del_interested_party(
          chand->transport_setup, calld->s.waiting_op.bind_pollset);
      continue;
    }
    chand->waiting_children[new_count++] = chand->waiting_children[i];
  }
  /* calld appears at most once in the set */
  GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
             new_count == chand->waiting_child_count);
  chand->waiting_child_count = new_count;
}
#endif
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800156
/** Complete a transport stream op locally on a call that has been cancelled:
    fail any sends, synthesize CANCELLED trailing metadata for any receives,
    and fire all completion callbacks so the surface is never left hanging. */
static void handle_op_after_cancellation(grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (op->send_ops) {
    /* the send will never reach a transport: drop owned slices/metadata and
       report failure (success == 0) */
    grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
    op->on_done_send->cb(op->on_done_send->cb_arg, 0);
  }
  if (op->recv_ops) {
    char status[GPR_LTOA_MIN_BUFSIZE];
    grpc_metadata_batch mdb;
    gpr_ltoa(GRPC_STATUS_CANCELLED, status);
    /* fabricate grpc-status/grpc-message trailing metadata; the mdelems are
       stored in calld->s.cancelled so they outlive this frame */
    calld->s.cancelled.status.md =
        grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
    calld->s.cancelled.details.md =
        grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
    /* hand-link the two elements into a batch: status -> details */
    calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
    calld->s.cancelled.status.next = &calld->s.cancelled.details;
    calld->s.cancelled.details.prev = &calld->s.cancelled.status;
    mdb.list.head = &calld->s.cancelled.status;
    mdb.list.tail = &calld->s.cancelled.details;
    mdb.garbage.head = mdb.garbage.tail = NULL;
    mdb.deadline = gpr_inf_future;
    grpc_sopb_add_metadata(op->recv_ops, mdb);
    /* mark the stream closed and deliver the (successful) recv completion */
    *op->recv_state = GRPC_STREAM_CLOSED;
    op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
  }
  if (op->on_consumed) {
    op->on_consumed->cb(op->on_consumed->cb_arg, 0);
  }
}
Craig Tiller8b976d02015-02-05 21:41:23 -0800188
/** Queue calld until a client config (and hence an lb policy) arrives.
    Caller holds both chand->mu_config and calld->mu_state.
    TODO(review): unimplemented - should push a closure onto
    chand->waiting_for_config_closures (which cc_on_config_changed drains);
    currently aborts the process. */
static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand,
                                                            call_data *calld) {
  abort();
}
192
/** Ask lb_policy to pick a subchannel for calld (state is
    CALL_WAITING_FOR_PICK; no locks held).
    TODO(review): unimplemented - currently aborts the process. */
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
  abort();
}
196
/** Per-call entry point of the filter: route a transport stream op according
    to the call's state machine.

    Lock order: calld->mu_state is taken first; chand->mu_config may be taken
    while mu_state is held (never the reverse). All completion callbacks and
    downstream calls are made with both locks released. */
static void cc_start_transport_stream_op(grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_subchannel_call *subchannel_call;
  grpc_lb_policy *lb_policy;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);

  gpr_mu_lock(&calld->mu_state);
  switch (calld->state) {
    case CALL_ACTIVE:
      /* already bound to a subchannel: forward outside the lock */
      subchannel_call = calld->s.active.subchannel_call;
      gpr_mu_unlock(&calld->mu_state);
      grpc_subchannel_call_process_op(subchannel_call, op);
      break;
    case CALL_CANCELLED:
      gpr_mu_unlock(&calld->mu_state);
      handle_op_after_cancellation(elem, op);
      break;
    case CALL_CREATED:
      if (op->cancel_with_status != GRPC_STATUS_OK) {
        calld->state = CALL_CANCELLED;
        gpr_mu_unlock(&calld->mu_state);
        handle_op_after_cancellation(elem, op);
      } else {
        /* first real op: stash it until we can pick a subchannel */
        calld->s.waiting_op = *op;

        gpr_mu_lock(&chand->mu_config);
        lb_policy = chand->lb_policy;
        if (lb_policy) {
          /* config already resolved: hold a ref on the policy across the
             unlocked pick */
          grpc_lb_policy_ref(lb_policy);
          gpr_mu_unlock(&chand->mu_config);
          calld->state = CALL_WAITING_FOR_PICK;
          gpr_mu_unlock(&calld->mu_state);

          pick_target(lb_policy, calld);

          grpc_lb_policy_unref(lb_policy);
        } else {
          /* no config yet: park the call until the resolver reports in */
          calld->state = CALL_WAITING_FOR_CONFIG;
          add_to_lb_policy_wait_queue_locked_state_config(chand, calld);
          gpr_mu_unlock(&chand->mu_config);
          gpr_mu_unlock(&calld->mu_state);
        }
      }
      break;
    case CALL_WAITING_FOR_CONFIG:
    case CALL_WAITING_FOR_PICK:
      if (op->cancel_with_status != GRPC_STATUS_OK) {
        calld->state = CALL_CANCELLED;
        gpr_mu_unlock(&calld->mu_state);
        handle_op_after_cancellation(elem, op);
      } else {
        /* merge the new op into the stored waiting_op; the surface API
           guarantees at most one outstanding send and one outstanding recv,
           so each direction must be newly supplied here */
        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
                   (op->send_ops == NULL));
        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
                   (op->recv_ops == NULL));
        if (op->send_ops != NULL) {
          calld->s.waiting_op.send_ops = op->send_ops;
          calld->s.waiting_op.is_last_send = op->is_last_send;
          calld->s.waiting_op.on_done_send = op->on_done_send;
        }
        if (op->recv_ops != NULL) {
          calld->s.waiting_op.recv_ops = op->recv_ops;
          calld->s.waiting_op.recv_state = op->recv_state;
          calld->s.waiting_op.on_done_recv = op->on_done_recv;
        }
        gpr_mu_unlock(&calld->mu_state);
        /* the op has been absorbed into waiting_op; acknowledge it */
        if (op->on_consumed != NULL) {
          op->on_consumed->cb(op->on_consumed->cb_arg, 0);
        }
      }
      break;
  }

/* NOTE(review): dead code below - the pre-resolver implementation of this
   function, kept during the migration. It references chand->mu,
   active_child, transport_setup and CALL_WAITING, none of which exist in the
   live channel_data/call_state; delete once the new path is complete. */
#if 0
  gpr_mu_lock(&chand->mu);
  switch (calld->state) {
    case CALL_ACTIVE:
      child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
      gpr_mu_unlock(&chand->mu);
      child_elem->filter->start_transport_op(child_elem, op);
      break;
    case CALL_CREATED:
      if (op->cancel_with_status != GRPC_STATUS_OK) {
        calld->state = CALL_CANCELLED;
        gpr_mu_unlock(&chand->mu);
        handle_op_after_cancellation(elem, op);
      } else {
        calld->state = CALL_WAITING;
        calld->s.waiting_op.bind_pollset = NULL;
        if (chand->active_child) {
          /* channel is connected - use the connected stack */
          if (prepare_activate(elem, chand->active_child)) {
            gpr_mu_unlock(&chand->mu);
            /* activate the request (pass it down) outside the lock */
            complete_activate(elem, op);
          } else {
            gpr_mu_unlock(&chand->mu);
          }
        } else {
          /* check to see if we should initiate a connection (if we're not
             already), but don't do so until outside the lock to avoid
             re-entrancy problems if the callback is immediate */
          int initiate_transport_setup = 0;
          if (!chand->transport_setup_initiated) {
            chand->transport_setup_initiated = 1;
            initiate_transport_setup = 1;
          }
          /* add this call to the waiting set to be resumed once we have a child
             channel stack, growing the waiting set if needed */
          if (chand->waiting_child_count == chand->waiting_child_capacity) {
            chand->waiting_child_capacity =
                GPR_MAX(chand->waiting_child_capacity * 2, 8);
            chand->waiting_children = gpr_realloc(
                chand->waiting_children,
                chand->waiting_child_capacity * sizeof(call_data *));
          }
          calld->s.waiting_op = *op;
          chand->waiting_children[chand->waiting_child_count++] = calld;
          grpc_transport_setup_add_interested_party(chand->transport_setup,
                                                    op->bind_pollset);
          gpr_mu_unlock(&chand->mu);

          /* finally initiate transport setup if needed */
          if (initiate_transport_setup) {
            grpc_transport_setup_initiate(chand->transport_setup);
          }
        }
      }
      break;
    case CALL_WAITING:
      if (op->cancel_with_status != GRPC_STATUS_OK) {
        waiting_op = calld->s.waiting_op;
        remove_waiting_child(chand, calld);
        calld->state = CALL_CANCELLED;
        gpr_mu_unlock(&chand->mu);
        handle_op_after_cancellation(elem, &waiting_op);
        handle_op_after_cancellation(elem, op);
      } else {
        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
                   (op->send_ops == NULL));
        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
                   (op->recv_ops == NULL));
        if (op->send_ops) {
          calld->s.waiting_op.send_ops = op->send_ops;
          calld->s.waiting_op.is_last_send = op->is_last_send;
          calld->s.waiting_op.on_done_send = op->on_done_send;
        }
        if (op->recv_ops) {
          calld->s.waiting_op.recv_ops = op->recv_ops;
          calld->s.waiting_op.recv_state = op->recv_state;
          calld->s.waiting_op.on_done_recv = op->on_done_recv;
        }
        gpr_mu_unlock(&chand->mu);
        if (op->on_consumed) {
          op->on_consumed->cb(op->on_consumed->cb_arg, 0);
        }
      }
      break;
    case CALL_CANCELLED:
      gpr_mu_unlock(&chand->mu);
      handle_op_after_cancellation(elem, op);
      break;
  }
#endif
}
370
/** Recompute channel-level state after the configuration changed.
    Caller holds chand->mu_config.
    TODO(review): unimplemented - presumably meant to drive the unused
    connectivity-state fields in channel_data; confirm intended semantics. */
static void update_state_locked(channel_data *chand) {

}
374
375static void cc_on_config_changed(void *arg, int iomgr_success) {
376 channel_data *chand = arg;
377 grpc_lb_policy *lb_policy = NULL;
378 grpc_lb_policy *old_lb_policy;
379 grpc_resolver *old_resolver;
380 grpc_iomgr_closure *wakeup_closures = NULL;
381
382 if (chand->incoming_configuration) {
383 lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
384 grpc_lb_policy_ref(lb_policy);
385 }
386
387 grpc_client_config_unref(chand->incoming_configuration);
388 chand->incoming_configuration = NULL;
389
390 gpr_mu_lock(&chand->mu_config);
391 old_lb_policy = chand->lb_policy;
392 chand->lb_policy = lb_policy;
393 if (lb_policy != NULL) {
394 wakeup_closures = chand->waiting_for_config_closures;
395 chand->waiting_for_config_closures = NULL;
396 }
397 gpr_mu_unlock(&chand->mu_config);
398
399 while (wakeup_closures) {
400 grpc_iomgr_closure *next = wakeup_closures->next;
401 grpc_iomgr_add_callback(wakeup_closures);
402 wakeup_closures = next;
403 }
404
405 grpc_lb_policy_unref(old_lb_policy);
406
407 if (iomgr_success) {
408 grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
409 } else {
410 gpr_mu_lock(&chand->mu_config);
411 old_resolver = chand->resolver;
412 chand->resolver = NULL;
413 update_state_locked(chand);
414 gpr_mu_unlock(&chand->mu_config);
415 grpc_resolver_unref(old_resolver);
416 }
417}
418
/* NOTE(review): dead code - the old channel-op dispatcher for the
   child-channel architecture. References chand->mu and active_child, which
   no longer exist in channel_data; will not compile if re-enabled. Kept as a
   reference for GOAWAY/DISCONNECT handling until cc_start_transport_op is
   implemented; delete afterwards. */
#if 0
static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  grpc_child_channel *child_channel;
  grpc_channel_op rop;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  switch (op->type) {
    case GRPC_CHANNEL_GOAWAY:
      /* sending goaway: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_handle_op(child_channel, op);
        grpc_child_channel_destroy(child_channel, 1);
      } else {
        gpr_slice_unref(op->data.goaway.message);
      }
      break;
    case GRPC_CHANNEL_DISCONNECT:
      /* sending disconnect: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 1);
      }
      /* fake a transport closed to satisfy the refcounting in client */
      rop.type = GRPC_TRANSPORT_CLOSED;
      rop.dir = GRPC_CALL_UP;
      grpc_channel_next_op(elem, &rop);
      break;
    case GRPC_TRANSPORT_GOAWAY:
      /* receiving goaway: if it's from our active child, drop the active child;
         in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      gpr_slice_unref(op->data.goaway.message);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* receiving disconnect: if it's from our active child, drop the active
         child; in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      break;
    default:
      switch (op->dir) {
        case GRPC_CALL_UP:
          grpc_channel_next_op(elem, op);
          break;
        case GRPC_CALL_DOWN:
          gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
          abort();
          break;
      }
      break;
  }
}
#endif
500
501static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800502
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800503/* Constructor for call_data */
504static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700505 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700506 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800507 call_data *calld = elem->call_data;
508
Craig Tiller50d9db52015-04-23 10:52:14 -0700509 /* TODO(ctiller): is there something useful we can do here? */
510 GPR_ASSERT(initial_op == NULL);
511
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800512 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
513 GPR_ASSERT(server_transport_data == NULL);
514 calld->elem = elem;
515 calld->state = CALL_CREATED;
516 calld->deadline = gpr_inf_future;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800517}
518
519/* Destructor for call_data */
520static void destroy_call_elem(grpc_call_element *elem) {
521 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700522 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800523
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800524 /* if the call got activated, we need to destroy the child stack also, and
525 remove it from the in-flight requests tracked by the child_entry we
526 picked */
Craig Tiller3f475422015-06-25 10:43:05 -0700527 gpr_mu_lock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700528 switch (calld->state) {
529 case CALL_ACTIVE:
Craig Tiller3f475422015-06-25 10:43:05 -0700530 subchannel_call = calld->s.active.subchannel_call;
531 gpr_mu_unlock(&calld->mu_state);
532 grpc_subchannel_call_unref(subchannel_call);
Craig Tillerf93fd052015-06-02 08:15:33 -0700533 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700534 case CALL_CREATED:
535 case CALL_CANCELLED:
536 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700537 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700538 case CALL_WAITING_FOR_PICK:
539 case CALL_WAITING_FOR_CONFIG:
540 gpr_log(GPR_ERROR, "should never reach here");
541 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700542 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800543 }
544}
545
546/* Constructor for channel_data */
547static void init_channel_elem(grpc_channel_element *elem,
548 const grpc_channel_args *args,
549 grpc_mdctx *metadata_context, int is_first,
550 int is_last) {
551 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800552
553 GPR_ASSERT(!is_first);
554 GPR_ASSERT(is_last);
555 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
556
Craig Tiller3f475422015-06-25 10:43:05 -0700557 gpr_mu_init(&chand->mu_config);
558 chand->resolver = NULL;
Craig Tillere70413c2015-04-24 10:12:34 -0700559 chand->mdctx = metadata_context;
Craig Tiller3f475422015-06-25 10:43:05 -0700560 grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800561}
562
563/* Destructor for channel_data */
564static void destroy_channel_elem(grpc_channel_element *elem) {
565 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800566
Craig Tiller3f475422015-06-25 10:43:05 -0700567 if (chand->resolver) {
568 grpc_resolver_unref(chand->resolver);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800569 }
Craig Tiller3f475422015-06-25 10:43:05 -0700570 if (chand->lb_policy) {
571 grpc_lb_policy_unref(chand->lb_policy);
572 }
573 gpr_mu_destroy(&chand->mu_config);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800574}
575
/* vtable for the client channel filter: per-stream op handler, channel op
   handler, call/channel data sizes and their constructors/destructors */
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
    init_call_elem, destroy_call_elem, sizeof(channel_data),
    init_channel_elem, destroy_channel_elem, "client-channel",
};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800581
/* NOTE(review): dead code - the old "transport is ready" entry point from
   the transport_setup era. References active_child, waiting_children,
   chand->mu and chand->args, none of which exist in the live channel_data;
   will not compile if re-enabled. Kept as a reference for replaying queued
   ops when connectivity arrives; delete once the subchannel path covers it. */
#if 0
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
    grpc_channel_stack *channel_stack, grpc_transport *transport,
    grpc_channel_filter const **channel_filters, size_t num_channel_filters,
    grpc_mdctx *mdctx) {
  /* we just got a new transport: lets create a child channel stack for it */
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  size_t num_child_filters = 2 + num_channel_filters;
  grpc_channel_filter const **child_filters;
  grpc_transport_setup_result result;
  grpc_child_channel *old_active = NULL;
  call_data **waiting_children;
  size_t waiting_child_count;
  size_t i;
  grpc_transport_stream_op *call_ops;

  /* build the child filter stack */
  child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
  /* we always need a link back filter to get back to the connected channel */
  child_filters[0] = &grpc_child_channel_top_filter;
  for (i = 0; i < num_channel_filters; i++) {
    child_filters[i + 1] = channel_filters[i];
  }
  /* and we always need a connected channel to talk to the transport */
  child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;

  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  /* BEGIN LOCKING CHANNEL */
  gpr_mu_lock(&chand->mu);
  chand->transport_setup_initiated = 0;

  if (chand->active_child) {
    old_active = chand->active_child;
  }
  chand->active_child = grpc_child_channel_create(
      elem, child_filters, num_child_filters, chand->args, mdctx);
  result =
      grpc_connected_channel_bind_transport(chand->active_child, transport);

  /* capture the waiting children - we'll activate them outside the lock
     to avoid re-entrancy problems */
  waiting_children = chand->waiting_children;
  waiting_child_count = chand->waiting_child_count;
  /* bumping up inflight_requests here avoids taking a lock per rpc below */

  chand->waiting_children = NULL;
  chand->waiting_child_count = 0;
  chand->waiting_child_capacity = 0;

  call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);

  for (i = 0; i < waiting_child_count; i++) {
    call_ops[i] = waiting_children[i]->s.waiting_op;
    if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
      waiting_children[i] = NULL;
      grpc_transport_stream_op_finish_with_failure(&call_ops[i]);
    }
  }

  /* END LOCKING CHANNEL */
  gpr_mu_unlock(&chand->mu);

  /* activate any pending operations - this is safe to do as we guarantee one
     and only one write operation per request at the surface api - if we lose
     that guarantee we need to do some curly locking here */
  for (i = 0; i < waiting_child_count; i++) {
    if (waiting_children[i]) {
      complete_activate(waiting_children[i]->elem, &call_ops[i]);
    }
  }
  gpr_free(waiting_children);
  gpr_free(call_ops);
  gpr_free(child_filters);

  if (old_active) {
    grpc_child_channel_destroy(old_active, 1);
  }

  return result;
}
#endif
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800665
/** Post-construction initialization: attach the name resolver to the client
    channel (which must be the last element of channel_stack) and request the
    first configuration. Takes a ref on resolver; released in
    cc_on_config_changed (on termination) or destroy_channel_elem.
    May be called at most once per channel. */
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
                                      grpc_resolver *resolver) {
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(!chand->resolver);
  chand->resolver = resolver;
  grpc_resolver_ref(resolver);
  /* kick off resolution; cc_on_config_changed fires when a config arrives */
  grpc_resolver_next(resolver, &chand->incoming_configuration,
                     &chand->on_config_changed);
}