blob: c2044c9e70634c4f05f882fbea4cb74d45edb7c4 [file] [log] [blame]
Craig Tilleraf691802015-06-23 14:57:07 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tillerff54c922015-06-26 16:57:20 -070041#include "src/core/channel/connected_channel.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070042
43typedef struct {
44 gpr_refcount refs;
45 grpc_subchannel *subchannel;
46} connection;
47
Craig Tiller5f84c842015-06-26 16:08:21 -070048typedef struct waiting_for_connect {
49 struct waiting_for_connect *next;
50 grpc_iomgr_closure *notify;
51 grpc_transport_stream_op *initial_op;
52 grpc_subchannel_call **target;
53} waiting_for_connect;
54
55typedef struct connectivity_state_watcher {
56 struct connectivity_state_watcher *next;
57 grpc_iomgr_closure *notify;
58 grpc_connectivity_state *current;
59} connectivity_state_watcher;
60
Craig Tiller2595ab72015-06-25 15:26:00 -070061struct grpc_subchannel {
62 gpr_refcount refs;
Craig Tiller91624662015-06-25 16:31:02 -070063 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070064
65 /** non-transport related channel filters */
66 const grpc_channel_filter **filters;
67 size_t filter_count;
68 /** channel arguments */
69 grpc_channel_args *args;
70 /** address to connect to */
71 struct sockaddr *addr;
72 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070073 /** metadata context */
74 grpc_mdctx *mdctx;
Craig Tillereb3b12e2015-06-26 14:42:49 -070075
76 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070077 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -070078
79 /** callback for connection finishing */
80 grpc_iomgr_closure connected;
81
Craig Tiller5f84c842015-06-26 16:08:21 -070082 /** pollset_set tracking who's interested in a connection
83 being setup */
84 grpc_pollset_set pollset_set;
85
Craig Tillereb3b12e2015-06-26 14:42:49 -070086 /** mutex protecting remaining elements */
87 gpr_mu mu;
88
89 /** active connection */
90 connection *active;
91 /** are we connecting */
92 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -070093 /** things waiting for a connection */
94 waiting_for_connect *waiting;
95 /** things watching the connectivity state */
96 connectivity_state_watcher *watchers;
Craig Tiller2595ab72015-06-25 15:26:00 -070097};
98
99struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700100 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700101 gpr_refcount refs;
102};
103
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700104#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
105#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700106
Craig Tillereb3b12e2015-06-26 14:42:49 -0700107static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
Craig Tiller5f84c842015-06-26 16:08:21 -0700108static void connectivity_state_changed_locked(grpc_subchannel *c);
109static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
110static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700111static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700112
Craig Tiller2595ab72015-06-25 15:26:00 -0700113/*
114 * grpc_subchannel implementation
115 */
116
Craig Tillereb3b12e2015-06-26 14:42:49 -0700117void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); }
Craig Tiller2595ab72015-06-25 15:26:00 -0700118
Craig Tillereb3b12e2015-06-26 14:42:49 -0700119void grpc_subchannel_unref(grpc_subchannel *c) {
120 if (gpr_unref(&c->refs)) {
121 gpr_free(c->filters);
122 grpc_channel_args_destroy(c->args);
123 gpr_free(c->addr);
Craig Tiller5f84c842015-06-26 16:08:21 -0700124 grpc_mdctx_unref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700125 grpc_pollset_set_destroy(&c->pollset_set);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700126 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700127 }
128}
129
Craig Tiller5f84c842015-06-26 16:08:21 -0700130void grpc_subchannel_add_interested_party(grpc_subchannel *c,
131 grpc_pollset *pollset) {
132 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
133}
134
135void grpc_subchannel_del_interested_party(grpc_subchannel *c,
136 grpc_pollset *pollset) {
137 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
138}
139
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700140grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
141 grpc_subchannel_args *args) {
142 grpc_subchannel *c = gpr_malloc(sizeof(*c));
143 memset(c, 0, sizeof(*c));
144 gpr_ref_init(&c->refs, 1);
145 c->connector = connector;
146 grpc_connector_ref(c->connector);
Craig Tillerff54c922015-06-26 16:57:20 -0700147 c->filter_count = args->filter_count + 1;
148 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->filter_count);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700149 memcpy(c->filters, args->filters,
150 sizeof(grpc_channel_filter *) * args->filter_count);
Craig Tillerff54c922015-06-26 16:57:20 -0700151 c->filters[c->filter_count - 1] = &grpc_connected_channel_filter;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700152 c->addr = gpr_malloc(args->addr_len);
153 memcpy(c->addr, args->addr, args->addr_len);
154 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700155 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700156 c->mdctx = args->mdctx;
157 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700158 grpc_pollset_set_init(&c->pollset_set);
159 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700160 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700161 return c;
162}
163
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700164static void start_connect(grpc_subchannel *c) {
165 grpc_connect_in_args args;
166
167 args.interested_parties = &c->pollset_set;
168 args.addr = c->addr;
169 args.addr_len = c->addr_len;
170 args.deadline = compute_connect_deadline(c);
171 args.channel_args = c->args;
172 args.metadata_context = c->mdctx;
173
174 grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
175}
176
Craig Tillereb3b12e2015-06-26 14:42:49 -0700177void grpc_subchannel_create_call(grpc_subchannel *c,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700178 grpc_transport_stream_op *initial_op,
179 grpc_subchannel_call **target,
180 grpc_iomgr_closure *notify) {
181 connection *con;
182 gpr_mu_lock(&c->mu);
183 if (c->active != NULL) {
184 con = c->active;
185 gpr_ref(&con->refs);
186 gpr_mu_unlock(&c->mu);
187
188 *target = create_call(con, initial_op);
189 notify->cb(notify->cb_arg, 1);
190 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700191 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
192 w4c->next = c->waiting;
193 w4c->notify = notify;
194 w4c->initial_op = initial_op;
195 w4c->target = target;
196 c->waiting = w4c;
197 grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700198 if (!c->connecting) {
199 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700200 connectivity_state_changed_locked(c);
Craig Tillerff54c922015-06-26 16:57:20 -0700201 grpc_subchannel_ref(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700202 gpr_mu_unlock(&c->mu);
203
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700204 start_connect(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700205 } else {
206 gpr_mu_unlock(&c->mu);
207 }
208 }
209}
210
Craig Tiller5f84c842015-06-26 16:08:21 -0700211grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
212 grpc_connectivity_state state;
213 gpr_mu_lock(&c->mu);
214 state = compute_connectivity_locked(c);
215 gpr_mu_unlock(&c->mu);
216 return state;
217}
218
219void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
220 grpc_connectivity_state *state,
221 grpc_iomgr_closure *notify) {
222 grpc_connectivity_state current;
223 int do_connect = 0;
224 connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
225 w->current = state;
226 w->notify = notify;
227 gpr_mu_lock(&c->mu);
228 current = compute_connectivity_locked(c);
229 if (current == GRPC_CHANNEL_IDLE) {
230 current = GRPC_CHANNEL_CONNECTING;
231 c->connecting = 1;
232 do_connect = 1;
Craig Tillerff54c922015-06-26 16:57:20 -0700233 grpc_subchannel_ref(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700234 connectivity_state_changed_locked(c);
235 }
236 if (current != *state) {
237 gpr_mu_unlock(&c->mu);
238 *state = current;
239 grpc_iomgr_add_callback(notify);
240 gpr_free(w);
241 } else {
242 w->next = c->watchers;
243 c->watchers = w;
244 gpr_mu_unlock(&c->mu);
245 }
246 if (do_connect) {
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700247 start_connect(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700248 }
249}
250
Craig Tillerff54c922015-06-26 16:57:20 -0700251static void publish_transport(grpc_subchannel *c) {
252 size_t channel_stack_size = grpc_channel_stack_size(c->filters, c->filter_count);
253 connection *con = gpr_malloc(sizeof(connection) + channel_stack_size);
254 grpc_channel_stack *stk = (grpc_channel_stack *)(con + 1);
255 waiting_for_connect *w4c;
256 gpr_ref_init(&con->refs, 1);
257 con->subchannel = c;
258 grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700259 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
260 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700261
262 gpr_mu_lock(&c->mu);
263 GPR_ASSERT(c->active == NULL);
264 c->active = con;
265 c->connecting = 0;
266 connectivity_state_changed_locked(c);
267 while ((w4c = c->waiting)) {
268 abort(); /* not implemented */
269 }
270 gpr_mu_unlock(&c->mu);
271}
272
273static void subchannel_connected(void *arg, int iomgr_success) {
274 grpc_subchannel *c = arg;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700275 if (c->connecting_result.transport) {
Craig Tillerff54c922015-06-26 16:57:20 -0700276 publish_transport(c);
277 } else {
278 grpc_subchannel_unref(c);
279 /* TODO(ctiller): retry after sleeping */
280 abort();
281 }
282}
283
Craig Tiller5f84c842015-06-26 16:08:21 -0700284static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
285 return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
286}
287
288static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
289 if (c->connecting) {
290 return GRPC_CHANNEL_CONNECTING;
291 }
292 if (c->active) {
293 return GRPC_CHANNEL_READY;
294 }
295 return GRPC_CHANNEL_IDLE;
296}
297
298static void connectivity_state_changed_locked(grpc_subchannel *c) {
299 grpc_connectivity_state current = compute_connectivity_locked(c);
300 connectivity_state_watcher *new = NULL;
301 connectivity_state_watcher *w;
302 while ((w = c->watchers)) {
303 c->watchers = w->next;
304
305 if (current != *w->current) {
306 *w->current = current;
307 grpc_iomgr_add_callback(w->notify);
308 gpr_free(w);
309 } else {
310 w->next = new;
311 new = w;
312 }
313 }
314 c->watchers = new;
315}
316
Craig Tiller2595ab72015-06-25 15:26:00 -0700317/*
318 * grpc_subchannel_call implementation
319 */
320
321void grpc_subchannel_call_ref(grpc_subchannel_call *call) {
322 gpr_ref(&call->refs);
323}
324
325void grpc_subchannel_call_unref(grpc_subchannel_call *call) {
326 if (gpr_unref(&call->refs)) {
327 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call));
Craig Tillereb3b12e2015-06-26 14:42:49 -0700328 if (gpr_unref(&call->connection->refs)) {
329 gpr_free(call->connection);
330 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700331 gpr_free(call);
332 }
333}
334
335void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
336 grpc_transport_stream_op *op) {
337 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
338 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
339 top_elem->filter->start_transport_stream_op(top_elem, op);
340}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700341
342grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700343 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
344 grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
345 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
346 call->connection = con;
347 gpr_ref_init(&call->refs, 1);
348 grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
349 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700350}