/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tillerff54c922015-06-26 16:57:20 -070041#include "src/core/channel/connected_channel.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070042#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070043
44typedef struct {
Craig Tiller4ab82d22015-06-29 09:40:33 -070045 /* all fields protected by subchannel->mu */
46 /** refcount */
47 int refs;
48 /** parent subchannel */
Craig Tillereb3b12e2015-06-26 14:42:49 -070049 grpc_subchannel *subchannel;
50} connection;
51
Craig Tillerdf91ba52015-06-29 10:55:46 -070052typedef struct {
53 grpc_iomgr_closure closure;
54 size_t version;
55 grpc_subchannel *subchannel;
56 grpc_connectivity_state connectivity_state;
57} state_watcher;
58
Craig Tiller5f84c842015-06-26 16:08:21 -070059typedef struct waiting_for_connect {
60 struct waiting_for_connect *next;
61 grpc_iomgr_closure *notify;
Craig Tillerabf36382015-06-29 16:13:27 -070062 grpc_pollset *pollset;
Craig Tiller5f84c842015-06-26 16:08:21 -070063 grpc_subchannel_call **target;
Craig Tillerdf91ba52015-06-29 10:55:46 -070064 grpc_subchannel *subchannel;
65 grpc_iomgr_closure continuation;
Craig Tiller5f84c842015-06-26 16:08:21 -070066} waiting_for_connect;
67
Craig Tiller2595ab72015-06-25 15:26:00 -070068struct grpc_subchannel {
Craig Tiller91624662015-06-25 16:31:02 -070069 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070070
71 /** non-transport related channel filters */
72 const grpc_channel_filter **filters;
Craig Tiller5945ee12015-06-27 10:36:09 -070073 size_t num_filters;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070074 /** channel arguments */
75 grpc_channel_args *args;
76 /** address to connect to */
77 struct sockaddr *addr;
78 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070079 /** metadata context */
80 grpc_mdctx *mdctx;
Craig Tiller98465032015-06-29 14:36:42 -070081 /** master channel */
82 grpc_channel *master;
Craig Tillerb6fbf1d2015-06-29 15:25:49 -070083 /** have we seen a disconnection? */
84 int disconnected;
Craig Tillereb3b12e2015-06-26 14:42:49 -070085
86 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070087 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -070088
89 /** callback for connection finishing */
90 grpc_iomgr_closure connected;
91
Craig Tiller5f84c842015-06-26 16:08:21 -070092 /** pollset_set tracking who's interested in a connection
93 being setup */
94 grpc_pollset_set pollset_set;
95
Craig Tillereb3b12e2015-06-26 14:42:49 -070096 /** mutex protecting remaining elements */
97 gpr_mu mu;
98
99 /** active connection */
100 connection *active;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700101 /** version number for the active connection */
102 size_t active_version;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700103 /** refcount */
104 int refs;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700105 /** are we connecting */
106 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -0700107 /** things waiting for a connection */
108 waiting_for_connect *waiting;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700109 /** connectivity state tracking */
110 grpc_connectivity_state_tracker state_tracker;
Craig Tiller2595ab72015-06-25 15:26:00 -0700111};
112
113struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700114 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700115 gpr_refcount refs;
116};
117
/* Both stacks are co-allocated with their header struct and start at the
   first byte past it. */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700120
Craig Tillerabf36382015-06-29 16:13:27 -0700121static grpc_subchannel_call *create_call(connection *con);
Craig Tiller5f84c842015-06-26 16:08:21 -0700122static void connectivity_state_changed_locked(grpc_subchannel *c);
123static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
124static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700125static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700126
Craig Tillerc3967532015-06-29 14:59:38 -0700127static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
128static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
129static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
130static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
Craig Tiller4ab82d22015-06-29 09:40:33 -0700131 GRPC_MUST_USE_RESULT;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700132static void subchannel_destroy(grpc_subchannel *c);
133
/* Ref-counting wrappers. With GRPC_SUBCHANNEL_REFCOUNT_DEBUG defined every
   ref/unref forwards the call site and a reason string and logs the count
   transition; otherwise the reason is dropped and logging compiles away. */
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
  subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
  subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
  connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(p, r) \
  connection_unref_locked((p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
#define REF_LOG(name, p)                                                  \
  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
          (name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p)                                                  \
  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
          (name), (p), (p)->refs, (p)->refs - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p))
