/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
37
38#include "src/core/channel/channel_args.h"
ctiller82e275f2014-12-12 08:43:28 -080039#include "src/core/channel/child_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include "src/core/channel/connected_channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080041#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080042#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/alloc.h>
44#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <grpc/support/sync.h>
46#include <grpc/support/useful.h>
47
/* Client channel implementation */
49
/* Forward declaration: per-call state, defined below. */
typedef struct call_data call_data;

/* Per-channel state for the client channel filter. */
typedef struct {
  /* protects children, child_count, child_capacity, active_child,
     transport_setup_initiated
     does not protect channel stacks held by children
     transport_setup is assumed to be set once during construction */
  gpr_mu mu;

  /* the sending child (may be null) */
  grpc_child_channel *active_child;

  /* calls waiting for a channel to be ready */
  call_data **waiting_children;
  size_t waiting_child_count;
  size_t waiting_child_capacity;

  /* transport setup for this channel */
  grpc_transport_setup *transport_setup;
  int transport_setup_initiated;

  /* channel args copied at construction; reused when building child
     channel stacks in transport_setup_complete */
  grpc_channel_args *args;
} channel_data;
73
/* Lifecycle states for a call on the client channel.
   Transitions (under chand->mu): CREATED -> WAITING -> ACTIVE,
   and any of CREATED/WAITING -> CANCELLED. */
typedef enum {
  CALL_CREATED,   /* just constructed; no start op seen yet */
  CALL_WAITING,   /* start seen; parked until a child channel is ready */
  CALL_ACTIVE,    /* bound to a child call on the active child channel */
  CALL_CANCELLED  /* terminal; all further ops fail */
} call_state;
80
/* Per-call state for the client channel filter. */
struct call_data {
  /* owning element */
  grpc_call_element *elem;

  call_state state;
  /* metadata buffered until the call is started (flushed in
     complete_activate) */
  grpc_call_op_metadata pending_metadata;
  gpr_uint32 pending_metadata_flags;
  gpr_timespec deadline;
  /* state-dependent storage: 'active' is valid only in CALL_ACTIVE,
     'waiting' only in CALL_WAITING (see prepare_activate) */
  union {
    struct {
      /* our child call stack */
      grpc_child_call *child_call;
    } active;
    struct {
      /* completion for the parked GRPC_SEND_START op */
      void (*on_complete)(void *user_data, grpc_op_error error);
      void *on_complete_user_data;
      gpr_uint32 start_flags;
      grpc_pollset *pollset;
    } waiting;
  } s;
};
102
/* Move a WAITING call to ACTIVE by creating its child call on on_child.
   Returns 0 (and does nothing) if the call was already cancelled, 1 on
   success. Must be called while holding chand->mu (callers in start_rpc
   and transport_setup_complete both do). */
static int prepare_activate(grpc_call_element *elem,
                            grpc_child_channel *on_child) {
  call_data *calld = elem->call_data;
  if (calld->state == CALL_CANCELLED) return 0;

  /* no more access to calld->s.waiting allowed */
  GPR_ASSERT(calld->state == CALL_WAITING);
  calld->state = CALL_ACTIVE;

  /* create a child call */
  calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);

  return 1;
}
117
118static void do_nothing(void *ignored, grpc_op_error error) {}
119
/* Second half of call activation, run OUTSIDE chand->mu: flush buffered
   metadata into the child stack, then forward the start op. Requires the
   call to already be CALL_ACTIVE (see prepare_activate). */
static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
  call_data *calld = elem->call_data;
  grpc_call_element *child_elem =
      grpc_child_call_get_top_element(calld->s.active.child_call);
  grpc_call_op mop;

  GPR_ASSERT(calld->state == CALL_ACTIVE);

  /* sending buffered metadata down the stack before the start call */
  mop.type = GRPC_SEND_METADATA;
  mop.dir = GRPC_CALL_DOWN;
  mop.flags = calld->pending_metadata_flags;
  mop.data.metadata = calld->pending_metadata;
  mop.done_cb = do_nothing;
  mop.user_data = NULL;
  child_elem->filter->call_op(child_elem, elem, &mop);

  /* continue the start call down the stack; this needs to happen after
     metadata is flushed */
  child_elem->filter->call_op(child_elem, elem, op);
}
141
/* Handle GRPC_SEND_START: if a child channel is connected, activate the
   call on it immediately; otherwise park the call in the waiting set and
   (at most once) kick off transport setup. All branches release
   chand->mu before doing any downward call or setup initiation to avoid
   re-entrancy under the lock. */
