blob: 965d4e53dc61e256d8e76f2170c11bcd9fc6fa0b [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
Craig Tillereb3b12e2015-06-26 14:42:49 -070037#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
39#include "src/core/channel/channel_args.h"
40#include "src/core/channel/connected_channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080041#include "src/core/iomgr/iomgr.h"
Craig Tillerfe4ba362015-05-08 09:47:18 -070042#include "src/core/iomgr/pollset_set.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include <grpc/support/alloc.h>
45#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080046#include <grpc/support/sync.h>
47#include <grpc/support/useful.h>
48
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049/* Client channel implementation */
50
typedef struct call_data call_data;

/** Per-channel data for the client channel filter. */
typedef struct {
  /** metadata context for this channel */
  grpc_mdctx *mdctx;
  /** resolver for this channel */
  grpc_resolver *resolver;

  /** mutex protecting client configuration, resolution state */
  gpr_mu mu_config;
  /** currently active load balancer - guarded by mu_config */
  grpc_lb_policy *lb_policy;
  /** incoming configuration - set by resolver.next
      guarded by mu_config */
  grpc_client_config *incoming_configuration;
  /** a list of closures that are all waiting for config to come in */
  grpc_iomgr_closure *waiting_for_config_closures;
  /** resolver callback */
  grpc_iomgr_closure on_config_changed;
  /** connectivity state being tracked */
  grpc_iomgr_closure *on_connectivity_state_change;
  /* NOTE(review): written nowhere in this file yet - presumably filled in by a
     future connectivity-watch API; confirm before relying on it */
  grpc_connectivity_state *connectivity_state;
} channel_data;
74
/** Lifecycle of a call through this filter; transitions are guarded by
    call_data.mu_state. */
typedef enum {
  /* call created, no op seen yet */
  CALL_CREATED,
  /* op received, but no client config is available yet */
  CALL_WAITING_FOR_CONFIG,
  /* config available, waiting on the LB policy to pick a subchannel */
  CALL_WAITING_FOR_PICK,
  /* subchannel picked, waiting for the subchannel call to be created */
  CALL_WAITING_FOR_CALL,
  /* subchannel call exists; ops flow straight through */
  CALL_ACTIVE,
  /* terminal: call was cancelled */
  CALL_CANCELLED
} call_state;
83
/** Per-call data for the client channel filter. */
struct call_data {
  /* owning element */
  grpc_call_element *elem;

  /* protects state and the fields below */
  gpr_mu mu_state;

  call_state state;
  gpr_timespec deadline;
  /* subchannel chosen by the LB policy (NULL if the pick failed) */
  grpc_subchannel *picked_channel;
  /* closure used for the async pick / call-creation steps */
  grpc_iomgr_closure async_setup_task;
  /* op buffered while the call is in one of the WAITING states */
  grpc_transport_stream_op waiting_op;
  /* our child call stack */
  grpc_subchannel_call *subchannel_call;
  /* storage for synthesized grpc-status / grpc-message metadata on
     cancellation (see handle_op_after_cancellation) */
  grpc_linked_mdelem status;
  grpc_linked_mdelem details;
};
100
/* NOTE(review): disabled legacy code from the pre-resolver child-channel
   design, kept for reference during the refactor. References fields
   (s.active, transport_setup, waiting_children) that no longer exist on
   call_data/channel_data - it will not compile if re-enabled as-is. */
#if 0
static int prepare_activate(grpc_call_element *elem,
                            grpc_child_channel *on_child) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (calld->state == CALL_CANCELLED) return 0;

  /* no more access to calld->s.waiting allowed */
  GPR_ASSERT(calld->state == CALL_WAITING);

  if (calld->waiting_op.bind_pollset) {
    grpc_transport_setup_del_interested_party(chand->transport_setup,
                                              calld->waiting_op.bind_pollset);
  }

  calld->state = CALL_ACTIVE;

  /* create a child call */
  /* TODO(ctiller): pass the waiting op down here */
  calld->s.active.child_call =
      grpc_child_channel_create_call(on_child, elem, NULL);

  return 1;
}

static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;
  grpc_call_element *child_elem =
      grpc_child_call_get_top_element(calld->s.active.child_call);

  GPR_ASSERT(calld->state == CALL_ACTIVE);

  /* continue the start call down the stack, this needs to happen after
     metadata are flushed */
  child_elem->filter->start_transport_op(child_elem, op);
}

