/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/client_config/subchannel.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>

#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/connectivity_state.h"

44typedef struct {
Craig Tillerd7b68e72015-06-28 11:41:09 -070045 /* all fields protected by subchannel->mu */
46 /** refcount */
47 int refs;
48 /** parent subchannel */
Craig Tillereb3b12e2015-06-26 14:42:49 -070049 grpc_subchannel *subchannel;
50} connection;
51
Craig Tiller5f84c842015-06-26 16:08:21 -070052typedef struct waiting_for_connect {
53 struct waiting_for_connect *next;
54 grpc_iomgr_closure *notify;
55 grpc_transport_stream_op *initial_op;
56 grpc_subchannel_call **target;
57} waiting_for_connect;
58
Craig Tiller2595ab72015-06-25 15:26:00 -070059struct grpc_subchannel {
Craig Tiller91624662015-06-25 16:31:02 -070060 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070061
62 /** non-transport related channel filters */
63 const grpc_channel_filter **filters;
Craig Tiller5945ee12015-06-27 10:36:09 -070064 size_t num_filters;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070065 /** channel arguments */
66 grpc_channel_args *args;
67 /** address to connect to */
68 struct sockaddr *addr;
69 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070070 /** metadata context */
71 grpc_mdctx *mdctx;
Craig Tillereb3b12e2015-06-26 14:42:49 -070072
73 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070074 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -070075
76 /** callback for connection finishing */
77 grpc_iomgr_closure connected;
78
Craig Tiller5f84c842015-06-26 16:08:21 -070079 /** pollset_set tracking who's interested in a connection
80 being setup */
81 grpc_pollset_set pollset_set;
82
Craig Tillereb3b12e2015-06-26 14:42:49 -070083 /** mutex protecting remaining elements */
84 gpr_mu mu;
85
86 /** active connection */
87 connection *active;
Craig Tillerd7b68e72015-06-28 11:41:09 -070088 /** refcount */
89 int refs;
Craig Tillereb3b12e2015-06-26 14:42:49 -070090 /** are we connecting */
91 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -070092 /** things waiting for a connection */
93 waiting_for_connect *waiting;
Craig Tillerc7b5f762015-06-27 11:48:42 -070094 /** connectivity state tracking */
95 grpc_connectivity_state_tracker state_tracker;
Craig Tiller2595ab72015-06-25 15:26:00 -070096};
97
98struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -070099 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700100 gpr_refcount refs;
101};
102
/* Both the call and the connection are allocated as "header + stack" in a
   single block, so the stack starts one header past the object pointer. */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) \
  ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700105
Craig Tillereb3b12e2015-06-26 14:42:49 -0700106static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
Craig Tiller5f84c842015-06-26 16:08:21 -0700107static void connectivity_state_changed_locked(grpc_subchannel *c);
108static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
109static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700110static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700111
Craig Tillerd7b68e72015-06-28 11:41:09 -0700112static void subchannel_ref_locked(grpc_subchannel *c);
113static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT;
114static void connection_ref_locked(connection *c);
115static grpc_subchannel *connection_unref_locked(connection *c) GRPC_MUST_USE_RESULT;
116static void subchannel_destroy(grpc_subchannel *c);
117
/*
 * connection implementation
 */