static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->mu);
  if (calld->state == CALL_CANCELLED) {
    /* already cancelled: fail the start op immediately */
    gpr_mu_unlock(&chand->mu);
    op->done_cb(op->user_data, GRPC_OP_ERROR);
    return;
  }
  GPR_ASSERT(calld->state == CALL_CREATED);
  calld->state = CALL_WAITING;
  if (chand->active_child) {
    /* channel is connected - use the connected stack */
    if (prepare_activate(elem, chand->active_child)) {
      gpr_mu_unlock(&chand->mu);
      /* activate the request (pass it down) outside the lock */
      complete_activate(elem, op);
    } else {
      gpr_mu_unlock(&chand->mu);
    }
  } else {
    /* check to see if we should initiate a connection (if we're not already),
       but don't do so until outside the lock to avoid re-entrancy problems if
       the callback is immediate */
    int initiate_transport_setup = 0;
    if (!chand->transport_setup_initiated) {
      chand->transport_setup_initiated = 1;
      initiate_transport_setup = 1;
    }
    /* add this call to the waiting set to be resumed once we have a child
       channel stack, growing the waiting set if needed */
    if (chand->waiting_child_count == chand->waiting_child_capacity) {
      chand->waiting_child_capacity =
          GPR_MAX(chand->waiting_child_capacity * 2, 8);
      chand->waiting_children =
          gpr_realloc(chand->waiting_children,
                      chand->waiting_child_capacity * sizeof(call_data *));
    }
    /* stash everything needed to replay the start op later
       (see transport_setup_complete) */
    calld->s.waiting.on_complete = op->done_cb;
    calld->s.waiting.on_complete_user_data = op->user_data;
    calld->s.waiting.start_flags = op->flags;
    calld->s.waiting.pollset = op->data.start.pollset;
    chand->waiting_children[chand->waiting_child_count++] = calld;
    gpr_mu_unlock(&chand->mu);

    /* finally initiate transport setup if needed */
    if (initiate_transport_setup) {
      grpc_transport_setup_initiate(chand->transport_setup);
    }
  }
}
193
194static void remove_waiting_child(channel_data *chand, call_data *calld) {
195 size_t new_count;
196 size_t i;
197 for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
198 if (chand->waiting_children[i] == calld) continue;
199 chand->waiting_children[new_count++] = chand->waiting_children[i];
200 }
201 GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
202 new_count == chand->waiting_child_count);
203 chand->waiting_child_count = new_count;
204}
205
/* Synthesize the upward events a cancelled call must still observe:
   a CANCELLED status followed by a GRPC_RECV_FINISH. Called without
   chand->mu held (see cancel_rpc). */
static void send_up_cancelled_ops(grpc_call_element *elem) {
  grpc_call_op finish_op;
  /* send up a synthesized status */
  grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
  /* send up a finish */
  finish_op.type = GRPC_RECV_FINISH;
  finish_op.dir = GRPC_CALL_UP;
  finish_op.flags = 0;
  finish_op.done_cb = do_nothing;
  finish_op.user_data = NULL;
  grpc_call_next_op(elem, &finish_op);
}
218
/* Handle GRPC_CANCEL_OP according to the call's current state. Each case
   releases chand->mu before doing any filter call or callback, so every
   arm ends in an early return. */
static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_call_element *child_elem;

  gpr_mu_lock(&chand->mu);
  switch (calld->state) {
    case CALL_ACTIVE:
      /* already running on a child stack: forward the cancel downward */
      child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
      gpr_mu_unlock(&chand->mu);
      child_elem->filter->call_op(child_elem, elem, op);
      return; /* early out */
    case CALL_WAITING:
      /* parked: unpark, mark cancelled, then (outside the lock) send the
         synthesized status/finish and fail the stored start callback */
      remove_waiting_child(chand, calld);
      calld->state = CALL_CANCELLED;
      gpr_mu_unlock(&chand->mu);
      send_up_cancelled_ops(elem);
      calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
                                   GRPC_OP_ERROR);
      return; /* early out */
    case CALL_CREATED:
      /* never started: just mark cancelled and synthesize the upward ops */
      calld->state = CALL_CANCELLED;
      gpr_mu_unlock(&chand->mu);
      send_up_cancelled_ops(elem);
      return; /* early out */
    case CALL_CANCELLED:
      /* duplicate cancel: nothing to do */
      gpr_mu_unlock(&chand->mu);
      return; /* early out */
  }
  gpr_log(GPR_ERROR, "should never reach here");
  abort();
}
251
/* Per-call op dispatch for the client channel filter. Downward data ops
   are only forwarded once the call is ACTIVE; otherwise they fail.
   Upward ops pass through unchanged. */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                    grpc_call_op *op) {
  call_data *calld = elem->call_data;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);

  switch (op->type) {
    case GRPC_SEND_METADATA:
      /* buffer metadata until the call starts; complete_activate flushes it */
      grpc_call_op_metadata_merge(&calld->pending_metadata, &op->data.metadata);
      op->done_cb(op->user_data, GRPC_OP_OK);
      break;
    case GRPC_SEND_START:
      /* filter out the start event to find which child to send on */
      start_rpc(elem, op);
      break;
    case GRPC_CANCEL_OP:
      cancel_rpc(elem, op);
      break;
    case GRPC_SEND_MESSAGE:
    case GRPC_SEND_FINISH:
    case GRPC_REQUEST_DATA:
      if (calld->state == CALL_ACTIVE) {
        grpc_call_element *child_elem =
            grpc_child_call_get_top_element(calld->s.active.child_call);
        child_elem->filter->call_op(child_elem, elem, op);
      } else {
        /* not bound to a child stack (waiting or cancelled): fail the op */
        op->done_cb(op->user_data, GRPC_OP_ERROR);
      }
      break;
    default:
      /* everything else must be travelling up the stack */
      GPR_ASSERT(op->dir == GRPC_CALL_UP);
      grpc_call_next_op(elem, op);
      break;
  }
}
287
/* Per-channel op dispatch. Goaway/disconnect ops (in either direction)
   detach the active child under chand->mu, then destroy it outside the
   lock; the destroy flag (1 = "wait for calls", per call sites here —
   NOTE(review): exact flag semantics live in child_channel.h, confirm
   there) differs between locally-initiated and transport-initiated
   teardown. */
