blob: e863c5b97ccd4e860f5dba10db720476db01c8e1 [file] [log] [blame]
Craig Tilleraf691802015-06-23 14:57:07 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tillerff54c922015-06-26 16:57:20 -070041#include "src/core/channel/connected_channel.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070042
43typedef struct {
44 gpr_refcount refs;
45 grpc_subchannel *subchannel;
46} connection;
47
Craig Tiller5f84c842015-06-26 16:08:21 -070048typedef struct waiting_for_connect {
49 struct waiting_for_connect *next;
50 grpc_iomgr_closure *notify;
51 grpc_transport_stream_op *initial_op;
52 grpc_subchannel_call **target;
53} waiting_for_connect;
54
55typedef struct connectivity_state_watcher {
56 struct connectivity_state_watcher *next;
57 grpc_iomgr_closure *notify;
58 grpc_connectivity_state *current;
59} connectivity_state_watcher;
60
Craig Tiller2595ab72015-06-25 15:26:00 -070061struct grpc_subchannel {
62 gpr_refcount refs;
Craig Tiller91624662015-06-25 16:31:02 -070063 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070064
65 /** non-transport related channel filters */
66 const grpc_channel_filter **filters;
67 size_t filter_count;
68 /** channel arguments */
69 grpc_channel_args *args;
70 /** address to connect to */
71 struct sockaddr *addr;
72 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070073 /** metadata context */
74 grpc_mdctx *mdctx;
Craig Tillereb3b12e2015-06-26 14:42:49 -070075
76 /** set during connection */
77 grpc_transport *connecting_transport;
78
79 /** callback for connection finishing */
80 grpc_iomgr_closure connected;
81
Craig Tiller5f84c842015-06-26 16:08:21 -070082 /** pollset_set tracking who's interested in a connection
83 being setup */
84 grpc_pollset_set pollset_set;
85
Craig Tillereb3b12e2015-06-26 14:42:49 -070086 /** mutex protecting remaining elements */
87 gpr_mu mu;
88
89 /** active connection */
90 connection *active;
91 /** are we connecting */
92 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -070093 /** things waiting for a connection */
94 waiting_for_connect *waiting;
95 /** things watching the connectivity state */
96 connectivity_state_watcher *watchers;
Craig Tiller2595ab72015-06-25 15:26:00 -070097};
98
99struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700100 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700101 gpr_refcount refs;
102};
103
104#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
105
Craig Tillereb3b12e2015-06-26 14:42:49 -0700106static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
Craig Tiller5f84c842015-06-26 16:08:21 -0700107static void connectivity_state_changed_locked(grpc_subchannel *c);
108static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
109static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700110static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700111
Craig Tiller2595ab72015-06-25 15:26:00 -0700112/*
113 * grpc_subchannel implementation
114 */
115
Craig Tillereb3b12e2015-06-26 14:42:49 -0700116void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); }
Craig Tiller2595ab72015-06-25 15:26:00 -0700117
Craig Tillereb3b12e2015-06-26 14:42:49 -0700118void grpc_subchannel_unref(grpc_subchannel *c) {
119 if (gpr_unref(&c->refs)) {
120 gpr_free(c->filters);
121 grpc_channel_args_destroy(c->args);
122 gpr_free(c->addr);
Craig Tiller5f84c842015-06-26 16:08:21 -0700123 grpc_mdctx_unref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700124 grpc_pollset_set_destroy(&c->pollset_set);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700125 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700126 }
127}
128
Craig Tiller5f84c842015-06-26 16:08:21 -0700129void grpc_subchannel_add_interested_party(grpc_subchannel *c,
130 grpc_pollset *pollset) {
131 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
132}
133
134void grpc_subchannel_del_interested_party(grpc_subchannel *c,
135 grpc_pollset *pollset) {
136 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
137}
138
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700139grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
140 grpc_subchannel_args *args) {
141 grpc_subchannel *c = gpr_malloc(sizeof(*c));
142 memset(c, 0, sizeof(*c));
143 gpr_ref_init(&c->refs, 1);
144 c->connector = connector;
145 grpc_connector_ref(c->connector);
Craig Tillerff54c922015-06-26 16:57:20 -0700146 c->filter_count = args->filter_count + 1;
147 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->filter_count);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700148 memcpy(c->filters, args->filters,
149 sizeof(grpc_channel_filter *) * args->filter_count);
Craig Tillerff54c922015-06-26 16:57:20 -0700150 c->filters[c->filter_count - 1] = &grpc_connected_channel_filter;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700151 c->addr = gpr_malloc(args->addr_len);
152 memcpy(c->addr, args->addr, args->addr_len);
153 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700154 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700155 c->mdctx = args->mdctx;
156 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700157 grpc_pollset_set_init(&c->pollset_set);
158 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700159 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700160 return c;
161}
162
Craig Tillereb3b12e2015-06-26 14:42:49 -0700163void grpc_subchannel_create_call(grpc_subchannel *c,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700164 grpc_transport_stream_op *initial_op,
165 grpc_subchannel_call **target,
166 grpc_iomgr_closure *notify) {
167 connection *con;
168 gpr_mu_lock(&c->mu);
169 if (c->active != NULL) {
170 con = c->active;
171 gpr_ref(&con->refs);
172 gpr_mu_unlock(&c->mu);
173
174 *target = create_call(con, initial_op);
175 notify->cb(notify->cb_arg, 1);
176 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700177 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
178 w4c->next = c->waiting;
179 w4c->notify = notify;
180 w4c->initial_op = initial_op;
181 w4c->target = target;
182 c->waiting = w4c;
183 grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700184 if (!c->connecting) {
185 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700186 connectivity_state_changed_locked(c);
Craig Tillerff54c922015-06-26 16:57:20 -0700187 grpc_subchannel_ref(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700188 gpr_mu_unlock(&c->mu);
189
Craig Tiller5f84c842015-06-26 16:08:21 -0700190 grpc_connector_connect(c->connector, &c->pollset_set, c->addr,
191 c->addr_len, compute_connect_deadline(c), c->args,
192 c->mdctx, &c->connecting_transport, &c->connected);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700193 } else {
194 gpr_mu_unlock(&c->mu);
195 }
196 }
197}
198
Craig Tiller5f84c842015-06-26 16:08:21 -0700199grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
200 grpc_connectivity_state state;
201 gpr_mu_lock(&c->mu);
202 state = compute_connectivity_locked(c);
203 gpr_mu_unlock(&c->mu);
204 return state;
205}
206
207void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
208 grpc_connectivity_state *state,
209 grpc_iomgr_closure *notify) {
210 grpc_connectivity_state current;
211 int do_connect = 0;
212 connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
213 w->current = state;
214 w->notify = notify;
215 gpr_mu_lock(&c->mu);
216 current = compute_connectivity_locked(c);
217 if (current == GRPC_CHANNEL_IDLE) {
218 current = GRPC_CHANNEL_CONNECTING;
219 c->connecting = 1;
220 do_connect = 1;
Craig Tillerff54c922015-06-26 16:57:20 -0700221 grpc_subchannel_ref(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700222 connectivity_state_changed_locked(c);
223 }
224 if (current != *state) {
225 gpr_mu_unlock(&c->mu);
226 *state = current;
227 grpc_iomgr_add_callback(notify);
228 gpr_free(w);
229 } else {
230 w->next = c->watchers;
231 c->watchers = w;
232 gpr_mu_unlock(&c->mu);
233 }
234 if (do_connect) {
235 grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len,
236 compute_connect_deadline(c), c->args, c->mdctx,
237 &c->connecting_transport, &c->connected);
238 }
239}
240
Craig Tillerff54c922015-06-26 16:57:20 -0700241static void publish_transport(grpc_subchannel *c) {
242 size_t channel_stack_size = grpc_channel_stack_size(c->filters, c->filter_count);
243 connection *con = gpr_malloc(sizeof(connection) + channel_stack_size);
244 grpc_channel_stack *stk = (grpc_channel_stack *)(con + 1);
245 waiting_for_connect *w4c;
246 gpr_ref_init(&con->refs, 1);
247 con->subchannel = c;
248 grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
249 grpc_connected_channel_bind_transport(stk, c->connecting_transport);
250 c->connecting_transport = NULL;
251
252 gpr_mu_lock(&c->mu);
253 GPR_ASSERT(c->active == NULL);
254 c->active = con;
255 c->connecting = 0;
256 connectivity_state_changed_locked(c);
257 while ((w4c = c->waiting)) {
258 abort(); /* not implemented */
259 }
260 gpr_mu_unlock(&c->mu);
261}
262
263static void subchannel_connected(void *arg, int iomgr_success) {
264 grpc_subchannel *c = arg;
265 if (c->connecting_transport) {
266 publish_transport(c);
267 } else {
268 grpc_subchannel_unref(c);
269 /* TODO(ctiller): retry after sleeping */
270 abort();
271 }
272}
273
Craig Tiller5f84c842015-06-26 16:08:21 -0700274static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
275 return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
276}
277
278static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
279 if (c->connecting) {
280 return GRPC_CHANNEL_CONNECTING;
281 }
282 if (c->active) {
283 return GRPC_CHANNEL_READY;
284 }
285 return GRPC_CHANNEL_IDLE;
286}
287
288static void connectivity_state_changed_locked(grpc_subchannel *c) {
289 grpc_connectivity_state current = compute_connectivity_locked(c);
290 connectivity_state_watcher *new = NULL;
291 connectivity_state_watcher *w;
292 while ((w = c->watchers)) {
293 c->watchers = w->next;
294
295 if (current != *w->current) {
296 *w->current = current;
297 grpc_iomgr_add_callback(w->notify);
298 gpr_free(w);
299 } else {
300 w->next = new;
301 new = w;
302 }
303 }
304 c->watchers = new;
305}
306
Craig Tiller2595ab72015-06-25 15:26:00 -0700307/*
308 * grpc_subchannel_call implementation
309 */
310
311void grpc_subchannel_call_ref(grpc_subchannel_call *call) {
312 gpr_ref(&call->refs);
313}
314
315void grpc_subchannel_call_unref(grpc_subchannel_call *call) {
316 if (gpr_unref(&call->refs)) {
317 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call));
Craig Tillereb3b12e2015-06-26 14:42:49 -0700318 if (gpr_unref(&call->connection->refs)) {
319 gpr_free(call->connection);
320 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700321 gpr_free(call);
322 }
323}
324
325void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
326 grpc_transport_stream_op *op) {
327 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
328 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
329 top_elem->filter->start_transport_stream_op(top_elem, op);
330}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700331
332grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
333 abort();
334 return NULL;
335}