/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tillerff54c922015-06-26 16:57:20 -070041#include "src/core/channel/connected_channel.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070042#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070043
44typedef struct {
Craig Tiller4ab82d22015-06-29 09:40:33 -070045 /* all fields protected by subchannel->mu */
46 /** refcount */
47 int refs;
48 /** parent subchannel */
Craig Tillereb3b12e2015-06-26 14:42:49 -070049 grpc_subchannel *subchannel;
50} connection;
51
Craig Tillerdf91ba52015-06-29 10:55:46 -070052typedef struct {
53 grpc_iomgr_closure closure;
54 size_t version;
55 grpc_subchannel *subchannel;
56 grpc_connectivity_state connectivity_state;
57} state_watcher;
58
Craig Tiller5f84c842015-06-26 16:08:21 -070059typedef struct waiting_for_connect {
60 struct waiting_for_connect *next;
61 grpc_iomgr_closure *notify;
Craig Tillerdf91ba52015-06-29 10:55:46 -070062 grpc_transport_stream_op initial_op;
Craig Tiller5f84c842015-06-26 16:08:21 -070063 grpc_subchannel_call **target;
Craig Tillerdf91ba52015-06-29 10:55:46 -070064 grpc_subchannel *subchannel;
65 grpc_iomgr_closure continuation;
Craig Tiller5f84c842015-06-26 16:08:21 -070066} waiting_for_connect;
67
Craig Tiller2595ab72015-06-25 15:26:00 -070068struct grpc_subchannel {
Craig Tiller91624662015-06-25 16:31:02 -070069 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070070
71 /** non-transport related channel filters */
72 const grpc_channel_filter **filters;
Craig Tiller5945ee12015-06-27 10:36:09 -070073 size_t num_filters;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070074 /** channel arguments */
75 grpc_channel_args *args;
76 /** address to connect to */
77 struct sockaddr *addr;
78 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070079 /** metadata context */
80 grpc_mdctx *mdctx;
Craig Tillereb3b12e2015-06-26 14:42:49 -070081
82 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070083 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -070084
85 /** callback for connection finishing */
86 grpc_iomgr_closure connected;
87
Craig Tiller5f84c842015-06-26 16:08:21 -070088 /** pollset_set tracking who's interested in a connection
89 being setup */
90 grpc_pollset_set pollset_set;
91
Craig Tillereb3b12e2015-06-26 14:42:49 -070092 /** mutex protecting remaining elements */
93 gpr_mu mu;
94
95 /** active connection */
96 connection *active;
Craig Tillerdf91ba52015-06-29 10:55:46 -070097 /** version number for the active connection */
98 size_t active_version;
Craig Tillerd7b68e72015-06-28 11:41:09 -070099 /** refcount */
100 int refs;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700101 /** are we connecting */
102 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -0700103 /** things waiting for a connection */
104 waiting_for_connect *waiting;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700105 /** connectivity state tracking */
106 grpc_connectivity_state_tracker state_tracker;
Craig Tiller2595ab72015-06-25 15:26:00 -0700107};
108
109struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700110 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700111 gpr_refcount refs;
112};
113
/* Both headers are allocated with their stack placed immediately behind them,
   so the stack is found by stepping one header past the pointer. */