static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  grpc_child_channel *child_channel;
  grpc_channel_op rop;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  switch (op->type) {
    case GRPC_CHANNEL_GOAWAY:
      /* sending goaway: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_handle_op(child_channel, op);
        grpc_child_channel_destroy(child_channel, 1);
      } else {
        /* nobody consumed the op: release the goaway message ourselves */
        gpr_slice_unref(op->data.goaway.message);
      }
      break;
    case GRPC_CHANNEL_DISCONNECT:
      /* sending disconnect: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 1);
      }
      /* fake a transport closed to satisfy the refcounting in client */
      rop.type = GRPC_TRANSPORT_CLOSED;
      rop.dir = GRPC_CALL_UP;
      grpc_channel_next_op(elem, &rop);
      break;
    case GRPC_TRANSPORT_GOAWAY:
      /* receiving goaway: if it's from our active child, drop the active
         child; in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        /* stale child: leave it alone */
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      gpr_slice_unref(op->data.goaway.message);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* receiving disconnect: if it's from our active child, drop the active
         child; in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      break;
    default:
      switch (op->dir) {
        case GRPC_CALL_UP:
          grpc_channel_next_op(elem, op);
          break;
        case GRPC_CALL_DOWN:
          /* this filter is the bottom of the client stack: nothing below
             us can handle an unrecognized downward op */
          gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
          abort();
          break;
      }
      break;
  }
}
367
/* Sentinel on_complete installed at call construction. If this ever
   fires, a waiting completion ran before any GRPC_SEND_START replaced
   it — a programming error, so abort loudly. */
static void error_bad_on_complete(void *arg, grpc_op_error error) {
  gpr_log(GPR_ERROR,
          "Waiting finished but not started? Bad on_complete callback");
  abort();
}
373
/* Constructor for call_data. server_transport_data must be NULL: this
   filter only appears in client stacks. */
static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data) {
  call_data *calld = elem->call_data;

  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  GPR_ASSERT(server_transport_data == NULL);
  calld->elem = elem;
  calld->state = CALL_CREATED;
  calld->deadline = gpr_inf_future;
  /* guard against completion before a start op installs the real callback */
  calld->s.waiting.on_complete = error_bad_on_complete;
  calld->s.waiting.on_complete_user_data = NULL;
  grpc_call_op_metadata_init(&calld->pending_metadata);
}
388
/* Destructor for call_data. */
static void destroy_call_elem(grpc_call_element *elem) {
  call_data *calld = elem->call_data;

  /* if the metadata buffer is not flushed, destroy it here. */
  grpc_call_op_metadata_destroy(&calld->pending_metadata);
  /* if the call got activated, we need to destroy the child stack also, and
     remove it from the in-flight requests tracked by the child_entry we
     picked */
  if (calld->state == CALL_ACTIVE) {
    grpc_child_call_destroy(calld->s.active.child_call);
  }
}
402
/* Constructor for channel_data. This filter must be the last (bottom)
   element of the client channel stack, and never the first. */