static void remove_waiting_child(channel_data *chand, call_data *calld) {
  size_t new_count;
  size_t i;
  for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
    if (chand->waiting_children[i] == calld) {
      grpc_transport_setup_del_interested_party(
          chand->transport_setup, calld->waiting_op.bind_pollset);
      continue;
    }
    chand->waiting_children[new_count++] = chand->waiting_children[i];
  }
  GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
             new_count == chand->waiting_child_count);
  chand->waiting_child_count = new_count;
}
#endif
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154
/** Complete an op on a call that is already cancelled: fail the send side,
    synthesize a CANCELLED trailing-metadata batch on the receive side, and
    fire the op's callbacks so the layer above always gets closure.
    Uses calld->status/details as storage for the synthesized metadata, so
    this must not be called twice with recv_ops set for the same call. */
static void handle_op_after_cancellation(grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (op->send_ops) {
    /* drop ownership of anything we were asked to send, then signal failure */
    grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
    op->on_done_send->cb(op->on_done_send->cb_arg, 0);
  }
  if (op->recv_ops) {
    char status[GPR_LTOA_MIN_BUFSIZE];
    grpc_metadata_batch mdb;
    gpr_ltoa(GRPC_STATUS_CANCELLED, status);
    calld->status.md =
        grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
    calld->details.md =
        grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
    /* hand-build a two-element linked list: status -> details */
    calld->status.prev = calld->details.next = NULL;
    calld->status.next = &calld->details;
    calld->details.prev = &calld->status;
    mdb.list.head = &calld->status;
    mdb.list.tail = &calld->details;
    mdb.garbage.head = mdb.garbage.tail = NULL;
    mdb.deadline = gpr_inf_future;
    grpc_sopb_add_metadata(op->recv_ops, mdb);
    *op->recv_state = GRPC_STREAM_CLOSED;
    /* receive side "succeeds" (1): the synthesized batch is valid data */
    op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
  }
  if (op->on_consumed) {
    op->on_consumed->cb(op->on_consumed->cb_arg, 0);
  }
}
Craig Tiller8b976d02015-02-05 21:41:23 -0800186
/** Heap-allocated node parked on channel_data.waiting_for_config_closures
    while a call waits for the first client config to arrive. Freed by
    continue_with_pick once the closure runs. */
typedef struct {
  grpc_iomgr_closure closure;
  grpc_call_element *elem;
} waiting_call;

static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation);
193
194static void continue_with_pick(void *arg, int iomgr_success) {
195 waiting_call *wc = arg;
196 call_data *calld = wc->elem->call_data;
197 perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
198 gpr_free(wc);
199}
200
201static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) {
202 channel_data *chand = elem->channel_data;
203 waiting_call *wc = gpr_malloc(sizeof(*wc));
204 grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
205 wc->elem = elem;
206 wc->closure.next = chand->waiting_for_config_closures;
207 chand->waiting_for_config_closures = &wc->closure;
208}
209
/** Returns 1 iff the len bytes starting at p are all zero, 0 otherwise. */
static int is_empty(void *p, int len) {
  const char *byte = p;
  const char *end = byte + len;
  while (byte != end) {
    if (*byte++ != 0) {
      return 0;
    }
  }
  return 1;
}
218
219static void started_call(void *arg, int iomgr_success) {
220 call_data *calld = arg;
221 grpc_transport_stream_op op;
222 int have_waiting;
223
224 gpr_mu_lock(&calld->mu_state);
225 if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
226 memset(&op, 0, sizeof(op));
227 op.cancel_with_status = GRPC_STATUS_CANCELLED;
228 gpr_mu_unlock(&calld->mu_state);
229 grpc_subchannel_call_process_op(calld->subchannel_call, &op);
230 } else if (calld->state == CALL_WAITING_FOR_CALL) {
231 have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
232 if (calld->subchannel_call != NULL) {
233 calld->state = CALL_ACTIVE;
234 gpr_mu_unlock(&calld->mu_state);
235 if (have_waiting) {
236 grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op);
237 }
238 } else {
239 calld->state = CALL_CANCELLED;
240 gpr_mu_unlock(&calld->mu_state);
241 if (have_waiting) {
242 handle_op_after_cancellation(calld->elem, &calld->waiting_op);
243 }
244 }
245 } else {
246 GPR_ASSERT(calld->state == CALL_CANCELLED);
247 }
248}
249
/** Closure body run once the LB policy finishes picking (calld->picked_channel
    set to the chosen subchannel, or NULL if the pick failed). */
