blob: 3db462b24657374401ce74be76803fc6276f6180 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
Craig Tillerb112f392016-04-05 12:44:04 -070034#include "src/core/ext/client_config/subchannel_call_holder.h"
Craig Tiller577c9b22015-11-02 14:11:15 -080035
36#include <grpc/support/alloc.h>
37
Craig Tiller9533d042016-03-25 17:11:06 -070038#include "src/core/lib/profiling/timers.h"
Craig Tiller577c9b22015-11-02 14:11:15 -080039
/* Sentinel value placed in holder->subchannel_call (by a 0 -> 1
   compare-and-swap in perform_op) to mark the holder as cancelled. It is a
   marker only and does not point at a real grpc_subchannel_call. */
#define CANCELLED_CALL ((grpc_subchannel_call *)1)

/* Acquire-load the holder's subchannel_call slot and view it as a call
   pointer. Yields NULL while no call has been stored, CANCELLED_CALL after a
   cancellation, and the stored call pointer otherwise. */
#define GET_CALL(holder) \
  ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
44
45static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
Craig Tiller6c396862016-01-28 13:53:40 -080046 bool success);
Craig Tiller577c9b22015-11-02 14:11:15 -080047static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
Craig Tiller6c396862016-01-28 13:53:40 -080048 bool success);
Craig Tiller577c9b22015-11-02 14:11:15 -080049
50static void add_waiting_locked(grpc_subchannel_call_holder *holder,
51 grpc_transport_stream_op *op);
52static void fail_locked(grpc_exec_ctx *exec_ctx,
53 grpc_subchannel_call_holder *holder);
54static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
55 grpc_subchannel_call_holder *holder);
56
57void grpc_subchannel_call_holder_init(
58 grpc_subchannel_call_holder *holder,
59 grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
Craig Tiller11beb9a2015-11-24 10:29:32 -080060 void *pick_subchannel_arg, grpc_call_stack *owning_call) {
Craig Tiller577c9b22015-11-02 14:11:15 -080061 gpr_atm_rel_store(&holder->subchannel_call, 0);
62 holder->pick_subchannel = pick_subchannel;
63 holder->pick_subchannel_arg = pick_subchannel_arg;
64 gpr_mu_init(&holder->mu);
Craig Tillerb5585d42015-11-17 07:18:31 -080065 holder->connected_subchannel = NULL;
Craig Tiller577c9b22015-11-02 14:11:15 -080066 holder->waiting_ops = NULL;
67 holder->waiting_ops_count = 0;
68 holder->waiting_ops_capacity = 0;
69 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
Craig Tiller11beb9a2015-11-24 10:29:32 -080070 holder->owning_call = owning_call;
Craig Tiller577c9b22015-11-02 14:11:15 -080071}
72
73void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
74 grpc_subchannel_call_holder *holder) {
75 grpc_subchannel_call *call = GET_CALL(holder);
76 if (call != NULL && call != CANCELLED_CALL) {
77 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
78 }
79 GPR_ASSERT(holder->creation_phase ==
80 GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
81 gpr_mu_destroy(&holder->mu);
82 GPR_ASSERT(holder->waiting_ops_count == 0);
83 gpr_free(holder->waiting_ops);
84}
85
86void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
87 grpc_subchannel_call_holder *holder,
88 grpc_transport_stream_op *op) {
89 /* try to (atomically) get the call */
90 grpc_subchannel_call *call = GET_CALL(holder);
91 GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
92 if (call == CANCELLED_CALL) {
93 grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
94 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
95 return;
96 }
97 if (call != NULL) {
98 grpc_subchannel_call_process_op(exec_ctx, call, op);
99 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
100 return;
101 }
102 /* we failed; lock and figure out what to do */
103 gpr_mu_lock(&holder->mu);
104retry:
105 /* need to recheck that another thread hasn't set the call */
106 call = GET_CALL(holder);
107 if (call == CANCELLED_CALL) {
108 gpr_mu_unlock(&holder->mu);
109 grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
110 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
111 return;
112 }
113 if (call != NULL) {
114 gpr_mu_unlock(&holder->mu);
115 grpc_subchannel_call_process_op(exec_ctx, call, op);
116 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
117 return;
118 }
119 /* if this is a cancellation, then we can raise our cancelled flag */
120 if (op->cancel_with_status != GRPC_STATUS_OK) {
121 if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
122 goto retry;
123 } else {
124 switch (holder->creation_phase) {
125 case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
Craig Tiller1726e832015-11-03 12:45:11 -0800126 fail_locked(exec_ctx, holder);
Craig Tiller577c9b22015-11-02 14:11:15 -0800127 break;
Craig Tiller577c9b22015-11-02 14:11:15 -0800128 case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
129 holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800130 0, &holder->connected_subchannel, NULL);
Craig Tiller577c9b22015-11-02 14:11:15 -0800131 break;
132 }
133 gpr_mu_unlock(&holder->mu);
134 grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
135 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
136 return;
137 }
138 }
139 /* if we don't have a subchannel, try to get one */
140 if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
Craig Tillerab33b482015-11-21 08:11:04 -0800141 holder->connected_subchannel == NULL &&
142 op->send_initial_metadata != NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800143 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
144 grpc_closure_init(&holder->next_step, subchannel_ready, holder);
Craig Tiller11beb9a2015-11-24 10:29:32 -0800145 GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
Craig Tillerab33b482015-11-21 08:11:04 -0800146 if (holder->pick_subchannel(
147 exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800148 op->send_initial_metadata_flags, &holder->connected_subchannel,
149 &holder->next_step)) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800150 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
Craig Tiller11beb9a2015-11-24 10:29:32 -0800151 GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
Craig Tiller577c9b22015-11-02 14:11:15 -0800152 }
153 }
154 /* if we've got a subchannel, then let's ask it to create a call */
155 if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
Craig Tillerb5585d42015-11-17 07:18:31 -0800156 holder->connected_subchannel != NULL) {
Craig Tillerab33b482015-11-21 08:11:04 -0800157 gpr_atm_rel_store(
158 &holder->subchannel_call,
Craig Tiller7536af02015-12-22 13:49:30 -0800159 (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
Craig Tillerab33b482015-11-21 08:11:04 -0800160 exec_ctx, holder->connected_subchannel, holder->pollset));
Craig Tillerb5585d42015-11-17 07:18:31 -0800161 retry_waiting_locked(exec_ctx, holder);
162 goto retry;
Craig Tiller577c9b22015-11-02 14:11:15 -0800163 }
164 /* nothing to be done but wait */
165 add_waiting_locked(holder, op);
166 gpr_mu_unlock(&holder->mu);
167 GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
168}
169
Craig Tiller6c396862016-01-28 13:53:40 -0800170static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800171 grpc_subchannel_call_holder *holder = arg;
Craig Tiller577c9b22015-11-02 14:11:15 -0800172 gpr_mu_lock(&holder->mu);
173 GPR_ASSERT(holder->creation_phase ==
174 GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
Craig Tiller11beb9a2015-11-24 10:29:32 -0800175 holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
Craig Tillerb5585d42015-11-17 07:18:31 -0800176 if (holder->connected_subchannel == NULL) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800177 fail_locked(exec_ctx, holder);
Craig Tillerd426d9d2016-03-09 09:30:18 -0800178 } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
179 /* already cancelled before subchannel became ready */
180 fail_locked(exec_ctx, holder);
Craig Tiller577c9b22015-11-02 14:11:15 -0800181 } else {
Craig Tillerd426d9d2016-03-09 09:30:18 -0800182 gpr_atm_rel_store(
183 &holder->subchannel_call,
184 (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
185 exec_ctx, holder->connected_subchannel, holder->pollset));
186 retry_waiting_locked(exec_ctx, holder);
Craig Tiller577c9b22015-11-02 14:11:15 -0800187 }
188 gpr_mu_unlock(&holder->mu);
Craig Tiller11beb9a2015-11-24 10:29:32 -0800189 GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
Craig Tiller577c9b22015-11-02 14:11:15 -0800190}
191
192typedef struct {
193 grpc_transport_stream_op *ops;
194 size_t nops;
195 grpc_subchannel_call *call;
196} retry_ops_args;
197
198static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
199 grpc_subchannel_call_holder *holder) {
200 retry_ops_args *a = gpr_malloc(sizeof(*a));
201 a->ops = holder->waiting_ops;
202 a->nops = holder->waiting_ops_count;
203 a->call = GET_CALL(holder);
204 if (a->call == CANCELLED_CALL) {
205 gpr_free(a);
206 fail_locked(exec_ctx, holder);
207 return;
208 }
209 holder->waiting_ops = NULL;
210 holder->waiting_ops_count = 0;
211 holder->waiting_ops_capacity = 0;
212 GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
Craig Tiller6c396862016-01-28 13:53:40 -0800213 grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
214 NULL);
Craig Tiller577c9b22015-11-02 14:11:15 -0800215}
216
Craig Tiller6c396862016-01-28 13:53:40 -0800217static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800218 retry_ops_args *a = args;
219 size_t i;
220 for (i = 0; i < a->nops; i++) {
221 grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
222 }
223 GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
224 gpr_free(a->ops);
225 gpr_free(a);
226}
227
228static void add_waiting_locked(grpc_subchannel_call_holder *holder,
229 grpc_transport_stream_op *op) {
230 GPR_TIMER_BEGIN("add_waiting_locked", 0);
231 if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
232 holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
233 holder->waiting_ops =
234 gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
235 sizeof(*holder->waiting_ops));
236 }
237 holder->waiting_ops[holder->waiting_ops_count++] = *op;
238 GPR_TIMER_END("add_waiting_locked", 0);
239}
240
241static void fail_locked(grpc_exec_ctx *exec_ctx,
242 grpc_subchannel_call_holder *holder) {
243 size_t i;
244 for (i = 0; i < holder->waiting_ops_count; i++) {
Craig Tillera44cbfc2016-02-03 16:02:49 -0800245 grpc_transport_stream_op_finish_with_failure(exec_ctx,
246 &holder->waiting_ops[i]);
Craig Tiller577c9b22015-11-02 14:11:15 -0800247 }
248 holder->waiting_ops_count = 0;
249}
250
Craig Tiller7b435612015-11-24 08:15:05 -0800251char *grpc_subchannel_call_holder_get_peer(
252 grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800253 grpc_subchannel_call *subchannel_call = GET_CALL(holder);
254
255 if (subchannel_call) {
256 return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
257 } else {
Craig Tiller906e3bc2015-11-24 07:31:31 -0800258 return NULL;
Craig Tiller577c9b22015-11-02 14:11:15 -0800259 }
260}