blob: e441befc0c97b4e0adcc02ac335599b9b34adbae [file] [log] [blame]
Craig Tilleraf691802015-06-23 14:57:07 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tillerff54c922015-06-26 16:57:20 -070041#include "src/core/channel/connected_channel.h"
Craig Tillerc7b5f762015-06-27 11:48:42 -070042#include "src/core/channel/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070043
44typedef struct {
45 gpr_refcount refs;
46 grpc_subchannel *subchannel;
47} connection;
48
Craig Tiller5f84c842015-06-26 16:08:21 -070049typedef struct waiting_for_connect {
50 struct waiting_for_connect *next;
51 grpc_iomgr_closure *notify;
52 grpc_transport_stream_op *initial_op;
53 grpc_subchannel_call **target;
54} waiting_for_connect;
55
Craig Tiller2595ab72015-06-25 15:26:00 -070056struct grpc_subchannel {
57 gpr_refcount refs;
Craig Tiller91624662015-06-25 16:31:02 -070058 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070059
60 /** non-transport related channel filters */
61 const grpc_channel_filter **filters;
Craig Tiller5945ee12015-06-27 10:36:09 -070062 size_t num_filters;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070063 /** channel arguments */
64 grpc_channel_args *args;
65 /** address to connect to */
66 struct sockaddr *addr;
67 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070068 /** metadata context */
69 grpc_mdctx *mdctx;
Craig Tillereb3b12e2015-06-26 14:42:49 -070070
71 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070072 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -070073
74 /** callback for connection finishing */
75 grpc_iomgr_closure connected;
76
Craig Tiller5f84c842015-06-26 16:08:21 -070077 /** pollset_set tracking who's interested in a connection
78 being setup */
79 grpc_pollset_set pollset_set;
80
Craig Tillereb3b12e2015-06-26 14:42:49 -070081 /** mutex protecting remaining elements */
82 gpr_mu mu;
83
84 /** active connection */
85 connection *active;
86 /** are we connecting */
87 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -070088 /** things waiting for a connection */
89 waiting_for_connect *waiting;
Craig Tillerc7b5f762015-06-27 11:48:42 -070090 /** connectivity state tracking */
91 grpc_connectivity_state_tracker state_tracker;
Craig Tiller2595ab72015-06-25 15:26:00 -070092};
93
94struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -070095 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -070096 gpr_refcount refs;
97};
98
Craig Tiller04c5d4b2015-06-26 17:21:41 -070099#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
100#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700101
Craig Tillereb3b12e2015-06-26 14:42:49 -0700102static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
Craig Tiller5f84c842015-06-26 16:08:21 -0700103static void connectivity_state_changed_locked(grpc_subchannel *c);
104static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
105static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700106static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700107
Craig Tiller2595ab72015-06-25 15:26:00 -0700108/*
Craig Tillerca3e9d32015-06-27 18:37:27 -0700109 * connection implementation
110 */
111
112#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
113#define CONNECTION_REF(c, r) connection_ref((c), __FILE__, __LINE__, (r))
114#define CONNECTION_UNREF(c, r) connection_unref((c), __FILE__, __LINE__, (r))
115#else
116#define CONNECTION_REF(c, r) connection_ref((c))
117#define CONNECTION_UNREF(c, r) connection_unref((c))
118#endif
119
120#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
121static void connection_ref(connection *c, const char *file, int line, const char *reason) {
122 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p ref %d -> %d %s",
123 c, (int)c->refs.count, (int)c->refs.count + 1,
124 reason);
125#else
126static void connection_ref(connection *c) {
127#endif
128 gpr_ref(&c->refs);
129}
130
131#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
132static void connection_unref(connection *c, const char *file, int line, const char *reason) {
133 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p unref %d -> %d %s",
134 c, (int)c->refs.count, (int)c->refs.count - 1,
135 reason);
136#else
137static void connection_unref(connection *c) {
138#endif
139 if (gpr_unref(&c->refs)) {
140 GRPC_SUBCHANNEL_UNREF(c->subchannel, "connection");
141 gpr_free(c);
142 }
143}
144
145/*
Craig Tiller2595ab72015-06-25 15:26:00 -0700146 * grpc_subchannel implementation
147 */
148
Craig Tillerca3e9d32015-06-27 18:37:27 -0700149#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
150void grpc_subchannel_ref(grpc_subchannel *c, const char *file, int line, const char *reason) {
151 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p ref %d -> %d %s",
152 c, (int)c->refs.count, (int)c->refs.count + 1,
153 reason);
154#else
155void grpc_subchannel_ref(grpc_subchannel *c) {
156#endif
157 gpr_ref(&c->refs);
158}
Craig Tiller2595ab72015-06-25 15:26:00 -0700159
Craig Tillerca3e9d32015-06-27 18:37:27 -0700160#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
161void grpc_subchannel_unref(grpc_subchannel *c, const char *file, int line, const char *reason) {
162 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p unref %d -> %d %s",
163 c, (int)c->refs.count, (int)c->refs.count - 1,
164 reason);
165#else
Craig Tillereb3b12e2015-06-26 14:42:49 -0700166void grpc_subchannel_unref(grpc_subchannel *c) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700167#endif
Craig Tillereb3b12e2015-06-26 14:42:49 -0700168 if (gpr_unref(&c->refs)) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700169 if (c->active != NULL) CONNECTION_UNREF(c->active, "subchannel");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700170 gpr_free(c->filters);
171 grpc_channel_args_destroy(c->args);
172 gpr_free(c->addr);
Craig Tiller5f84c842015-06-26 16:08:21 -0700173 grpc_mdctx_unref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700174 grpc_pollset_set_destroy(&c->pollset_set);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700175 grpc_connectivity_state_destroy(&c->state_tracker);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700176 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700177 }
178}
179
Craig Tiller5f84c842015-06-26 16:08:21 -0700180void grpc_subchannel_add_interested_party(grpc_subchannel *c,
181 grpc_pollset *pollset) {
182 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
183}
184
185void grpc_subchannel_del_interested_party(grpc_subchannel *c,
186 grpc_pollset *pollset) {
187 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
188}
189
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700190grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
191 grpc_subchannel_args *args) {
192 grpc_subchannel *c = gpr_malloc(sizeof(*c));
193 memset(c, 0, sizeof(*c));
194 gpr_ref_init(&c->refs, 1);
195 c->connector = connector;
196 grpc_connector_ref(c->connector);
Craig Tiller5945ee12015-06-27 10:36:09 -0700197 c->num_filters = args->filter_count;
198 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700199 memcpy(c->filters, args->filters,
Craig Tiller5945ee12015-06-27 10:36:09 -0700200 sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700201 c->addr = gpr_malloc(args->addr_len);
202 memcpy(c->addr, args->addr, args->addr_len);
203 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700204 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700205 c->mdctx = args->mdctx;
206 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700207 grpc_pollset_set_init(&c->pollset_set);
208 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700209 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700210 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700211 return c;
212}
213
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700214static void start_connect(grpc_subchannel *c) {
215 grpc_connect_in_args args;
216
217 args.interested_parties = &c->pollset_set;
218 args.addr = c->addr;
219 args.addr_len = c->addr_len;
220 args.deadline = compute_connect_deadline(c);
221 args.channel_args = c->args;
222 args.metadata_context = c->mdctx;
223
224 grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
225}
226
Craig Tillereb3b12e2015-06-26 14:42:49 -0700227void grpc_subchannel_create_call(grpc_subchannel *c,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700228 grpc_transport_stream_op *initial_op,
229 grpc_subchannel_call **target,
230 grpc_iomgr_closure *notify) {
231 connection *con;
232 gpr_mu_lock(&c->mu);
233 if (c->active != NULL) {
234 con = c->active;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700235 CONNECTION_REF(con, "call");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700236 gpr_mu_unlock(&c->mu);
237
238 *target = create_call(con, initial_op);
239 notify->cb(notify->cb_arg, 1);
240 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700241 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
242 w4c->next = c->waiting;
243 w4c->notify = notify;
244 w4c->initial_op = initial_op;
245 w4c->target = target;
246 c->waiting = w4c;
247 grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700248 if (!c->connecting) {
249 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700250 connectivity_state_changed_locked(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700251 GRPC_SUBCHANNEL_REF(c, "connection");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700252 gpr_mu_unlock(&c->mu);
253
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700254 start_connect(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700255 } else {
256 gpr_mu_unlock(&c->mu);
257 }
258 }
259}
260
Craig Tiller5f84c842015-06-26 16:08:21 -0700261grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
262 grpc_connectivity_state state;
263 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700264 state = grpc_connectivity_state_check(&c->state_tracker);
Craig Tiller5f84c842015-06-26 16:08:21 -0700265 gpr_mu_unlock(&c->mu);
266 return state;
267}
268
269void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
270 grpc_connectivity_state *state,
271 grpc_iomgr_closure *notify) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700272 int do_connect = 0;
Craig Tiller5f84c842015-06-26 16:08:21 -0700273 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700274 if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
275 do_connect = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700276 c->connecting = 1;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700277 GRPC_SUBCHANNEL_REF(c, "connection");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700278 grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
Craig Tiller5f84c842015-06-26 16:08:21 -0700279 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700280 gpr_mu_unlock(&c->mu);
Craig Tiller5f84c842015-06-26 16:08:21 -0700281 if (do_connect) {
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700282 start_connect(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700283 }
284}
285
Craig Tillerc7b5f762015-06-27 11:48:42 -0700286void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) {
287 abort();
288}
289
Craig Tillerff54c922015-06-26 16:57:20 -0700290static void publish_transport(grpc_subchannel *c) {
Craig Tiller5945ee12015-06-27 10:36:09 -0700291 size_t channel_stack_size;
292 connection *con;
293 grpc_channel_stack *stk;
294 size_t num_filters;
295 const grpc_channel_filter **filters;
Craig Tillerff54c922015-06-26 16:57:20 -0700296 waiting_for_connect *w4c;
Craig Tiller5945ee12015-06-27 10:36:09 -0700297
298 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
299 filters = gpr_malloc(sizeof(*filters) * num_filters);
300 memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
301 memcpy(filters + c->num_filters, c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters);
302 filters[num_filters - 1] = &grpc_connected_channel_filter;
303
304 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
305 con = gpr_malloc(sizeof(connection) + channel_stack_size);
306 stk = (grpc_channel_stack *)(con + 1);
307
Craig Tillerff54c922015-06-26 16:57:20 -0700308 gpr_ref_init(&con->refs, 1);
309 con->subchannel = c;
Craig Tiller5945ee12015-06-27 10:36:09 -0700310 grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700311 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
312 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700313
314 gpr_mu_lock(&c->mu);
315 GPR_ASSERT(c->active == NULL);
316 c->active = con;
317 c->connecting = 0;
318 connectivity_state_changed_locked(c);
319 while ((w4c = c->waiting)) {
320 abort(); /* not implemented */
321 }
322 gpr_mu_unlock(&c->mu);
Craig Tiller5945ee12015-06-27 10:36:09 -0700323
324 gpr_free(filters);
Craig Tillerff54c922015-06-26 16:57:20 -0700325}
326
327static void subchannel_connected(void *arg, int iomgr_success) {
328 grpc_subchannel *c = arg;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700329 if (c->connecting_result.transport) {
Craig Tillerff54c922015-06-26 16:57:20 -0700330 publish_transport(c);
331 } else {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700332 GRPC_SUBCHANNEL_UNREF(c, "connection");
Craig Tillerff54c922015-06-26 16:57:20 -0700333 /* TODO(ctiller): retry after sleeping */
334 abort();
335 }
336}
337
Craig Tiller5f84c842015-06-26 16:08:21 -0700338static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
339 return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
340}
341
342static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
343 if (c->connecting) {
344 return GRPC_CHANNEL_CONNECTING;
345 }
346 if (c->active) {
347 return GRPC_CHANNEL_READY;
348 }
349 return GRPC_CHANNEL_IDLE;
350}
351
352static void connectivity_state_changed_locked(grpc_subchannel *c) {
353 grpc_connectivity_state current = compute_connectivity_locked(c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700354 grpc_connectivity_state_set(&c->state_tracker, current);
Craig Tiller5f84c842015-06-26 16:08:21 -0700355}
356
Craig Tiller2595ab72015-06-25 15:26:00 -0700357/*
358 * grpc_subchannel_call implementation
359 */
360
Craig Tillerca3e9d32015-06-27 18:37:27 -0700361#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
362void grpc_subchannel_call_ref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
363 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p ref %d -> %d %s",
364 c, (int)c->refs.count, (int)c->refs.count + 1,
365 reason);
366#else
367void grpc_subchannel_call_ref(grpc_subchannel_call *c) {
368#endif
369 gpr_ref(&c->refs);
Craig Tiller2595ab72015-06-25 15:26:00 -0700370}
371
Craig Tillerca3e9d32015-06-27 18:37:27 -0700372#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
373void grpc_subchannel_call_unref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
374 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p unref %d -> %d %s",
375 c, (int)c->refs.count, (int)c->refs.count - 1,
376 reason);
377#else
378void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
379#endif
380 if (gpr_unref(&c->refs)) {
381 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
382 CONNECTION_UNREF(c->connection, "call");
383 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700384 }
385}
386
387void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
388 grpc_transport_stream_op *op) {
389 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
390 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
391 top_elem->filter->start_transport_stream_op(top_elem, op);
392}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700393
394grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700395 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
396 grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
397 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
398 call->connection = con;
399 gpr_ref_init(&call->refs, 1);
400 grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
401 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700402}