static void picked_target(void *arg, int iomgr_success) {
  call_data *calld = arg;
  channel_data *chand = calld->elem->channel_data;
  grpc_transport_stream_op op;

  if (calld->picked_channel == NULL) {
    /* treat this like a cancellation */
    calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
    perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
  } else {
    gpr_mu_lock(&calld->mu_state);
    if (calld->state == CALL_CANCELLED) {
      gpr_mu_unlock(&calld->mu_state);
      handle_op_after_cancellation(calld->elem, &calld->waiting_op);
    } else {
      GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
      calld->state = CALL_WAITING_FOR_CALL;
      /* move the buffered op into a local before dropping the lock so the
         slot can be reused; started_call uses is_empty() on waiting_op to
         detect whether a new op was buffered meanwhile */
      op = calld->waiting_op;
      memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
      gpr_mu_unlock(&calld->mu_state);
      grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
      grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task);
    }
  }
}
275
/** Ask the LB policy to pick a subchannel for this call, using the call's
    initial metadata (required to be the first send op) as input.
    Completion is delivered asynchronously via picked_target. */
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
  grpc_metadata_batch *initial_metadata;
  grpc_transport_stream_op *op = &calld->waiting_op;

  /* the buffered op must carry a pollset and initial metadata */
  GPR_ASSERT(op->bind_pollset);
  GPR_ASSERT(op->send_ops);
  GPR_ASSERT(op->send_ops->nops >= 1);
  GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
  initial_metadata = &op->send_ops->ops[0].data.metadata;

  grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
  grpc_lb_policy_pick(lb_policy, op->bind_pollset,
                      initial_metadata, &calld->picked_channel, &calld->async_setup_task);
}
290
/** Core per-call state machine. Drives an op according to calld->state:
    active calls forward to the subchannel call, cancelled calls fail the op,
    waiting calls buffer it, and fresh calls kick off LB pick / config wait.
    continuation != 0 means op is the previously-buffered calld->waiting_op
    being re-driven by an async completion (config arrival or failed pick).
    Lock order: calld->mu_state is taken first, chand->mu_config nested
    inside it; both are always released before calling out. */
static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_subchannel_call *subchannel_call;
  grpc_lb_policy *lb_policy;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);

  gpr_mu_lock(&calld->mu_state);
  switch (calld->state) {
    case CALL_ACTIVE:
      GPR_ASSERT(!continuation);
      /* snapshot the call pointer so we can drop the lock before calling down */
      subchannel_call = calld->subchannel_call;
      gpr_mu_unlock(&calld->mu_state);
      grpc_subchannel_call_process_op(subchannel_call, op);
      break;
    case CALL_CANCELLED:
      gpr_mu_unlock(&calld->mu_state);
      handle_op_after_cancellation(elem, op);
      break;
    case CALL_WAITING_FOR_CONFIG:
    case CALL_WAITING_FOR_PICK:
    case CALL_WAITING_FOR_CALL:
      if (!continuation) {
        if (op->cancel_with_status != GRPC_STATUS_OK) {
          calld->state = CALL_CANCELLED;
          gpr_mu_unlock(&calld->mu_state);
          handle_op_after_cancellation(elem, op);
        } else {
          /* merge the new op into the buffered one; exactly one of the
             buffered/new op may carry each direction (XOR asserts) */
          GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
                     (op->send_ops == NULL));
          GPR_ASSERT((calld->waiting_op.recv_ops == NULL) !=
                     (op->recv_ops == NULL));
          if (op->send_ops != NULL) {
            calld->waiting_op.send_ops = op->send_ops;
            calld->waiting_op.is_last_send = op->is_last_send;
            calld->waiting_op.on_done_send = op->on_done_send;
          }
          if (op->recv_ops != NULL) {
            calld->waiting_op.recv_ops = op->recv_ops;
            calld->waiting_op.recv_state = op->recv_state;
            calld->waiting_op.on_done_recv = op->on_done_recv;
          }
          gpr_mu_unlock(&calld->mu_state);
          if (op->on_consumed != NULL) {
            op->on_consumed->cb(op->on_consumed->cb_arg, 0);
          }
        }
        break;
      }
      /* fall through */
    case CALL_CREATED:
      if (op->cancel_with_status != GRPC_STATUS_OK) {
        calld->state = CALL_CANCELLED;
        gpr_mu_unlock(&calld->mu_state);
        handle_op_after_cancellation(elem, op);
      } else {
        /* self-assignment when op == &calld->waiting_op (continuation) */
        calld->waiting_op = *op;

        gpr_mu_lock(&chand->mu_config);
        lb_policy = chand->lb_policy;
        if (lb_policy) {
          /* hold a ref across the pick so the policy can't die under us */
          grpc_lb_policy_ref(lb_policy);
          gpr_mu_unlock(&chand->mu_config);
          calld->state = CALL_WAITING_FOR_PICK;
          gpr_mu_unlock(&calld->mu_state);

          pick_target(lb_policy, calld);

          grpc_lb_policy_unref(lb_policy);
        } else {
          /* no config yet: queue until cc_on_config_changed wakes us */
          calld->state = CALL_WAITING_FOR_CONFIG;
          add_to_lb_policy_wait_queue_locked_state_config(elem);
          gpr_mu_unlock(&chand->mu_config);
          gpr_mu_unlock(&calld->mu_state);
        }
      }
      break;
  }
}
Craig Tillerf5f17122015-06-25 08:47:26 -0700371
/** Filter entry point for per-call ops: delegate to the state machine as a
    fresh (non-continuation) op. */