#define REF_PASS_ARGS
#define REF_LOG(name, p) \
  do {                   \
  } while (0)
#define UNREF_LOG(name, p) \
  do {                     \
  } while (0)
#endif
155
/*
 * connection implementation
 */
159
Craig Tillerd7b68e72015-06-28 11:41:09 -0700160static void connection_destroy(connection *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700161 GPR_ASSERT(c->refs == 0);
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700162 gpr_log(GPR_DEBUG, "CONNECTION_DESTROY %p", c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700163 grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700164 gpr_free(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700165}
166
Craig Tillerc3967532015-06-29 14:59:38 -0700167static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
168 REF_LOG("CONNECTION", c);
169 subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700170 ++c->refs;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700171}
172
Craig Tillerc3967532015-06-29 14:59:38 -0700173static grpc_subchannel *connection_unref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700174 grpc_subchannel *destroy = NULL;
Craig Tillerc3967532015-06-29 14:59:38 -0700175 UNREF_LOG("CONNECTION", c);
176 if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700177 destroy = c->subchannel;
178 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700179 if (--c->refs == 0 && c->subchannel->active != c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700180 connection_destroy(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700181 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700182 return destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700183}
184
/*
 * grpc_subchannel implementation
 */
188
Craig Tillerc3967532015-06-29 14:59:38 -0700189static void subchannel_ref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
190 REF_LOG("SUBCHANNEL", c);
191 ++c->refs;
192}
Craig Tiller2595ab72015-06-25 15:26:00 -0700193
Craig Tillerc3967532015-06-29 14:59:38 -0700194static int subchannel_unref_locked(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
195 UNREF_LOG("SUBCHANNEL", c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700196 return --c->refs == 0;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700197}
198
Craig Tillerc3967532015-06-29 14:59:38 -0700199void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700200 gpr_mu_lock(&c->mu);
Craig Tillerc3967532015-06-29 14:59:38 -0700201 subchannel_ref_locked(c REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700202 gpr_mu_unlock(&c->mu);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700203}
204
Craig Tillerc3967532015-06-29 14:59:38 -0700205void grpc_subchannel_unref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700206 int destroy;
207 gpr_mu_lock(&c->mu);
Craig Tillerc3967532015-06-29 14:59:38 -0700208 destroy = subchannel_unref_locked(c REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700209 gpr_mu_unlock(&c->mu);
210 if (destroy) subchannel_destroy(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700211}
212
213static void subchannel_destroy(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700214 if (c->active != NULL) {
215 connection_destroy(c->active);
216 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700217 gpr_free(c->filters);
218 grpc_channel_args_destroy(c->args);
219 gpr_free(c->addr);
220 grpc_mdctx_unref(c->mdctx);
221 grpc_pollset_set_destroy(&c->pollset_set);
222 grpc_connectivity_state_destroy(&c->state_tracker);
Craig Tillerc3967532015-06-29 14:59:38 -0700223 grpc_connector_unref(c->connector);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700224 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700225}
226
Craig Tiller5f84c842015-06-26 16:08:21 -0700227void grpc_subchannel_add_interested_party(grpc_subchannel *c,
228 grpc_pollset *pollset) {
229 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
230}
231
232void grpc_subchannel_del_interested_party(grpc_subchannel *c,
233 grpc_pollset *pollset) {
234 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
235}
236
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700237grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
238 grpc_subchannel_args *args) {
239 grpc_subchannel *c = gpr_malloc(sizeof(*c));
240 memset(c, 0, sizeof(*c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700241 c->refs = 1;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700242 c->connector = connector;
243 grpc_connector_ref(c->connector);
Craig Tiller5945ee12015-06-27 10:36:09 -0700244 c->num_filters = args->filter_count;
245 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700246 memcpy(c->filters, args->filters,
Craig Tiller5945ee12015-06-27 10:36:09 -0700247 sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700248 c->addr = gpr_malloc(args->addr_len);
249 memcpy(c->addr, args->addr, args->addr_len);
250 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700251 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700252 c->mdctx = args->mdctx;
Craig Tiller98465032015-06-29 14:36:42 -0700253 c->master = args->master;
Craig Tiller5f84c842015-06-26 16:08:21 -0700254 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700255 grpc_pollset_set_init(&c->pollset_set);
256 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700257 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700258 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700259 return c;
260}
261
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700262static void start_connect(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700263 grpc_connect_in_args args;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700264
Craig Tiller4ab82d22015-06-29 09:40:33 -0700265 args.interested_parties = &c->pollset_set;
266 args.addr = c->addr;
267 args.addr_len = c->addr_len;
268 args.deadline = compute_connect_deadline(c);
269 args.channel_args = c->args;
270 args.metadata_context = c->mdctx;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700271
Craig Tiller4ab82d22015-06-29 09:40:33 -0700272 grpc_connector_connect(c->connector, &args, &c->connecting_result,
273 &c->connected);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700274}
275
Craig Tillerdf91ba52015-06-29 10:55:46 -0700276static void continue_creating_call(void *arg, int iomgr_success) {
277 waiting_for_connect *w4c = arg;
Craig Tillerabf36382015-06-29 16:13:27 -0700278 grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
Craig Tillerf62d6fc2015-06-29 10:55:59 -0700279 w4c->notify);
Craig Tillerc3967532015-06-29 14:59:38 -0700280 GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700281 gpr_free(w4c);
282}
283
Craig Tillerabf36382015-06-29 16:13:27 -0700284void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700285 grpc_subchannel_call **target,
286 grpc_iomgr_closure *notify) {
287 connection *con;
288 gpr_mu_lock(&c->mu);
289 if (c->active != NULL) {
290 con = c->active;
Craig Tillerc3967532015-06-29 14:59:38 -0700291 CONNECTION_REF_LOCKED(con, "call");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700292 gpr_mu_unlock(&c->mu);
293
Craig Tillerabf36382015-06-29 16:13:27 -0700294 *target = create_call(con);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700295 notify->cb(notify->cb_arg, 1);
296 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700297 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
298 w4c->next = c->waiting;
299 w4c->notify = notify;
Craig Tillerabf36382015-06-29 16:13:27 -0700300 w4c->pollset = pollset;
Craig Tiller5f84c842015-06-26 16:08:21 -0700301 w4c->target = target;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700302 w4c->subchannel = c;
Craig Tiller98465032015-06-29 14:36:42 -0700303 /* released when clearing w4c */
Craig Tillerc3967532015-06-29 14:59:38 -0700304 SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700305 grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700306 c->waiting = w4c;
Craig Tillerabf36382015-06-29 16:13:27 -0700307 grpc_subchannel_add_interested_party(c, pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700308 if (!c->connecting) {
309 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700310 connectivity_state_changed_locked(c);
Craig Tiller98465032015-06-29 14:36:42 -0700311 /* released by connection */
Craig Tillerc3967532015-06-29 14:59:38 -0700312 SUBCHANNEL_REF_LOCKED(c, "connecting");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700313 gpr_mu_unlock(&c->mu);
314
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700315 start_connect(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700316 } else {
317 gpr_mu_unlock(&c->mu);
318 }
319 }
320}
321
Craig Tiller5f84c842015-06-26 16:08:21 -0700322grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
323 grpc_connectivity_state state;
324 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700325 state = grpc_connectivity_state_check(&c->state_tracker);
Craig Tiller5f84c842015-06-26 16:08:21 -0700326 gpr_mu_unlock(&c->mu);
327 return state;
328}
329
330void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
331 grpc_connectivity_state *state,
332 grpc_iomgr_closure *notify) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700333 int do_connect = 0;
Craig Tiller5f84c842015-06-26 16:08:21 -0700334 gpr_mu_lock(&c->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700335 if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
336 notify)) {
337 do_connect = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700338 c->connecting = 1;
Craig Tiller98465032015-06-29 14:36:42 -0700339 /* released by connection */
Craig Tillerc3967532015-06-29 14:59:38 -0700340 SUBCHANNEL_REF_LOCKED(c, "connecting");
Craig Tiller11bf14e2015-06-29 16:35:41 -0700341 connectivity_state_changed_locked(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700342 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700343 gpr_mu_unlock(&c->mu);
Craig Tiller5f84c842015-06-26 16:08:21 -0700344 if (do_connect) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700345 start_connect(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700346 }
347}
348
Craig Tiller4ab82d22015-06-29 09:40:33 -0700349void grpc_subchannel_process_transport_op(grpc_subchannel *c,
350 grpc_transport_op *op) {
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700351 connection *con = NULL;
352 grpc_subchannel *destroy;
353 gpr_mu_lock(&c->mu);
354 if (op->disconnect) {
355 c->disconnected = 1;
Craig Tiller11bf14e2015-06-29 16:35:41 -0700356 connectivity_state_changed_locked(c);
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700357 }
358 if (c->active != NULL) {
359 con = c->active;
360 CONNECTION_REF_LOCKED(con, "transport-op");
361 }
362 gpr_mu_unlock(&c->mu);
363
364 if (con != NULL) {
365 grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
366 grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
367 top_elem->filter->start_transport_op(top_elem, op);
368
369 gpr_mu_lock(&c->mu);
370 destroy = CONNECTION_UNREF_LOCKED(con, "transport-op");
371 gpr_mu_unlock(&c->mu);
372 if (destroy) {
373 subchannel_destroy(destroy);
374 }
375 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700376}
377
378static void on_state_changed(void *p, int iomgr_success) {
379 state_watcher *sw = p;
380 grpc_subchannel *c = sw->subchannel;
381 gpr_mu *mu = &c->mu;
382 int destroy;
383 grpc_transport_op op;
384 grpc_channel_element *elem;
385 connection *destroy_connection = NULL;
386 int do_connect = 0;
387
388 gpr_mu_lock(mu);
389
390 /* if we failed or there is a version number mismatch, just leave
391 this closure */
392 if (!iomgr_success || sw->subchannel->active_version != sw->version) {
393 goto done;
394 }
395
Craig Tiller11bf14e2015-06-29 16:35:41 -0700396 gpr_log(GPR_DEBUG, "TRANSPORT STATE: %d", sw->connectivity_state);
397
Craig Tillerdf91ba52015-06-29 10:55:46 -0700398 switch (sw->connectivity_state) {
399 case GRPC_CHANNEL_CONNECTING:
400 case GRPC_CHANNEL_READY:
401 case GRPC_CHANNEL_IDLE:
402 /* all is still good: keep watching */
403 memset(&op, 0, sizeof(op));
404 op.connectivity_state = &sw->connectivity_state;
405 op.on_connectivity_state_change = &sw->closure;
Craig Tillerf62d6fc2015-06-29 10:55:59 -0700406 elem = grpc_channel_stack_element(
407 CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700408 elem->filter->start_transport_op(elem, &op);
409 /* early out */
410 gpr_mu_unlock(mu);
411 return;
412 case GRPC_CHANNEL_FATAL_FAILURE:
413 /* things have gone wrong, deactivate and enter idle */
414 if (sw->subchannel->active->refs == 0) {
415 destroy_connection = sw->subchannel->active;
416 }
417 sw->subchannel->active = NULL;
Craig Tiller11bf14e2015-06-29 16:35:41 -0700418 grpc_connectivity_state_set(&c->state_tracker,
419 GRPC_CHANNEL_TRANSIENT_FAILURE);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700420 break;
421 case GRPC_CHANNEL_TRANSIENT_FAILURE:
422 /* things are starting to go wrong, reconnect but don't deactivate */
Craig Tiller98465032015-06-29 14:36:42 -0700423 /* released by connection */
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700424 SUBCHANNEL_REF_LOCKED(c, "connecting");
Craig Tiller11bf14e2015-06-29 16:35:41 -0700425 grpc_connectivity_state_set(&c->state_tracker,
426 GRPC_CHANNEL_TRANSIENT_FAILURE);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700427 do_connect = 1;
428 c->connecting = 1;
429 break;
430 }
431
432done:
Craig Tiller11bf14e2015-06-29 16:35:41 -0700433 connectivity_state_changed_locked(c);
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700434 destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700435 gpr_free(sw);
436 gpr_mu_unlock(mu);
437 if (do_connect) {
438 start_connect(c);
439 }
440 if (destroy) {
441 subchannel_destroy(c);
442 }
443 if (destroy_connection != NULL) {
444 connection_destroy(destroy_connection);
445 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700446}
447
Craig Tillerff54c922015-06-26 16:57:20 -0700448static void publish_transport(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700449 size_t channel_stack_size;
450 connection *con;
451 grpc_channel_stack *stk;
452 size_t num_filters;
453 const grpc_channel_filter **filters;
454 waiting_for_connect *w4c;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700455 grpc_transport_op op;
456 state_watcher *sw;
457 connection *destroy_connection = NULL;
458 grpc_channel_element *elem;
Craig Tiller5945ee12015-06-27 10:36:09 -0700459
Craig Tillerdf91ba52015-06-29 10:55:46 -0700460 /* build final filter list */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700461 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
462 filters = gpr_malloc(sizeof(*filters) * num_filters);
463 memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
464 memcpy(filters + c->num_filters, c->connecting_result.filters,
465 sizeof(*filters) * c->connecting_result.num_filters);
466 filters[num_filters - 1] = &grpc_connected_channel_filter;
Craig Tiller5945ee12015-06-27 10:36:09 -0700467
Craig Tillerdf91ba52015-06-29 10:55:46 -0700468 /* construct channel stack */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700469 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
470 con = gpr_malloc(sizeof(connection) + channel_stack_size);
471 stk = (grpc_channel_stack *)(con + 1);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700472 con->refs = 0;
473 con->subchannel = c;
Craig Tiller98465032015-06-29 14:36:42 -0700474 grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, stk);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700475 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
Craig Tiller98465032015-06-29 14:36:42 -0700476 gpr_free(c->connecting_result.filters);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700477 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700478
Craig Tillerdf91ba52015-06-29 10:55:46 -0700479 /* initialize state watcher */
480 sw = gpr_malloc(sizeof(*sw));
481 grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
482 sw->subchannel = c;
483 sw->connectivity_state = GRPC_CHANNEL_READY;
484
Craig Tiller4ab82d22015-06-29 09:40:33 -0700485 gpr_mu_lock(&c->mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700486
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700487 if (c->disconnected) {
488 gpr_mu_unlock(&c->mu);
489 gpr_free(sw);
490 gpr_free(filters);
491 grpc_channel_stack_destroy(stk);
492 return;
493 }
494
Craig Tillerdf91ba52015-06-29 10:55:46 -0700495 /* publish */
496 if (c->active != NULL && c->active->refs == 0) {
497 destroy_connection = c->active;
498 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700499 c->active = con;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700500 c->active_version++;
501 sw->version = c->active_version;
Craig Tiller4ab82d22015-06-29 09:40:33 -0700502 c->connecting = 0;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700503
504 /* watch for changes; subchannel ref for connecting is donated
505 to the state watcher */
506 memset(&op, 0, sizeof(op));
507 op.connectivity_state = &sw->connectivity_state;
508 op.on_connectivity_state_change = &sw->closure;
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700509 SUBCHANNEL_REF_LOCKED(c, "state_watcher");
510 GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
Craig Tillerf62d6fc2015-06-29 10:55:59 -0700511 elem =
512 grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700513 elem->filter->start_transport_op(elem, &op);
514
515 /* signal completion */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700516 connectivity_state_changed_locked(c);
517 while ((w4c = c->waiting)) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700518 c->waiting = w4c->next;
519 grpc_iomgr_add_callback(&w4c->continuation);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700520 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700521
Craig Tiller4ab82d22015-06-29 09:40:33 -0700522 gpr_mu_unlock(&c->mu);
Craig Tiller5945ee12015-06-27 10:36:09 -0700523
Craig Tiller4ab82d22015-06-29 09:40:33 -0700524 gpr_free(filters);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700525
Craig Tillerdf91ba52015-06-29 10:55:46 -0700526 if (destroy_connection != NULL) {
527 connection_destroy(destroy_connection);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700528 }
529}
Craig Tillerff54c922015-06-26 16:57:20 -0700530
531static void subchannel_connected(void *arg, int iomgr_success) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700532 grpc_subchannel *c = arg;
Craig Tiller11bf14e2015-06-29 16:35:41 -0700533 if (c->connecting_result.transport != NULL) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700534 publish_transport(c);
535 } else {
536 int destroy;
537 gpr_mu_lock(&c->mu);
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700538 destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting");
Craig Tiller4ab82d22015-06-29 09:40:33 -0700539 gpr_mu_unlock(&c->mu);
540 if (destroy) subchannel_destroy(c);
541 /* TODO(ctiller): retry after sleeping */
542 abort();
543 }
Craig Tillerff54c922015-06-26 16:57:20 -0700544}
545
Craig Tiller5f84c842015-06-26 16:08:21 -0700546static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
547 return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
548}
549
550static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700551 if (c->disconnected) {
552 return GRPC_CHANNEL_FATAL_FAILURE;
553 }
Craig Tiller5f84c842015-06-26 16:08:21 -0700554 if (c->connecting) {
555 return GRPC_CHANNEL_CONNECTING;
556 }
557 if (c->active) {
558 return GRPC_CHANNEL_READY;
559 }
560 return GRPC_CHANNEL_IDLE;
561}
562
563static void connectivity_state_changed_locked(grpc_subchannel *c) {
564 grpc_connectivity_state current = compute_connectivity_locked(c);
Craig Tiller11bf14e2015-06-29 16:35:41 -0700565 gpr_log(GPR_DEBUG, "SUBCHANNEL constate=%d", current);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700566 grpc_connectivity_state_set(&c->state_tracker, current);
Craig Tiller5f84c842015-06-26 16:08:21 -0700567}
568
/*
 * grpc_subchannel_call implementation
 */
572
Craig Tillerc3967532015-06-29 14:59:38 -0700573void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
574 gpr_ref(&c->refs);
575}
Craig Tiller2595ab72015-06-25 15:26:00 -0700576
Craig Tillerc3967532015-06-29 14:59:38 -0700577void grpc_subchannel_call_unref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700578 if (gpr_unref(&c->refs)) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700579 gpr_mu *mu = &c->connection->subchannel->mu;
580 grpc_subchannel *destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700581 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700582 gpr_mu_lock(mu);
Craig Tillerc3967532015-06-29 14:59:38 -0700583 destroy = CONNECTION_UNREF_LOCKED(c->connection, "call");
Craig Tillerd7b68e72015-06-28 11:41:09 -0700584 gpr_mu_unlock(mu);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700585 gpr_free(c);
Craig Tillerc3967532015-06-29 14:59:38 -0700586 if (destroy != NULL) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700587 subchannel_destroy(destroy);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700588 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700589 }
590}
591
592void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
593 grpc_transport_stream_op *op) {
594 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
595 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
596 top_elem->filter->start_transport_stream_op(top_elem, op);
597}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700598
Craig Tillerabf36382015-06-29 16:13:27 -0700599grpc_subchannel_call *create_call(connection *con) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700600 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
601 grpc_subchannel_call *call =
602 gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700603 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
604 call->connection = con;
605 gpr_ref_init(&call->refs, 1);
Craig Tillerabf36382015-06-29 16:13:27 -0700606 grpc_call_stack_init(chanstk, NULL, NULL, callstk);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700607 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700608}