blob: be6d054af4ee931df71abb97f28045e7391d91df [file] [log] [blame]
Craig Tiller577c9b22015-11-02 14:11:15 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Craig Tiller577c9b22015-11-02 14:11:15 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tillerb112f392016-04-05 12:44:04 -070034#include "src/core/ext/client_config/subchannel_call_holder.h"
Craig Tiller577c9b22015-11-02 14:11:15 -080035
36#include <grpc/support/alloc.h>
37
Craig Tiller9533d042016-03-25 17:11:06 -070038#include "src/core/lib/profiling/timers.h"
Craig Tiller577c9b22015-11-02 14:11:15 -080039
40#define GET_CALL(holder) \
41 ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
42
43#define CANCELLED_CALL ((grpc_subchannel_call *)1)
44
45static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
Craig Tiller804ff712016-05-05 16:25:40 -070046 grpc_error *error);
Craig Tiller577c9b22015-11-02 14:11:15 -080047static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
Craig Tiller804ff712016-05-05 16:25:40 -070048 grpc_error *error);
Craig Tiller577c9b22015-11-02 14:11:15 -080049
50static void add_waiting_locked(grpc_subchannel_call_holder *holder,
51 grpc_transport_stream_op *op);
52static void fail_locked(grpc_exec_ctx *exec_ctx,
Craig Tiller804ff712016-05-05 16:25:40 -070053 grpc_subchannel_call_holder *holder, grpc_error *error);
Craig Tiller577c9b22015-11-02 14:11:15 -080054static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
55 grpc_subchannel_call_holder *holder);
56
57void grpc_subchannel_call_holder_init(
58 grpc_subchannel_call_holder *holder,
59 grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
Craig Tiller11beb9a2015-11-24 10:29:32 -080060 void *pick_subchannel_arg, grpc_call_stack *owning_call) {
Craig Tiller577c9b22015-11-02 14:11:15 -080061 gpr_atm_rel_store(&holder->subchannel_call, 0);
62 holder->pick_subchannel = pick_subchannel;
63 holder->pick_subchannel_arg = pick_subchannel_arg;
64 gpr_mu_init(&holder->mu);
Craig Tillerb5585d42015-11-17 07:18:31 -080065 holder->connected_subchannel = NULL;
Craig Tiller577c9b22015-11-02 14:11:15 -080066 holder->waiting_ops = NULL;
67 holder->waiting_ops_count = 0;
68 holder->waiting_ops_capacity = 0;
69 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
Craig Tiller11beb9a2015-11-24 10:29:32 -080070 holder->owning_call = owning_call;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -070071 holder->pollent = NULL;
Craig Tiller577c9b22015-11-02 14:11:15 -080072}
73
74void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
75 grpc_subchannel_call_holder *holder) {
76 grpc_subchannel_call *call = GET_CALL(holder);
77 if (call != NULL && call != CANCELLED_CALL) {
78 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
79 }
80 GPR_ASSERT(holder->creation_phase ==
81 GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
82 gpr_mu_destroy(&holder->mu);
83 GPR_ASSERT(holder->waiting_ops_count == 0);
84 gpr_free(holder->waiting_ops);
85}
86
Mark D. Roth09e66982016-06-23 11:14:18 -070087// The logic here is fairly complicated, due to (a) the fact that we
88// need to handle the case where we receive the send op before the
89// initial metadata op, and (b) the need for efficiency, especially in
90// the streaming case.
91// TODO(ctiller): Explain this more thoroughly.
Craig Tiller577c9b22015-11-02 14:11:15 -080092void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
93 grpc_subchannel_call_holder *holder,
94 grpc_transport_stream_op *op) {
95 /* try to (atomically) get the call */
96 grpc_subchannel_call *call = GET_CALL(holder);
97 GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
98 if (call == CANCELLED_CALL) {
Craig Tiller804ff712016-05-05 16:25:40 -070099 grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
100 GRPC_ERROR_CANCELLED);
Craig Tiller577c9b22015-11-02 14:11:15 -0800101 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
102 return;
103 }
104 if (call != NULL) {
105 grpc_subchannel_call_process_op(exec_ctx, call, op);
106 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
107 return;
108 }
109 /* we failed; lock and figure out what to do */
110 gpr_mu_lock(&holder->mu);
111retry:
112 /* need to recheck that another thread hasn't set the call */
113 call = GET_CALL(holder);
114 if (call == CANCELLED_CALL) {
115 gpr_mu_unlock(&holder->mu);
Craig Tiller804ff712016-05-05 16:25:40 -0700116 grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
117 GRPC_ERROR_CANCELLED);
Craig Tiller577c9b22015-11-02 14:11:15 -0800118 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
119 return;
120 }
121 if (call != NULL) {
122 gpr_mu_unlock(&holder->mu);
123 grpc_subchannel_call_process_op(exec_ctx, call, op);
124 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
125 return;
126 }
127 /* if this is a cancellation, then we can raise our cancelled flag */
Craig Tillerf0f70a82016-06-23 13:55:06 -0700128 if (op->cancel_error != GRPC_ERROR_NONE) {
Mark D. Roth09e66982016-06-23 11:14:18 -0700129 if (!gpr_atm_rel_cas(&holder->subchannel_call, 0,
130 (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800131 goto retry;
132 } else {
133 switch (holder->creation_phase) {
134 case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
Craig Tillerf0f70a82016-06-23 13:55:06 -0700135 fail_locked(exec_ctx, holder, GRPC_ERROR_REF(op->cancel_error));
Craig Tiller577c9b22015-11-02 14:11:15 -0800136 break;
Craig Tiller577c9b22015-11-02 14:11:15 -0800137 case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
138 holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800139 0, &holder->connected_subchannel, NULL);
Craig Tiller577c9b22015-11-02 14:11:15 -0800140 break;
141 }
142 gpr_mu_unlock(&holder->mu);
Craig Tiller804ff712016-05-05 16:25:40 -0700143 grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
144 GRPC_ERROR_CANCELLED);
Craig Tiller577c9b22015-11-02 14:11:15 -0800145 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
146 return;
147 }
148 }
149 /* if we don't have a subchannel, try to get one */
150 if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
Craig Tillerab33b482015-11-21 08:11:04 -0800151 holder->connected_subchannel == NULL &&
152 op->send_initial_metadata != NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800153 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
154 grpc_closure_init(&holder->next_step, subchannel_ready, holder);
Craig Tiller11beb9a2015-11-24 10:29:32 -0800155 GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
Craig Tillerab33b482015-11-21 08:11:04 -0800156 if (holder->pick_subchannel(
157 exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800158 op->send_initial_metadata_flags, &holder->connected_subchannel,
159 &holder->next_step)) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800160 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
Craig Tiller11beb9a2015-11-24 10:29:32 -0800161 GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
Craig Tiller577c9b22015-11-02 14:11:15 -0800162 }
163 }
164 /* if we've got a subchannel, then let's ask it to create a call */
165 if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
Craig Tillerb5585d42015-11-17 07:18:31 -0800166 holder->connected_subchannel != NULL) {
Mark D. Roth76d24422016-06-23 13:22:10 -0700167 grpc_subchannel_call *subchannel_call = NULL;
168 grpc_error *error = grpc_connected_subchannel_create_call(
Mark D. Roth09e66982016-06-23 11:14:18 -0700169 exec_ctx, holder->connected_subchannel, holder->pollent,
170 &subchannel_call);
171 if (error != GRPC_ERROR_NONE) {
172 subchannel_call = CANCELLED_CALL;
Mark D. Roth05d73af2016-07-27 15:52:46 +0000173 fail_locked(exec_ctx, holder, GRPC_ERROR_REF(error));
Mark D. Roth09e66982016-06-23 11:14:18 -0700174 grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
175 }
176 gpr_atm_rel_store(&holder->subchannel_call,
177 (gpr_atm)(uintptr_t)subchannel_call);
Craig Tillerb5585d42015-11-17 07:18:31 -0800178 retry_waiting_locked(exec_ctx, holder);
179 goto retry;
Craig Tiller577c9b22015-11-02 14:11:15 -0800180 }
181 /* nothing to be done but wait */
182 add_waiting_locked(holder, op);
183 gpr_mu_unlock(&holder->mu);
184 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
185}
186
Craig Tiller804ff712016-05-05 16:25:40 -0700187static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
188 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800189 grpc_subchannel_call_holder *holder = arg;
Craig Tiller577c9b22015-11-02 14:11:15 -0800190 gpr_mu_lock(&holder->mu);
191 GPR_ASSERT(holder->creation_phase ==
192 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
Craig Tiller11beb9a2015-11-24 10:29:32 -0800193 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
Craig Tillerb5585d42015-11-17 07:18:31 -0800194 if (holder->connected_subchannel == NULL) {
Sree Kuchibhotlaad0f7922016-05-04 19:49:31 -0700195 gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
Craig Tiller804ff712016-05-05 16:25:40 -0700196 fail_locked(exec_ctx, holder,
197 GRPC_ERROR_CREATE_REFERENCING("Failed to create subchannel",
198 &error, 1));
Craig Tillerd426d9d2016-03-09 09:30:18 -0800199 } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
200 /* already cancelled before subchannel became ready */
Craig Tiller804ff712016-05-05 16:25:40 -0700201 fail_locked(exec_ctx, holder,
202 GRPC_ERROR_CREATE_REFERENCING(
203 "Cancelled before creating subchannel", &error, 1));
Craig Tiller577c9b22015-11-02 14:11:15 -0800204 } else {
Mark D. Roth76d24422016-06-23 13:22:10 -0700205 grpc_subchannel_call *subchannel_call = NULL;
206 grpc_error *new_error = grpc_connected_subchannel_create_call(
Mark D. Roth09e66982016-06-23 11:14:18 -0700207 exec_ctx, holder->connected_subchannel, holder->pollent,
208 &subchannel_call);
209 if (new_error != GRPC_ERROR_NONE) {
Mark D. Roth3c945ee2016-07-01 14:17:38 -0700210 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth09e66982016-06-23 11:14:18 -0700211 subchannel_call = CANCELLED_CALL;
212 fail_locked(exec_ctx, holder, new_error);
213 }
214 gpr_atm_rel_store(&holder->subchannel_call,
215 (gpr_atm)(uintptr_t)subchannel_call);
Craig Tillerd426d9d2016-03-09 09:30:18 -0800216 retry_waiting_locked(exec_ctx, holder);
Craig Tiller577c9b22015-11-02 14:11:15 -0800217 }
218 gpr_mu_unlock(&holder->mu);
Craig Tiller11beb9a2015-11-24 10:29:32 -0800219 GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
Craig Tiller577c9b22015-11-02 14:11:15 -0800220}
221
222typedef struct {
223 grpc_transport_stream_op *ops;
224 size_t nops;
225 grpc_subchannel_call *call;
226} retry_ops_args;
227
228static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
229 grpc_subchannel_call_holder *holder) {
230 retry_ops_args *a = gpr_malloc(sizeof(*a));
231 a->ops = holder->waiting_ops;
232 a->nops = holder->waiting_ops_count;
233 a->call = GET_CALL(holder);
234 if (a->call == CANCELLED_CALL) {
235 gpr_free(a);
Craig Tiller804ff712016-05-05 16:25:40 -0700236 fail_locked(exec_ctx, holder, GRPC_ERROR_CANCELLED);
Craig Tiller577c9b22015-11-02 14:11:15 -0800237 return;
238 }
239 holder->waiting_ops = NULL;
240 holder->waiting_ops_count = 0;
241 holder->waiting_ops_capacity = 0;
242 GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
Craig Tiller332f1b32016-05-24 13:21:21 -0700243 grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
244 GRPC_ERROR_NONE, NULL);
Craig Tiller577c9b22015-11-02 14:11:15 -0800245}
246
Craig Tiller804ff712016-05-05 16:25:40 -0700247static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800248 retry_ops_args *a = args;
249 size_t i;
250 for (i = 0; i < a->nops; i++) {
251 grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
252 }
253 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
254 gpr_free(a->ops);
255 gpr_free(a);
256}
257
258static void add_waiting_locked(grpc_subchannel_call_holder *holder,
259 grpc_transport_stream_op *op) {
260 GPR_TIMER_BEGIN("add_waiting_locked", 0);
261 if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
262 holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
263 holder->waiting_ops =
264 gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
265 sizeof(*holder->waiting_ops));
266 }
267 holder->waiting_ops[holder->waiting_ops_count++] = *op;
268 GPR_TIMER_END("add_waiting_locked", 0);
269}
270
271static void fail_locked(grpc_exec_ctx *exec_ctx,
Craig Tiller804ff712016-05-05 16:25:40 -0700272 grpc_subchannel_call_holder *holder,
273 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800274 size_t i;
275 for (i = 0; i < holder->waiting_ops_count; i++) {
Craig Tiller804ff712016-05-05 16:25:40 -0700276 grpc_transport_stream_op_finish_with_failure(
Craig Tillerf707d622016-05-06 14:26:12 -0700277 exec_ctx, &holder->waiting_ops[i], GRPC_ERROR_REF(error));
Craig Tiller577c9b22015-11-02 14:11:15 -0800278 }
279 holder->waiting_ops_count = 0;
Craig Tillerf707d622016-05-06 14:26:12 -0700280 GRPC_ERROR_UNREF(error);
Craig Tiller577c9b22015-11-02 14:11:15 -0800281}
282
Craig Tiller7b435612015-11-24 08:15:05 -0800283char *grpc_subchannel_call_holder_get_peer(
284 grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800285 grpc_subchannel_call *subchannel_call = GET_CALL(holder);
286
Craig Tiller347e9f92016-04-19 17:09:13 -0700287 if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
Craig Tiller906e3bc2015-11-24 07:31:31 -0800288 return NULL;
Craig Tiller347e9f92016-04-19 17:09:13 -0700289 } else {
290 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
Craig Tiller577c9b22015-11-02 14:11:15 -0800291 }
292}