static void cc_start_transport_stream_op(grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  perform_transport_stream_op(elem, op, 0);
}
376
/** Recompute channel state after resolver shutdown; must be called with
    chand->mu_config held. Currently a stub that only logs. */
static void update_state_locked(channel_data *chand) {
  gpr_log(GPR_ERROR, "update_state_locked not implemented");
}
380
/** Resolver callback: a new client config (or a resolution failure) arrived.
    Installs the config's LB policy, wakes every call queued for config, and
    either re-arms the resolver (success) or tears it down (failure). */
static void cc_on_config_changed(void *arg, int iomgr_success) {
  channel_data *chand = arg;
  grpc_lb_policy *lb_policy = NULL;
  grpc_lb_policy *old_lb_policy;
  grpc_resolver *old_resolver;
  grpc_iomgr_closure *wakeup_closures = NULL;

  if (chand->incoming_configuration) {
    /* take our own ref on the policy, then release the config that owns it */
    lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
    grpc_lb_policy_ref(lb_policy);

    grpc_client_config_unref(chand->incoming_configuration);
  }

  chand->incoming_configuration = NULL;

  gpr_mu_lock(&chand->mu_config);
  old_lb_policy = chand->lb_policy;
  chand->lb_policy = lb_policy;
  if (lb_policy != NULL) {
    /* detach the wait list under the lock; callbacks fire outside it */
    wakeup_closures = chand->waiting_for_config_closures;
    chand->waiting_for_config_closures = NULL;
  }
  gpr_mu_unlock(&chand->mu_config);

  while (wakeup_closures) {
    grpc_iomgr_closure *next = wakeup_closures->next;
    grpc_iomgr_add_callback(wakeup_closures);
    wakeup_closures = next;
  }

  if (old_lb_policy) {
    grpc_lb_policy_unref(old_lb_policy);
  }

  if (iomgr_success) {
    /* re-arm: ask the resolver for the next config update */
    grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
  } else {
    /* resolver shut down: drop our reference and update channel state */
    gpr_mu_lock(&chand->mu_config);
    old_resolver = chand->resolver;
    chand->resolver = NULL;
    update_state_locked(chand);
    gpr_mu_unlock(&chand->mu_config);
    grpc_resolver_unref(old_resolver);
  }
}
427
/* NOTE(review): disabled legacy channel-op handler from the child-channel
   design (goaway/disconnect plumbing). References chand->mu and
   chand->active_child, which no longer exist; kept only as a reference for
   re-implementing cc_start_transport_op below. */
