blob: bc481e59ca50a521198c25edf9b8eb5f0d9342ed [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/channel/client_channel.h"
35
36#include <stdio.h>
37
38#include "src/core/channel/channel_args.h"
ctiller82e275f2014-12-12 08:43:28 -080039#include "src/core/channel/child_channel.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include "src/core/channel/connected_channel.h"
ctillerc6d61c42014-12-15 14:52:08 -080041#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080042#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <grpc/support/alloc.h>
44#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <grpc/support/sync.h>
46#include <grpc/support/useful.h>
47
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080048/* Client channel implementation */
49
typedef struct call_data call_data;

/* Per-channel state for the client channel filter. */
typedef struct {
  /* protects children, child_count, child_capacity, active_child,
     transport_setup_initiated
     does not protect channel stacks held by children
     transport_setup is assumed to be set once during construction */
  gpr_mu mu;

  /* the sending child (may be null) */
  grpc_child_channel *active_child;

  /* calls waiting for a channel to be ready */
  call_data **waiting_children;
  size_t waiting_child_count;
  size_t waiting_child_capacity;

  /* transport setup for this channel */
  grpc_transport_setup *transport_setup;
  int transport_setup_initiated;

  /* channel args copied at construction; used when building child channel
     stacks */
  grpc_channel_args *args;
} channel_data;

/* Lifecycle states of a call passing through this filter. */
typedef enum {
  CALL_CREATED,
  CALL_WAITING,
  CALL_ACTIVE,
  CALL_CANCELLED
} call_state;

/* Per-call state for the client channel filter. */
struct call_data {
  /* owning element */
  grpc_call_element *elem;

  /* set once the first GRPC_SEND_METADATA op has been seen for this call */
  gpr_uint8 got_first_send;

  /* current lifecycle state; transitions happen under channel_data.mu */
  call_state state;
  /* call deadline; initialized to gpr_inf_future */
  gpr_timespec deadline;
  union {
    struct {
      /* our child call stack */
      grpc_child_call *child_call;
    } active;
    /* op captured while CALL_WAITING, replayed once a child channel exists */
    grpc_call_op waiting_op;
  } s;
};
97
ctiller82e275f2014-12-12 08:43:28 -080098static int prepare_activate(grpc_call_element *elem,
99 grpc_child_channel *on_child) {
100 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800101 if (calld->state == CALL_CANCELLED) return 0;
102
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800103 /* no more access to calld->s.waiting allowed */
104 GPR_ASSERT(calld->state == CALL_WAITING);
105 calld->state = CALL_ACTIVE;
106
ctiller82e275f2014-12-12 08:43:28 -0800107 /* create a child call */
108 calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800109
110 return 1;
111}
112
/* No-op completion callback for ops synthesized internally. */
static void do_nothing(void *ignored, grpc_op_error error) {}
114
/* Finish activating a call that prepare_activate() moved to CALL_ACTIVE:
   forward the pending op into the newly created child call stack. Called
   outside the channel mutex. */
static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
  call_data *calld = elem->call_data;
  grpc_call_element *child_elem =
      grpc_child_call_get_top_element(calld->s.active.child_call);

  GPR_ASSERT(calld->state == CALL_ACTIVE);

  /* continue the start call down the stack; this needs to happen after
     metadata are flushed */
  child_elem->filter->call_op(child_elem, elem, op);
}
126
/* Handle the first GRPC_SEND_METADATA op for a call: if a child channel is
   already connected, activate the call on it immediately; otherwise park the
   call in the waiting set and (at most once) kick off transport setup.
   Takes and releases chand->mu; the op is either forwarded, stashed in
   calld->s.waiting_op, or failed via its done_cb. */