/** call stack stored after a grpc_subchannel_call header */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
/** channel stack stored after a connection header */
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700116
Craig Tiller4ab82d22015-06-29 09:40:33 -0700117static grpc_subchannel_call *create_call(connection *con,
118 grpc_transport_stream_op *initial_op);
Craig Tiller5f84c842015-06-26 16:08:21 -0700119static void connectivity_state_changed_locked(grpc_subchannel *c);
120static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
121static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700122static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700123
Craig Tillerd7b68e72015-06-28 11:41:09 -0700124static void subchannel_ref_locked(grpc_subchannel *c);
125static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT;
126static void connection_ref_locked(connection *c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700127static grpc_subchannel *connection_unref_locked(connection *c)
128 GRPC_MUST_USE_RESULT;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700129static void subchannel_destroy(grpc_subchannel *c);
130
/*
 * connection implementation
 */
134
Craig Tillerd7b68e72015-06-28 11:41:09 -0700135static void connection_destroy(connection *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700136 GPR_ASSERT(c->refs == 0);
137 grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700138 gpr_free(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700139}
140
Craig Tiller4ab82d22015-06-29 09:40:33 -0700141static void connection_ref_locked(connection *c) {
142 subchannel_ref_locked(c->subchannel);
143 ++c->refs;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700144}
145
146static grpc_subchannel *connection_unref_locked(connection *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700147 grpc_subchannel *destroy = NULL;
148 if (subchannel_unref_locked(c->subchannel)) {
149 destroy = c->subchannel;
150 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700151 if (--c->refs == 0 && c->subchannel->active != c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700152 connection_destroy(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700153 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700154 return destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700155}
156
/*
 * grpc_subchannel implementation
 */
160
Craig Tiller4ab82d22015-06-29 09:40:33 -0700161static void subchannel_ref_locked(grpc_subchannel *c) { ++c->refs; }
Craig Tiller2595ab72015-06-25 15:26:00 -0700162
Craig Tillerd7b68e72015-06-28 11:41:09 -0700163static int subchannel_unref_locked(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700164 return --c->refs == 0;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700165}
166
167void grpc_subchannel_ref(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700168 gpr_mu_lock(&c->mu);
169 subchannel_ref_locked(c);
170 gpr_mu_unlock(&c->mu);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700171}
172
Craig Tillereb3b12e2015-06-26 14:42:49 -0700173void grpc_subchannel_unref(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700174 int destroy;
175 gpr_mu_lock(&c->mu);
176 destroy = subchannel_unref_locked(c);
177 gpr_mu_unlock(&c->mu);
178 if (destroy) subchannel_destroy(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700179}
180
181static void subchannel_destroy(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700182 if (c->active != NULL) {
183 connection_destroy(c->active);
184 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700185 gpr_free(c->filters);
186 grpc_channel_args_destroy(c->args);
187 gpr_free(c->addr);
188 grpc_mdctx_unref(c->mdctx);
189 grpc_pollset_set_destroy(&c->pollset_set);
190 grpc_connectivity_state_destroy(&c->state_tracker);
191 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700192}
193
Craig Tiller5f84c842015-06-26 16:08:21 -0700194void grpc_subchannel_add_interested_party(grpc_subchannel *c,
195 grpc_pollset *pollset) {
196 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
197}
198
199void grpc_subchannel_del_interested_party(grpc_subchannel *c,
200 grpc_pollset *pollset) {
201 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
202}
203
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700204grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
205 grpc_subchannel_args *args) {
206 grpc_subchannel *c = gpr_malloc(sizeof(*c));
207 memset(c, 0, sizeof(*c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700208 c->refs = 1;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700209 c->connector = connector;
210 grpc_connector_ref(c->connector);
Craig Tiller5945ee12015-06-27 10:36:09 -0700211 c->num_filters = args->filter_count;
212 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700213 memcpy(c->filters, args->filters,
Craig Tiller5945ee12015-06-27 10:36:09 -0700214 sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700215 c->addr = gpr_malloc(args->addr_len);
216 memcpy(c->addr, args->addr, args->addr_len);
217 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700218 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700219 c->mdctx = args->mdctx;
220 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700221 grpc_pollset_set_init(&c->pollset_set);
222 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700223 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700224 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700225 return c;
226}
227
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700228static void start_connect(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700229 grpc_connect_in_args args;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700230
Craig Tiller4ab82d22015-06-29 09:40:33 -0700231 args.interested_parties = &c->pollset_set;
232 args.addr = c->addr;
233 args.addr_len = c->addr_len;
234 args.deadline = compute_connect_deadline(c);
235 args.channel_args = c->args;
236 args.metadata_context = c->mdctx;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700237
Craig Tiller4ab82d22015-06-29 09:40:33 -0700238 grpc_connector_connect(c->connector, &args, &c->connecting_result,
239 &c->connected);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700240}
241
Craig Tillerdf91ba52015-06-29 10:55:46 -0700242static void continue_creating_call(void *arg, int iomgr_success) {
243 waiting_for_connect *w4c = arg;
244 grpc_subchannel_create_call(w4c->subchannel,
245 &w4c->initial_op,
246 w4c->target,
247 w4c->notify);
248 grpc_subchannel_unref(w4c->subchannel);
249 gpr_free(w4c);
250}
251
Craig Tillereb3b12e2015-06-26 14:42:49 -0700252void grpc_subchannel_create_call(grpc_subchannel *c,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700253 grpc_transport_stream_op *initial_op,
254 grpc_subchannel_call **target,
255 grpc_iomgr_closure *notify) {
256 connection *con;
257 gpr_mu_lock(&c->mu);
258 if (c->active != NULL) {
259 con = c->active;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700260 connection_ref_locked(con);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700261 gpr_mu_unlock(&c->mu);
262
263 *target = create_call(con, initial_op);
264 notify->cb(notify->cb_arg, 1);
265 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700266 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
267 w4c->next = c->waiting;
268 w4c->notify = notify;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700269 w4c->initial_op = *initial_op;
Craig Tiller5f84c842015-06-26 16:08:21 -0700270 w4c->target = target;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700271 w4c->subchannel = c;
272 subchannel_ref_locked(c);
273 grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700274 c->waiting = w4c;
275 grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700276 if (!c->connecting) {
277 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700278 connectivity_state_changed_locked(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700279 subchannel_ref_locked(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700280 gpr_mu_unlock(&c->mu);
281
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700282 start_connect(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700283 } else {
284 gpr_mu_unlock(&c->mu);
285 }
286 }
287}
288
Craig Tiller5f84c842015-06-26 16:08:21 -0700289grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
290 grpc_connectivity_state state;
291 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700292 state = grpc_connectivity_state_check(&c->state_tracker);
Craig Tiller5f84c842015-06-26 16:08:21 -0700293 gpr_mu_unlock(&c->mu);
294 return state;
295}
296
297void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
298 grpc_connectivity_state *state,
299 grpc_iomgr_closure *notify) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700300 int do_connect = 0;
Craig Tiller5f84c842015-06-26 16:08:21 -0700301 gpr_mu_lock(&c->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700302 if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
303 notify)) {
304 do_connect = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700305 c->connecting = 1;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700306 subchannel_ref_locked(c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700307 grpc_connectivity_state_set(&c->state_tracker,
308 compute_connectivity_locked(c));
Craig Tiller5f84c842015-06-26 16:08:21 -0700309 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700310 gpr_mu_unlock(&c->mu);
Craig Tiller5f84c842015-06-26 16:08:21 -0700311 if (do_connect) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700312 start_connect(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700313 }
314}
315
Craig Tiller4ab82d22015-06-29 09:40:33 -0700316void grpc_subchannel_process_transport_op(grpc_subchannel *c,
317 grpc_transport_op *op) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700318 abort(); /* not implemented */
319}
320
321static void on_state_changed(void *p, int iomgr_success) {
322 state_watcher *sw = p;
323 grpc_subchannel *c = sw->subchannel;
324 gpr_mu *mu = &c->mu;
325 int destroy;
326 grpc_transport_op op;
327 grpc_channel_element *elem;
328 connection *destroy_connection = NULL;
329 int do_connect = 0;
330
331 gpr_mu_lock(mu);
332
333 /* if we failed or there is a version number mismatch, just leave
334 this closure */
335 if (!iomgr_success || sw->subchannel->active_version != sw->version) {
336 goto done;
337 }
338
339 switch (sw->connectivity_state) {
340 case GRPC_CHANNEL_CONNECTING:
341 case GRPC_CHANNEL_READY:
342 case GRPC_CHANNEL_IDLE:
343 /* all is still good: keep watching */
344 memset(&op, 0, sizeof(op));
345 op.connectivity_state = &sw->connectivity_state;
346 op.on_connectivity_state_change = &sw->closure;
347 elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
348 elem->filter->start_transport_op(elem, &op);
349 /* early out */
350 gpr_mu_unlock(mu);
351 return;
352 case GRPC_CHANNEL_FATAL_FAILURE:
353 /* things have gone wrong, deactivate and enter idle */
354 if (sw->subchannel->active->refs == 0) {
355 destroy_connection = sw->subchannel->active;
356 }
357 sw->subchannel->active = NULL;
358 break;
359 case GRPC_CHANNEL_TRANSIENT_FAILURE:
360 /* things are starting to go wrong, reconnect but don't deactivate */
361 subchannel_ref_locked(c);
362 do_connect = 1;
363 c->connecting = 1;
364 break;
365 }
366
367done:
368 grpc_connectivity_state_set(&c->state_tracker,
369 compute_connectivity_locked(c));
370 destroy = subchannel_unref_locked(c);
371 gpr_free(sw);
372 gpr_mu_unlock(mu);
373 if (do_connect) {
374 start_connect(c);
375 }
376 if (destroy) {
377 subchannel_destroy(c);
378 }
379 if (destroy_connection != NULL) {
380 connection_destroy(destroy_connection);
381 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700382}
383
Craig Tillerff54c922015-06-26 16:57:20 -0700384static void publish_transport(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700385 size_t channel_stack_size;
386 connection *con;
387 grpc_channel_stack *stk;
388 size_t num_filters;
389 const grpc_channel_filter **filters;
390 waiting_for_connect *w4c;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700391 grpc_transport_op op;
392 state_watcher *sw;
393 connection *destroy_connection = NULL;
394 grpc_channel_element *elem;
Craig Tiller5945ee12015-06-27 10:36:09 -0700395
Craig Tillerdf91ba52015-06-29 10:55:46 -0700396 /* build final filter list */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700397 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
398 filters = gpr_malloc(sizeof(*filters) * num_filters);
399 memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
400 memcpy(filters + c->num_filters, c->connecting_result.filters,
401 sizeof(*filters) * c->connecting_result.num_filters);
402 filters[num_filters - 1] = &grpc_connected_channel_filter;
Craig Tiller5945ee12015-06-27 10:36:09 -0700403
Craig Tillerdf91ba52015-06-29 10:55:46 -0700404 /* construct channel stack */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700405 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
406 con = gpr_malloc(sizeof(connection) + channel_stack_size);
407 stk = (grpc_channel_stack *)(con + 1);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700408 con->refs = 0;
409 con->subchannel = c;
410 grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
411 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
412 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700413
Craig Tillerdf91ba52015-06-29 10:55:46 -0700414 /* initialize state watcher */
415 sw = gpr_malloc(sizeof(*sw));
416 grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
417 sw->subchannel = c;
418 sw->connectivity_state = GRPC_CHANNEL_READY;
419
Craig Tiller4ab82d22015-06-29 09:40:33 -0700420 gpr_mu_lock(&c->mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700421
422 /* publish */
423 if (c->active != NULL && c->active->refs == 0) {
424 destroy_connection = c->active;
425 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700426 c->active = con;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700427 c->active_version++;
428 sw->version = c->active_version;
Craig Tiller4ab82d22015-06-29 09:40:33 -0700429 c->connecting = 0;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700430
431 /* watch for changes; subchannel ref for connecting is donated
432 to the state watcher */
433 memset(&op, 0, sizeof(op));
434 op.connectivity_state = &sw->connectivity_state;
435 op.on_connectivity_state_change = &sw->closure;
436 elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
437 elem->filter->start_transport_op(elem, &op);
438
439 /* signal completion */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700440 connectivity_state_changed_locked(c);
441 while ((w4c = c->waiting)) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700442 c->waiting = w4c->next;
443 grpc_iomgr_add_callback(&w4c->continuation);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700444 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700445
Craig Tiller4ab82d22015-06-29 09:40:33 -0700446 gpr_mu_unlock(&c->mu);
Craig Tiller5945ee12015-06-27 10:36:09 -0700447
Craig Tiller4ab82d22015-06-29 09:40:33 -0700448 gpr_free(filters);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700449
Craig Tillerdf91ba52015-06-29 10:55:46 -0700450 if (destroy_connection != NULL) {
451 connection_destroy(destroy_connection);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700452 }
453}
Craig Tillerff54c922015-06-26 16:57:20 -0700454
455static void subchannel_connected(void *arg, int iomgr_success) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700456 grpc_subchannel *c = arg;
457 if (c->connecting_result.transport) {
458 publish_transport(c);
459 } else {
460 int destroy;
461 gpr_mu_lock(&c->mu);
462 destroy = subchannel_unref_locked(c);
463 gpr_mu_unlock(&c->mu);
464 if (destroy) subchannel_destroy(c);
465 /* TODO(ctiller): retry after sleeping */
466 abort();
467 }
Craig Tillerff54c922015-06-26 16:57:20 -0700468}
469
Craig Tiller5f84c842015-06-26 16:08:21 -0700470static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
471 return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
472}
473
474static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
475 if (c->connecting) {
476 return GRPC_CHANNEL_CONNECTING;
477 }
478 if (c->active) {
479 return GRPC_CHANNEL_READY;
480 }
481 return GRPC_CHANNEL_IDLE;
482}
483
484static void connectivity_state_changed_locked(grpc_subchannel *c) {
485 grpc_connectivity_state current = compute_connectivity_locked(c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700486 grpc_connectivity_state_set(&c->state_tracker, current);
Craig Tiller5f84c842015-06-26 16:08:21 -0700487}
488
/*
 * grpc_subchannel_call implementation
 */
492
Craig Tiller4ab82d22015-06-29 09:40:33 -0700493void grpc_subchannel_call_ref(grpc_subchannel_call *c) { gpr_ref(&c->refs); }
Craig Tiller2595ab72015-06-25 15:26:00 -0700494
Craig Tillerca3e9d32015-06-27 18:37:27 -0700495void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700496 if (gpr_unref(&c->refs)) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700497 gpr_mu *mu = &c->connection->subchannel->mu;
498 grpc_subchannel *destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700499 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700500 gpr_mu_lock(mu);
501 destroy = connection_unref_locked(c->connection);
502 gpr_mu_unlock(mu);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700503 gpr_free(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700504 if (destroy) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700505 subchannel_destroy(destroy);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700506 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700507 }
508}
509
510void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
511 grpc_transport_stream_op *op) {
512 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
513 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
514 top_elem->filter->start_transport_stream_op(top_elem, op);
515}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700516
Craig Tiller4ab82d22015-06-29 09:40:33 -0700517grpc_subchannel_call *create_call(connection *con,
518 grpc_transport_stream_op *initial_op) {
519 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
520 grpc_subchannel_call *call =
521 gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700522 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
523 call->connection = con;
524 gpr_ref_init(&call->refs, 1);
525 grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
526 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700527}