#if 0
static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  grpc_child_channel *child_channel;
  grpc_channel_op rop;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  switch (op->type) {
    case GRPC_CHANNEL_GOAWAY:
      /* sending goaway: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_handle_op(child_channel, op);
        grpc_child_channel_destroy(child_channel, 1);
      } else {
        gpr_slice_unref(op->data.goaway.message);
      }
      break;
    case GRPC_CHANNEL_DISCONNECT:
      /* sending disconnect: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 1);
      }
      /* fake a transport closed to satisfy the refcounting in client */
      rop.type = GRPC_TRANSPORT_CLOSED;
      rop.dir = GRPC_CALL_UP;
      grpc_channel_next_op(elem, &rop);
      break;
    case GRPC_TRANSPORT_GOAWAY:
      /* receiving goaway: if it's from our active child, drop the active child;
         in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      gpr_slice_unref(op->data.goaway.message);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* receiving disconnect: if it's from our active child, drop the active
         child; in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      break;
    default:
      switch (op->dir) {
        case GRPC_CALL_UP:
          grpc_channel_next_op(elem, op);
          break;
        case GRPC_CALL_DOWN:
          gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
          abort();
          break;
      }
      break;
  }
}
#endif
509
/* Channel-level op handler. Intentionally a no-op for now: goaway /
   disconnect handling has not yet been re-implemented for the
   resolver-based design (see the disabled channel_op() above). */
static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800511
/* Constructor for call_data: starts every call in CALL_CREATED with an
   unlimited deadline. Initial ops are not supported by this filter. */
static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data,
                           grpc_transport_stream_op *initial_op) {
  call_data *calld = elem->call_data;

  /* TODO(ctiller): is there something useful we can do here? */
  GPR_ASSERT(initial_op == NULL);

  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  /* client channels are never instantiated on the server side */
  GPR_ASSERT(server_transport_data == NULL);
  gpr_mu_init(&calld->mu_state);
  calld->elem = elem;
  calld->state = CALL_CREATED;
  calld->deadline = gpr_inf_future;
}
528
529/* Destructor for call_data */
530static void destroy_call_elem(grpc_call_element *elem) {
531 call_data *calld = elem->call_data;
Craig Tiller3f475422015-06-25 10:43:05 -0700532 grpc_subchannel_call *subchannel_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800533
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800534 /* if the call got activated, we need to destroy the child stack also, and
535 remove it from the in-flight requests tracked by the child_entry we
536 picked */
Craig Tiller3f475422015-06-25 10:43:05 -0700537 gpr_mu_lock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700538 switch (calld->state) {
539 case CALL_ACTIVE:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700540 subchannel_call = calld->subchannel_call;
Craig Tiller3f475422015-06-25 10:43:05 -0700541 gpr_mu_unlock(&calld->mu_state);
542 grpc_subchannel_call_unref(subchannel_call);
Craig Tillerf93fd052015-06-02 08:15:33 -0700543 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700544 case CALL_CREATED:
545 case CALL_CANCELLED:
546 gpr_mu_unlock(&calld->mu_state);
Craig Tillerf93fd052015-06-02 08:15:33 -0700547 break;
Craig Tiller3f475422015-06-25 10:43:05 -0700548 case CALL_WAITING_FOR_PICK:
549 case CALL_WAITING_FOR_CONFIG:
Craig Tillereb3b12e2015-06-26 14:42:49 -0700550 case CALL_WAITING_FOR_CALL:
Craig Tiller3f475422015-06-25 10:43:05 -0700551 gpr_log(GPR_ERROR, "should never reach here");
552 abort();
Craig Tillerf93fd052015-06-02 08:15:33 -0700553 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800554 }
555}
556
/* Constructor for channel_data: zero-initialize everything, then set up the
   config mutex, metadata context, and the resolver callback closure.
   The resolver itself is attached later via grpc_client_channel_set_resolver. */