static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->mu);
  if (calld->state == CALL_CANCELLED) {
    /* call was cancelled before it ever started: fail the op */
    gpr_mu_unlock(&chand->mu);
    grpc_metadata_batch_destroy(&op->data.metadata);
    op->done_cb(op->user_data, GRPC_OP_ERROR);
    return;
  }
  GPR_ASSERT(calld->state == CALL_CREATED);
  calld->state = CALL_WAITING;
  if (chand->active_child) {
    /* channel is connected - use the connected stack */
    if (prepare_activate(elem, chand->active_child)) {
      gpr_mu_unlock(&chand->mu);
      /* activate the request (pass it down) outside the lock */
      complete_activate(elem, op);
    } else {
      gpr_mu_unlock(&chand->mu);
    }
  } else {
    /* check to see if we should initiate a connection (if we're not already),
       but don't do so until outside the lock to avoid re-entrancy problems if
       the callback is immediate */
    int initiate_transport_setup = 0;
    if (!chand->transport_setup_initiated) {
      chand->transport_setup_initiated = 1;
      initiate_transport_setup = 1;
    }
    /* add this call to the waiting set to be resumed once we have a child
       channel stack, growing the waiting set if needed */
    if (chand->waiting_child_count == chand->waiting_child_capacity) {
      chand->waiting_child_capacity =
          GPR_MAX(chand->waiting_child_capacity * 2, 8);
      chand->waiting_children =
          gpr_realloc(chand->waiting_children,
                      chand->waiting_child_capacity * sizeof(call_data *));
    }
    /* stash the op so transport_setup_complete can replay it later */
    calld->s.waiting_op = *op;
    chand->waiting_children[chand->waiting_child_count++] = calld;
    gpr_mu_unlock(&chand->mu);

    /* finally initiate transport setup if needed */
    if (initiate_transport_setup) {
      grpc_transport_setup_initiate(chand->transport_setup);
    }
  }
}
176
177static void remove_waiting_child(channel_data *chand, call_data *calld) {
178 size_t new_count;
179 size_t i;
180 for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
181 if (chand->waiting_children[i] == calld) continue;
182 chand->waiting_children[new_count++] = chand->waiting_children[i];
183 }
184 GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
185 new_count == chand->waiting_child_count);
186 chand->waiting_child_count = new_count;
187}
188
Craig Tiller8b976d02015-02-05 21:41:23 -0800189static void send_up_cancelled_ops(grpc_call_element *elem) {
190 grpc_call_op finish_op;
Craig Tiller8b976d02015-02-05 21:41:23 -0800191 /* send up a synthesized status */
Craig Tiller6902ad22015-04-16 08:01:49 -0700192 grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
Craig Tiller8b976d02015-02-05 21:41:23 -0800193 /* send up a finish */
194 finish_op.type = GRPC_RECV_FINISH;
195 finish_op.dir = GRPC_CALL_UP;
196 finish_op.flags = 0;
197 finish_op.done_cb = do_nothing;
198 finish_op.user_data = NULL;
199 grpc_call_next_op(elem, &finish_op);
200}
201
/* Handle a GRPC_CANCEL_OP for a call, dispatching on its lifecycle state:
   - CALL_ACTIVE: forward the cancel into the child call stack
   - CALL_WAITING: drop the stashed op's metadata, leave the waiting set,
     synthesize cancelled status upward, and fail the stashed op's callback
   - CALL_CREATED: mark cancelled and synthesize cancelled status upward
   - CALL_CANCELLED: no-op
   Each case releases chand->mu itself before doing callback work. */
static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_call_element *child_elem;

  gpr_mu_lock(&chand->mu);
  switch (calld->state) {
    case CALL_ACTIVE:
      child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
      gpr_mu_unlock(&chand->mu);
      child_elem->filter->call_op(child_elem, elem, op);
      return; /* early out */
    case CALL_WAITING:
      grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
      remove_waiting_child(chand, calld);
      calld->state = CALL_CANCELLED;
      gpr_mu_unlock(&chand->mu);
      send_up_cancelled_ops(elem);
      /* NOTE(review): s.waiting_op is read here after the state was set to
         CALL_CANCELLED and the lock dropped — presumably safe because the
         call was just removed from the waiting set; verify no concurrent
         writer remains */
      calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
      return; /* early out */
    case CALL_CREATED:
      calld->state = CALL_CANCELLED;
      gpr_mu_unlock(&chand->mu);
      send_up_cancelled_ops(elem);
      return; /* early out */
    case CALL_CANCELLED:
      gpr_mu_unlock(&chand->mu);
      return; /* early out */
  }
  /* all enum values return above */
  gpr_log(GPR_ERROR, "should never reach here");
  abort();
}
234
/* Main per-call op handler for the client channel filter. The first
   GRPC_SEND_METADATA starts the rpc (possibly parking it until a transport
   exists); cancels are routed through cancel_rpc; other downward send ops
   are forwarded into the child call if active, failed otherwise; everything
   else must be travelling up and is passed along. */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                    grpc_call_op *op) {
  call_data *calld = elem->call_data;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);

  switch (op->type) {
    case GRPC_SEND_METADATA:
      if (!calld->got_first_send) {
        /* filter out the start event to find which child to send on */
        calld->got_first_send = 1;
        start_rpc(elem, op);
      } else {
        grpc_call_next_op(elem, op);
      }
      break;
    case GRPC_CANCEL_OP:
      cancel_rpc(elem, op);
      break;
    case GRPC_SEND_MESSAGE:
    case GRPC_SEND_FINISH:
    case GRPC_REQUEST_DATA:
      /* NOTE(review): calld->state is read here without taking chand->mu,
         unlike the other state transitions — presumably relies on surface-api
         op ordering; confirm */
      if (calld->state == CALL_ACTIVE) {
        grpc_call_element *child_elem =
            grpc_child_call_get_top_element(calld->s.active.child_call);
        child_elem->filter->call_op(child_elem, elem, op);
      } else {
        op->done_cb(op->user_data, GRPC_OP_ERROR);
      }
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_UP);
      grpc_call_next_op(elem, op);
      break;
  }
}
271
/* Channel-level op handler. Downward GOAWAY/DISCONNECT detach and destroy the
   active child (ownership of the child is taken under the lock, destruction
   happens outside it); upward TRANSPORT_GOAWAY/TRANSPORT_CLOSED drop the
   active child only if the event came from it, and are consumed here either
   way. Other upward ops pass through; other downward ops are fatal. */