121
Craig Tillerd7b68e72015-06-28 11:41:09 -0700122static void connection_destroy(connection *c) {
123 GPR_ASSERT(c->refs == 0);
124 grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
125 gpr_free(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700126}
127
Craig Tillerd7b68e72015-06-28 11:41:09 -0700128static void connection_ref_locked(connection *c) {
129 subchannel_ref_locked(c->subchannel);
130 ++c->refs;
131}
132
133static grpc_subchannel *connection_unref_locked(connection *c) {
134 grpc_subchannel *destroy = NULL;
135 if (subchannel_unref_locked(c->subchannel)) {
136 destroy = c->subchannel;
137 }
138 if (--c->refs == 0 && c->subchannel->active != c) {
139 connection_destroy(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700140 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700141 return destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700142}
143
Craig Tillerd7b68e72015-06-28 11:41:09 -0700144
/*
 * grpc_subchannel implementation
 */
148
Craig Tillerd7b68e72015-06-28 11:41:09 -0700149static void subchannel_ref_locked(grpc_subchannel *c) {
150 ++c->refs;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700151}
Craig Tiller2595ab72015-06-25 15:26:00 -0700152
Craig Tillerd7b68e72015-06-28 11:41:09 -0700153static int subchannel_unref_locked(grpc_subchannel *c) {
154 return --c->refs == 0;
155}
156
157void grpc_subchannel_ref(grpc_subchannel *c) {
158 gpr_mu_lock(&c->mu);
159 subchannel_ref_locked(c);
160 gpr_mu_unlock(&c->mu);
161}
162
Craig Tillereb3b12e2015-06-26 14:42:49 -0700163void grpc_subchannel_unref(grpc_subchannel *c) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700164 int destroy;
165 gpr_mu_lock(&c->mu);
166 destroy = subchannel_unref_locked(c);
167 gpr_mu_unlock(&c->mu);
168 if (destroy) subchannel_destroy(c);
169}
170
171static void subchannel_destroy(grpc_subchannel *c) {
172 if (c->active != NULL) {
173 connection_destroy(c->active);
174 }
175 gpr_free(c->filters);
176 grpc_channel_args_destroy(c->args);
177 gpr_free(c->addr);
178 grpc_mdctx_unref(c->mdctx);
179 grpc_pollset_set_destroy(&c->pollset_set);
180 grpc_connectivity_state_destroy(&c->state_tracker);
181 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700182}
183
Craig Tiller5f84c842015-06-26 16:08:21 -0700184void grpc_subchannel_add_interested_party(grpc_subchannel *c,
185 grpc_pollset *pollset) {
186 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
187}
188
189void grpc_subchannel_del_interested_party(grpc_subchannel *c,
190 grpc_pollset *pollset) {
191 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
192}
193
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700194grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
195 grpc_subchannel_args *args) {
196 grpc_subchannel *c = gpr_malloc(sizeof(*c));
197 memset(c, 0, sizeof(*c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700198 c->refs = 1;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700199 c->connector = connector;
200 grpc_connector_ref(c->connector);
Craig Tiller5945ee12015-06-27 10:36:09 -0700201 c->num_filters = args->filter_count;
202 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700203 memcpy(c->filters, args->filters,
Craig Tiller5945ee12015-06-27 10:36:09 -0700204 sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700205 c->addr = gpr_malloc(args->addr_len);
206 memcpy(c->addr, args->addr, args->addr_len);
207 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700208 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700209 c->mdctx = args->mdctx;
210 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700211 grpc_pollset_set_init(&c->pollset_set);
212 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700213 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700214 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700215 return c;
216}
217
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700218static void start_connect(grpc_subchannel *c) {
219 grpc_connect_in_args args;
220
221 args.interested_parties = &c->pollset_set;
222 args.addr = c->addr;
223 args.addr_len = c->addr_len;
224 args.deadline = compute_connect_deadline(c);
225 args.channel_args = c->args;
226 args.metadata_context = c->mdctx;
227
228 grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
229}
230
Craig Tillereb3b12e2015-06-26 14:42:49 -0700231void grpc_subchannel_create_call(grpc_subchannel *c,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700232 grpc_transport_stream_op *initial_op,
233 grpc_subchannel_call **target,
234 grpc_iomgr_closure *notify) {
235 connection *con;
236 gpr_mu_lock(&c->mu);
237 if (c->active != NULL) {
238 con = c->active;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700239 connection_ref_locked(con);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700240 gpr_mu_unlock(&c->mu);
241
242 *target = create_call(con, initial_op);
243 notify->cb(notify->cb_arg, 1);
244 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700245 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
246 w4c->next = c->waiting;
247 w4c->notify = notify;
248 w4c->initial_op = initial_op;
249 w4c->target = target;
250 c->waiting = w4c;
251 grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700252 if (!c->connecting) {
253 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700254 connectivity_state_changed_locked(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700255 subchannel_ref_locked(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700256 gpr_mu_unlock(&c->mu);
257
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700258 start_connect(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700259 } else {
260 gpr_mu_unlock(&c->mu);
261 }
262 }
263}
264
Craig Tiller5f84c842015-06-26 16:08:21 -0700265grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
266 grpc_connectivity_state state;
267 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700268 state = grpc_connectivity_state_check(&c->state_tracker);
Craig Tiller5f84c842015-06-26 16:08:21 -0700269 gpr_mu_unlock(&c->mu);
270 return state;
271}
272
273void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
274 grpc_connectivity_state *state,
275 grpc_iomgr_closure *notify) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700276 int do_connect = 0;
Craig Tiller5f84c842015-06-26 16:08:21 -0700277 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700278 if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
279 do_connect = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700280 c->connecting = 1;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700281 subchannel_ref_locked(c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700282 grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
Craig Tiller5f84c842015-06-26 16:08:21 -0700283 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700284 gpr_mu_unlock(&c->mu);
Craig Tiller5f84c842015-06-26 16:08:21 -0700285 if (do_connect) {
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700286 start_connect(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700287 }
288}
289
Craig Tillerc7b5f762015-06-27 11:48:42 -0700290void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) {
291 abort();
292}
293
Craig Tillerff54c922015-06-26 16:57:20 -0700294static void publish_transport(grpc_subchannel *c) {
Craig Tiller5945ee12015-06-27 10:36:09 -0700295 size_t channel_stack_size;
296 connection *con;
297 grpc_channel_stack *stk;
298 size_t num_filters;
299 const grpc_channel_filter **filters;
Craig Tillerff54c922015-06-26 16:57:20 -0700300 waiting_for_connect *w4c;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700301 int destroy;
Craig Tiller5945ee12015-06-27 10:36:09 -0700302
303 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
304 filters = gpr_malloc(sizeof(*filters) * num_filters);
305 memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
306 memcpy(filters + c->num_filters, c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters);
307 filters[num_filters - 1] = &grpc_connected_channel_filter;
308
309 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
310 con = gpr_malloc(sizeof(connection) + channel_stack_size);
311 stk = (grpc_channel_stack *)(con + 1);
312
Craig Tillerd7b68e72015-06-28 11:41:09 -0700313 con->refs = 0;
Craig Tillerff54c922015-06-26 16:57:20 -0700314 con->subchannel = c;
Craig Tiller5945ee12015-06-27 10:36:09 -0700315 grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700316 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
317 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700318
319 gpr_mu_lock(&c->mu);
320 GPR_ASSERT(c->active == NULL);
321 c->active = con;
322 c->connecting = 0;
323 connectivity_state_changed_locked(c);
324 while ((w4c = c->waiting)) {
325 abort(); /* not implemented */
326 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700327 destroy = subchannel_unref_locked(c);
Craig Tillerff54c922015-06-26 16:57:20 -0700328 gpr_mu_unlock(&c->mu);
Craig Tiller5945ee12015-06-27 10:36:09 -0700329
330 gpr_free(filters);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700331
332 if (destroy) {
333 subchannel_destroy(c);
334 }
Craig Tillerff54c922015-06-26 16:57:20 -0700335}
336
337static void subchannel_connected(void *arg, int iomgr_success) {
338 grpc_subchannel *c = arg;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700339 if (c->connecting_result.transport) {
Craig Tillerff54c922015-06-26 16:57:20 -0700340 publish_transport(c);
341 } else {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700342 int destroy;
343 gpr_mu_lock(&c->mu);
344 destroy = subchannel_unref_locked(c);
345 gpr_mu_unlock(&c->mu);
346 if (destroy) subchannel_destroy(c);
Craig Tillerff54c922015-06-26 16:57:20 -0700347 /* TODO(ctiller): retry after sleeping */
348 abort();
349 }
350}
351
Craig Tiller5f84c842015-06-26 16:08:21 -0700352static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
353 return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
354}
355
356static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
357 if (c->connecting) {
358 return GRPC_CHANNEL_CONNECTING;
359 }
360 if (c->active) {
361 return GRPC_CHANNEL_READY;
362 }
363 return GRPC_CHANNEL_IDLE;
364}
365
366static void connectivity_state_changed_locked(grpc_subchannel *c) {
367 grpc_connectivity_state current = compute_connectivity_locked(c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700368 grpc_connectivity_state_set(&c->state_tracker, current);
Craig Tiller5f84c842015-06-26 16:08:21 -0700369}
370
/*
 * grpc_subchannel_call implementation
 */
374
Craig Tillerca3e9d32015-06-27 18:37:27 -0700375void grpc_subchannel_call_ref(grpc_subchannel_call *c) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700376 gpr_ref(&c->refs);
Craig Tiller2595ab72015-06-25 15:26:00 -0700377}
378
Craig Tillerca3e9d32015-06-27 18:37:27 -0700379void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700380 if (gpr_unref(&c->refs)) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700381 gpr_mu *mu = &c->connection->subchannel->mu;
382 grpc_subchannel *destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700383 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700384 gpr_mu_lock(mu);
385 destroy = connection_unref_locked(c->connection);
386 gpr_mu_unlock(mu);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700387 gpr_free(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700388 if (destroy) {
389 subchannel_destroy(destroy);
390 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700391 }
392}
393
394void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
395 grpc_transport_stream_op *op) {
396 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
397 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
398 top_elem->filter->start_transport_stream_op(top_elem, op);
399}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700400
401grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700402 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
403 grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
404 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
405 call->connection = con;
406 gpr_ref_init(&call->refs, 1);
407 grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
408 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700409}