static void init_channel_elem(grpc_channel_element *elem,
                              const grpc_channel_args *args,
                              grpc_mdctx *metadata_context, int is_first,
                              int is_last) {
  channel_data *chand = elem->channel_data;

  memset(chand, 0, sizeof(*chand));

  /* this filter must sit at the bottom of the client channel stack */
  GPR_ASSERT(is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  gpr_mu_init(&chand->mu_config);
  chand->mdctx = metadata_context;
  grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
}
573
/* Destructor for channel_data: drop the channel's refs on the resolver and
   LB policy (either may already be NULL) and tear down the config mutex. */
static void destroy_channel_elem(grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;

  if (chand->resolver) {
    grpc_resolver_unref(chand->resolver);
  }
  if (chand->lb_policy) {
    grpc_lb_policy_unref(chand->lb_policy);
  }
  gpr_mu_destroy(&chand->mu_config);
}
586
/* Filter vtable (positional initializer): start_transport_stream_op,
   start_transport_op, sizeof_call_data, init_call_elem, destroy_call_elem,
   sizeof_channel_data, init_channel_elem, destroy_channel_elem, name. */
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
    init_call_elem, destroy_call_elem, sizeof(channel_data),
    init_channel_elem, destroy_channel_elem, "client-channel",
};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800592
/* NOTE(review): disabled legacy transport-setup completion from the
   child-channel design. References chand->mu, active_child,
   waiting_children, etc., which no longer exist on channel_data; kept only
   as a reference while the subchannel path replaces it. */
#if 0
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
    grpc_channel_stack *channel_stack, grpc_transport *transport,
    grpc_channel_filter const **channel_filters, size_t num_channel_filters,
    grpc_mdctx *mdctx) {
  /* we just got a new transport: lets create a child channel stack for it */
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  size_t num_child_filters = 2 + num_channel_filters;
  grpc_channel_filter const **child_filters;
  grpc_transport_setup_result result;
  grpc_child_channel *old_active = NULL;
  call_data **waiting_children;
  size_t waiting_child_count;
  size_t i;
  grpc_transport_stream_op *call_ops;

  /* build the child filter stack */
  child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
  /* we always need a link back filter to get back to the connected channel */
  child_filters[0] = &grpc_child_channel_top_filter;
  for (i = 0; i < num_channel_filters; i++) {
    child_filters[i + 1] = channel_filters[i];
  }
  /* and we always need a connected channel to talk to the transport */
  child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;

  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  /* BEGIN LOCKING CHANNEL */
  gpr_mu_lock(&chand->mu);
  chand->transport_setup_initiated = 0;

  if (chand->active_child) {
    old_active = chand->active_child;
  }
  chand->active_child = grpc_child_channel_create(
      elem, child_filters, num_child_filters, chand->args, mdctx);
  result =
      grpc_connected_channel_bind_transport(chand->active_child, transport);

  /* capture the waiting children - we'll activate them outside the lock
     to avoid re-entrancy problems */
  waiting_children = chand->waiting_children;
  waiting_child_count = chand->waiting_child_count;
  /* bumping up inflight_requests here avoids taking a lock per rpc below */

  chand->waiting_children = NULL;
  chand->waiting_child_count = 0;
  chand->waiting_child_capacity = 0;

  call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);

  for (i = 0; i < waiting_child_count; i++) {
    call_ops[i] = waiting_children[i]->waiting_op;
    if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
      waiting_children[i] = NULL;
      grpc_transport_stream_op_finish_with_failure(&call_ops[i]);
    }
  }

  /* END LOCKING CHANNEL */
  gpr_mu_unlock(&chand->mu);

  /* activate any pending operations - this is safe to do as we guarantee one
     and only one write operation per request at the surface api - if we lose
     that guarantee we need to do some curly locking here */
  for (i = 0; i < waiting_child_count; i++) {
    if (waiting_children[i]) {
      complete_activate(waiting_children[i]->elem, &call_ops[i]);
    }
  }
  gpr_free(waiting_children);
  gpr_free(call_ops);
  gpr_free(child_filters);

  if (old_active) {
    grpc_child_channel_destroy(old_active, 1);
  }

  return result;
}
#endif
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800676
/** Post-construction initialization: attach the resolver to the channel
    (taking a ref) and request the first configuration. May be called at
    most once per channel.
    NOTE(review): chand->resolver is written here without taking mu_config -
    presumably this runs before the channel is visible to other threads;
    confirm against the channel-creation path. */
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
                                      grpc_resolver *resolver) {
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(!chand->resolver);
  chand->resolver = resolver;
  grpc_resolver_ref(resolver);
  /* kick off the first resolution; completion lands in cc_on_config_changed */
  grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed);
}