static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  grpc_child_channel *child_channel;
  grpc_channel_op rop;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  switch (op->type) {
    case GRPC_CHANNEL_GOAWAY:
      /* sending goaway: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_handle_op(child_channel, op);
        grpc_child_channel_destroy(child_channel, 1);
      } else {
        /* nobody consumed the op: release the goaway message ourselves */
        gpr_slice_unref(op->data.goaway.message);
      }
      break;
    case GRPC_CHANNEL_DISCONNECT:
      /* sending disconnect: clear out the active child on the way through */
      gpr_mu_lock(&chand->mu);
      child_channel = chand->active_child;
      chand->active_child = NULL;
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 1);
      }
      /* fake a transport closed to satisfy the refcounting in client */
      rop.type = GRPC_TRANSPORT_CLOSED;
      rop.dir = GRPC_CALL_UP;
      grpc_channel_next_op(elem, &rop);
      break;
    case GRPC_TRANSPORT_GOAWAY:
      /* receiving goaway: if it's from our active child, drop the active
         child; in all cases consume the event here.
         NOTE(review): the grpc_channel_stack from from_elem is compared/used
         as a grpc_child_channel — presumably these types are layout- or
         typedef-compatible; confirm in child_channel.h */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      gpr_slice_unref(op->data.goaway.message);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* receiving disconnect: if it's from our active child, drop the active
         child; in all cases consume the event here */
      gpr_mu_lock(&chand->mu);
      child_channel = grpc_channel_stack_from_top_element(from_elem);
      if (child_channel == chand->active_child) {
        chand->active_child = NULL;
      } else {
        child_channel = NULL;
      }
      gpr_mu_unlock(&chand->mu);
      if (child_channel) {
        grpc_child_channel_destroy(child_channel, 0);
      }
      break;
    default:
      switch (op->dir) {
        case GRPC_CALL_UP:
          grpc_channel_next_op(elem, op);
          break;
        case GRPC_CALL_DOWN:
          gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
          abort();
          break;
      }
      break;
  }
}
351
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800352/* Constructor for call_data */
353static void init_call_elem(grpc_call_element *elem,
354 const void *server_transport_data) {
355 call_data *calld = elem->call_data;
356
357 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
358 GPR_ASSERT(server_transport_data == NULL);
359 calld->elem = elem;
360 calld->state = CALL_CREATED;
361 calld->deadline = gpr_inf_future;
Craig Tiller8b282cb2015-04-17 14:57:44 -0700362 calld->got_first_send = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800363}
364
365/* Destructor for call_data */
366static void destroy_call_elem(grpc_call_element *elem) {
367 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800368
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800369 /* if the call got activated, we need to destroy the child stack also, and
370 remove it from the in-flight requests tracked by the child_entry we
371 picked */
372 if (calld->state == CALL_ACTIVE) {
ctiller82e275f2014-12-12 08:43:28 -0800373 grpc_child_call_destroy(calld->s.active.child_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800374 }
Craig Tillereb40a532015-04-17 16:46:20 -0700375 if (calld->state == CALL_WAITING) {
376 grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
377 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800378}
379
380/* Constructor for channel_data */
381static void init_channel_elem(grpc_channel_element *elem,
382 const grpc_channel_args *args,
383 grpc_mdctx *metadata_context, int is_first,
384 int is_last) {
385 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800386
387 GPR_ASSERT(!is_first);
388 GPR_ASSERT(is_last);
389 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
390
391 gpr_mu_init(&chand->mu);
392 chand->active_child = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393 chand->waiting_children = NULL;
394 chand->waiting_child_count = 0;
395 chand->waiting_child_capacity = 0;
396 chand->transport_setup = NULL;
397 chand->transport_setup_initiated = 0;
398 chand->args = grpc_channel_args_copy(args);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800399}
400
401/* Destructor for channel_data */
402static void destroy_channel_elem(grpc_channel_element *elem) {
403 channel_data *chand = elem->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800404
405 grpc_transport_setup_cancel(chand->transport_setup);
406
ctiller82e275f2014-12-12 08:43:28 -0800407 if (chand->active_child) {
ctiller58393c22015-01-07 14:03:30 -0800408 grpc_child_channel_destroy(chand->active_child, 1);
ctiller82e275f2014-12-12 08:43:28 -0800409 chand->active_child = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800410 }
411
412 grpc_channel_args_destroy(chand->args);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413
414 gpr_mu_destroy(&chand->mu);
415 GPR_ASSERT(chand->waiting_child_count == 0);
416 gpr_free(chand->waiting_children);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800417}
418
/* vtable for this filter: call_op / channel_op handlers, per-call and
   per-channel data sizes and constructors/destructors, and the filter name */
const grpc_channel_filter grpc_client_channel_filter = {
    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
    "client-channel",
};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800424
/* Called when transport setup finishes: builds a child channel stack around
   the new transport (link-back filter + caller-supplied filters + connected
   channel), swaps it in as the active child, and replays every waiting call
   onto it. Calls that were cancelled while waiting get their stashed op
   failed instead. A previously active child, if any, is destroyed last.
   Returns the result of binding the transport to the connected channel. */
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
    grpc_channel_stack *channel_stack, grpc_transport *transport,
    grpc_channel_filter const **channel_filters, size_t num_channel_filters,
    grpc_mdctx *mdctx) {
  /* we just got a new transport: lets create a child channel stack for it */
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  size_t num_child_filters = 2 + num_channel_filters;
  grpc_channel_filter const **child_filters;
  grpc_transport_setup_result result;
  grpc_child_channel *old_active = NULL;
  call_data **waiting_children;
  size_t waiting_child_count;
  size_t i;
  grpc_call_op *call_ops;

  /* build the child filter stack */
  child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
  /* we always need a link back filter to get back to the connected channel */
  child_filters[0] = &grpc_child_channel_top_filter;
  for (i = 0; i < num_channel_filters; i++) {
    child_filters[i + 1] = channel_filters[i];
  }
  /* and we always need a connected channel to talk to the transport */
  child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;

  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

  /* BEGIN LOCKING CHANNEL */
  gpr_mu_lock(&chand->mu);
  chand->transport_setup_initiated = 0;

  if (chand->active_child) {
    old_active = chand->active_child;
  }
  chand->active_child = grpc_child_channel_create(
      elem, child_filters, num_child_filters, chand->args, mdctx);
  result =
      grpc_connected_channel_bind_transport(chand->active_child, transport);

  /* capture the waiting children - we'll activate them outside the lock
     to avoid re-entrancy problems */
  waiting_children = chand->waiting_children;
  waiting_child_count = chand->waiting_child_count;
  /* bumping up inflight_requests here avoids taking a lock per rpc below */

  chand->waiting_children = NULL;
  chand->waiting_child_count = 0;
  chand->waiting_child_capacity = 0;

  /* copy out the stashed ops so cancelled entries can be failed safely */
  call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);

  for (i = 0; i < waiting_child_count; i++) {
    call_ops[i] = waiting_children[i]->s.waiting_op;
    if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
      /* call was cancelled while waiting: drop it and fail its op */
      waiting_children[i] = NULL;
      call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
    }
  }

  /* END LOCKING CHANNEL */
  gpr_mu_unlock(&chand->mu);

  /* activate any pending operations - this is safe to do as we guarantee one
     and only one write operation per request at the surface api - if we lose
     that guarantee we need to do some curly locking here */
  for (i = 0; i < waiting_child_count; i++) {
    if (waiting_children[i]) {
      complete_activate(waiting_children[i]->elem, &call_ops[i]);
    }
  }
  gpr_free(waiting_children);
  gpr_free(call_ops);
  gpr_free(child_filters);

  if (old_active) {
    grpc_child_channel_destroy(old_active, 1);
  }

  return result;
}
506
507void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
508 grpc_transport_setup *setup) {
509 /* post construction initialization: set the transport setup pointer */
510 grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
511 channel_data *chand = elem->channel_data;
512 GPR_ASSERT(!chand->transport_setup);
513 chand->transport_setup = setup;
Craig Tiller190d3602015-02-18 09:23:38 -0800514}