static void init_channel_elem(grpc_channel_element *elem,
                              const grpc_channel_args *args,
                              grpc_mdctx *metadata_context, int is_first,
                              int is_last) {
  channel_data *chand = elem->channel_data;

  GPR_ASSERT(!is_first);
  GPR_ASSERT(is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  gpr_mu_init(&chand->mu);
  chand->active_child = NULL;
  chand->waiting_children = NULL;
  chand->waiting_child_count = 0;
  chand->waiting_child_capacity = 0;
  /* transport_setup is installed later via
     grpc_client_channel_set_transport_setup */
  chand->transport_setup = NULL;
  chand->transport_setup_initiated = 0;
  /* keep a private copy of the args for building child stacks later */
  chand->args = grpc_channel_args_copy(args);
}
423
/* Destructor for channel_data. Requires the waiting set to be empty
   (all parked calls must have been activated or cancelled first). */
static void destroy_channel_elem(grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;

  /* NOTE(review): transport_setup may still be NULL here if
     grpc_client_channel_set_transport_setup was never called — presumably
     grpc_transport_setup_cancel tolerates that; confirm at its definition */
  grpc_transport_setup_cancel(chand->transport_setup);

  if (chand->active_child) {
    grpc_child_channel_destroy(chand->active_child, 1);
    chand->active_child = NULL;
  }

  grpc_channel_args_destroy(chand->args);

  gpr_mu_destroy(&chand->mu);
  GPR_ASSERT(chand->waiting_child_count == 0);
  gpr_free(chand->waiting_children);
}
441
/* Filter vtable for the client channel: sits at the bottom of the client
   stack and owns connection establishment via transport setup. */
const grpc_channel_filter grpc_client_channel_filter = {
    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
    "client-channel",
};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800447
/* Called when transport setup produces a new transport: build a child
   channel stack around it ([child-top | caller filters... | connected]),
   swap it in as the active child, and replay every parked start op on it.
   Start-op replay is prepared under chand->mu but executed outside it to
   avoid re-entrancy; any previously active child is destroyed last. */
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
    grpc_channel_stack *channel_stack, grpc_transport *transport,
    grpc_channel_filter const **channel_filters, size_t num_channel_filters,
    grpc_mdctx *mdctx) {
  /* we just got a new transport: lets create a child channel stack for it */
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  size_t num_child_filters = 2 + num_channel_filters;
  grpc_channel_filter const **child_filters;
  grpc_transport_setup_result result;
  grpc_child_channel *old_active = NULL;
  call_data **waiting_children;
  size_t waiting_child_count;
  size_t i;
  grpc_call_op *call_ops;

  /* build the child filter stack */
  child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
  /* we always need a link back filter to get back to the connected channel */
  child_filters[0] = &grpc_child_channel_top_filter;
  for (i = 0; i < num_channel_filters; i++) {
    child_filters[i + 1] = channel_filters[i];
  }
  /* and we always need a connected channel to talk to the transport */
  child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;

  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  /* BEGIN LOCKING CHANNEL */
  gpr_mu_lock(&chand->mu);
  /* allow a future disconnect/reconnect to initiate setup again */
  chand->transport_setup_initiated = 0;

  if (chand->active_child) {
    old_active = chand->active_child;
  }
  chand->active_child = grpc_child_channel_create(
      elem, child_filters, num_child_filters, chand->args, mdctx);
  result =
      grpc_connected_channel_bind_transport(chand->active_child, transport);

  /* capture the waiting children - we'll activate them outside the lock
     to avoid re-entrancy problems */
  waiting_children = chand->waiting_children;
  waiting_child_count = chand->waiting_child_count;
  /* bumping up inflight_requests here avoids taking a lock per rpc below */

  chand->waiting_children = NULL;
  chand->waiting_child_count = 0;
  chand->waiting_child_capacity = 0;

  /* rebuild a GRPC_SEND_START op per parked call from the state stashed
     by start_rpc */
  call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);

  for (i = 0; i < waiting_child_count; i++) {
    call_ops[i].type = GRPC_SEND_START;
    call_ops[i].dir = GRPC_CALL_DOWN;
    call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
    call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
    call_ops[i].user_data =
        waiting_children[i]->s.waiting.on_complete_user_data;
    call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
    if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
      /* call was cancelled while parked: fail its start op and drop it */
      waiting_children[i] = NULL;
      call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
    }
  }

  /* END LOCKING CHANNEL */
  gpr_mu_unlock(&chand->mu);

  /* activate any pending operations - this is safe to do as we guarantee one
     and only one write operation per request at the surface api - if we lose
     that guarantee we need to do some curly locking here */
  for (i = 0; i < waiting_child_count; i++) {
    if (waiting_children[i]) {
      complete_activate(waiting_children[i]->elem, &call_ops[i]);
    }
  }
  gpr_free(waiting_children);
  gpr_free(call_ops);
  gpr_free(child_filters);

  if (old_active) {
    grpc_child_channel_destroy(old_active, 1);
  }

  return result;
}
535
536void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
537 grpc_transport_setup *setup) {
538 /* post construction initialization: set the transport setup pointer */
539 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
540 channel_data *chand = elem->channel_data;
541 GPR_ASSERT(!chand->transport_setup);
542 chand->transport_setup = setup;
Craig Tiller190d3602015-02-18 09:23:38 -0800543}