blob: 2a12fbc86d1ac9b18564f87c27bd79549d395359 [file] [log] [blame]
Craig Tilleraf691802015-06-23 14:57:07 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tillerff54c922015-06-26 16:57:20 -070041#include "src/core/channel/connected_channel.h"
Craig Tillerff3ae682015-06-29 17:44:04 -070042#include "src/core/iomgr/alarm.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070043#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070044
45typedef struct {
Craig Tiller4ab82d22015-06-29 09:40:33 -070046 /* all fields protected by subchannel->mu */
47 /** refcount */
48 int refs;
49 /** parent subchannel */
Craig Tillereb3b12e2015-06-26 14:42:49 -070050 grpc_subchannel *subchannel;
51} connection;
52
Craig Tillerdf91ba52015-06-29 10:55:46 -070053typedef struct {
54 grpc_iomgr_closure closure;
55 size_t version;
56 grpc_subchannel *subchannel;
57 grpc_connectivity_state connectivity_state;
58} state_watcher;
59
Craig Tiller5f84c842015-06-26 16:08:21 -070060typedef struct waiting_for_connect {
61 struct waiting_for_connect *next;
62 grpc_iomgr_closure *notify;
Craig Tillerabf36382015-06-29 16:13:27 -070063 grpc_pollset *pollset;
Craig Tiller5f84c842015-06-26 16:08:21 -070064 grpc_subchannel_call **target;
Craig Tillerdf91ba52015-06-29 10:55:46 -070065 grpc_subchannel *subchannel;
66 grpc_iomgr_closure continuation;
Craig Tiller5f84c842015-06-26 16:08:21 -070067} waiting_for_connect;
68
Craig Tiller2595ab72015-06-25 15:26:00 -070069struct grpc_subchannel {
Craig Tiller91624662015-06-25 16:31:02 -070070 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070071
72 /** non-transport related channel filters */
73 const grpc_channel_filter **filters;
Craig Tiller5945ee12015-06-27 10:36:09 -070074 size_t num_filters;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070075 /** channel arguments */
76 grpc_channel_args *args;
77 /** address to connect to */
78 struct sockaddr *addr;
79 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070080 /** metadata context */
81 grpc_mdctx *mdctx;
Craig Tillerf0370112015-07-01 14:26:11 -070082 /** master channel - the grpc_channel instance that ultimately owns
83 this channel_data via its channel stack.
84 We occasionally use this to bump the refcount on the master channel
85 to keep ourselves alive through an asynchronous operation. */
Craig Tiller98465032015-06-29 14:36:42 -070086 grpc_channel *master;
Craig Tillerb6fbf1d2015-06-29 15:25:49 -070087 /** have we seen a disconnection? */
88 int disconnected;
Craig Tillereb3b12e2015-06-26 14:42:49 -070089
90 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070091 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -070092
93 /** callback for connection finishing */
94 grpc_iomgr_closure connected;
95
Craig Tiller5f84c842015-06-26 16:08:21 -070096 /** pollset_set tracking who's interested in a connection
97 being setup */
98 grpc_pollset_set pollset_set;
99
Craig Tillereb3b12e2015-06-26 14:42:49 -0700100 /** mutex protecting remaining elements */
101 gpr_mu mu;
102
103 /** active connection */
104 connection *active;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700105 /** version number for the active connection */
106 size_t active_version;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700107 /** refcount */
108 int refs;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700109 /** are we connecting */
110 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -0700111 /** things waiting for a connection */
112 waiting_for_connect *waiting;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700113 /** connectivity state tracking */
114 grpc_connectivity_state_tracker state_tracker;
Craig Tillerff3ae682015-06-29 17:44:04 -0700115
116 /** next connect attempt time */
117 gpr_timespec next_attempt;
118 /** amount to backoff each failure */
119 gpr_timespec backoff_delta;
120 /** do we have an active alarm? */
121 int have_alarm;
122 /** our alarm */
123 grpc_alarm alarm;
Craig Tiller2595ab72015-06-25 15:26:00 -0700124};
125
126struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700127 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700128 gpr_refcount refs;
129};
130
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700131#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
132#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700133
Craig Tillerabf36382015-06-29 16:13:27 -0700134static grpc_subchannel_call *create_call(connection *con);
Craig Tiller5f84c842015-06-26 16:08:21 -0700135static void connectivity_state_changed_locked(grpc_subchannel *c);
136static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
137static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
Craig Tillerff54c922015-06-26 16:57:20 -0700138static void subchannel_connected(void *subchannel, int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700139
Craig Tiller079a11b2015-06-30 10:07:15 -0700140static void subchannel_ref_locked(
141 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
142static int subchannel_unref_locked(
143 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
Craig Tillerc3967532015-06-29 14:59:38 -0700144static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
Craig Tiller079a11b2015-06-30 10:07:15 -0700145static grpc_subchannel *connection_unref_locked(
146 connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700147static void subchannel_destroy(grpc_subchannel *c);
148
Craig Tillerc3967532015-06-29 14:59:38 -0700149#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
Craig Tiller079a11b2015-06-30 10:07:15 -0700150#define SUBCHANNEL_REF_LOCKED(p, r) \
151 subchannel_ref_locked((p), __FILE__, __LINE__, (r))
152#define SUBCHANNEL_UNREF_LOCKED(p, r) \
153 subchannel_unref_locked((p), __FILE__, __LINE__, (r))
154#define CONNECTION_REF_LOCKED(p, r) \
155 connection_ref_locked((p), __FILE__, __LINE__, (r))
156#define CONNECTION_UNREF_LOCKED(p, r) \
157 connection_unref_locked((p), __FILE__, __LINE__, (r))
Craig Tillerc3967532015-06-29 14:59:38 -0700158#define REF_PASS_ARGS , file, line, reason
Craig Tiller079a11b2015-06-30 10:07:15 -0700159#define REF_LOG(name, p) \
160 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
161 (name), (p), (p)->refs, (p)->refs + 1, reason)
162#define UNREF_LOG(name, p) \
163 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
164 (name), (p), (p)->refs, (p)->refs - 1, reason)
Craig Tillerc3967532015-06-29 14:59:38 -0700165#else
166#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
167#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
Craig Tiller11bf14e2015-06-29 16:35:41 -0700168#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
169#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p))
Craig Tillerc3967532015-06-29 14:59:38 -0700170#define REF_PASS_ARGS
Craig Tiller11bf14e2015-06-29 16:35:41 -0700171#define REF_LOG(name, p) \
172 do { \
173 } while (0)
174#define UNREF_LOG(name, p) \
175 do { \
176 } while (0)
Craig Tillerc3967532015-06-29 14:59:38 -0700177#endif
178
Craig Tiller2595ab72015-06-25 15:26:00 -0700179/*
Craig Tillerca3e9d32015-06-27 18:37:27 -0700180 * connection implementation
181 */
182
Craig Tillerd7b68e72015-06-28 11:41:09 -0700183static void connection_destroy(connection *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700184 GPR_ASSERT(c->refs == 0);
185 grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700186 gpr_free(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700187}
188
Craig Tiller079a11b2015-06-30 10:07:15 -0700189static void connection_ref_locked(
190 connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tillerc3967532015-06-29 14:59:38 -0700191 REF_LOG("CONNECTION", c);
192 subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700193 ++c->refs;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700194}
195
Craig Tiller079a11b2015-06-30 10:07:15 -0700196static grpc_subchannel *connection_unref_locked(
197 connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700198 grpc_subchannel *destroy = NULL;
Craig Tillerc3967532015-06-29 14:59:38 -0700199 UNREF_LOG("CONNECTION", c);
200 if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700201 destroy = c->subchannel;
202 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700203 if (--c->refs == 0 && c->subchannel->active != c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700204 connection_destroy(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700205 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700206 return destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700207}
208
209/*
Craig Tiller2595ab72015-06-25 15:26:00 -0700210 * grpc_subchannel implementation
211 */
212
Craig Tiller079a11b2015-06-30 10:07:15 -0700213static void subchannel_ref_locked(
214 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tillerc3967532015-06-29 14:59:38 -0700215 REF_LOG("SUBCHANNEL", c);
Craig Tiller079a11b2015-06-30 10:07:15 -0700216 ++c->refs;
Craig Tillerc3967532015-06-29 14:59:38 -0700217}
Craig Tiller2595ab72015-06-25 15:26:00 -0700218
Craig Tiller079a11b2015-06-30 10:07:15 -0700219static int subchannel_unref_locked(
220 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tillerc3967532015-06-29 14:59:38 -0700221 UNREF_LOG("SUBCHANNEL", c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700222 return --c->refs == 0;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700223}
224
Craig Tillerc3967532015-06-29 14:59:38 -0700225void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700226 gpr_mu_lock(&c->mu);
Craig Tillerc3967532015-06-29 14:59:38 -0700227 subchannel_ref_locked(c REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700228 gpr_mu_unlock(&c->mu);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700229}
230
Craig Tillerc3967532015-06-29 14:59:38 -0700231void grpc_subchannel_unref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700232 int destroy;
233 gpr_mu_lock(&c->mu);
Craig Tillerc3967532015-06-29 14:59:38 -0700234 destroy = subchannel_unref_locked(c REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700235 gpr_mu_unlock(&c->mu);
236 if (destroy) subchannel_destroy(c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700237}
238
239static void subchannel_destroy(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700240 if (c->active != NULL) {
241 connection_destroy(c->active);
242 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700243 gpr_free(c->filters);
244 grpc_channel_args_destroy(c->args);
245 gpr_free(c->addr);
246 grpc_mdctx_unref(c->mdctx);
247 grpc_pollset_set_destroy(&c->pollset_set);
248 grpc_connectivity_state_destroy(&c->state_tracker);
Craig Tillerc3967532015-06-29 14:59:38 -0700249 grpc_connector_unref(c->connector);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700250 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700251}
252
Craig Tiller5f84c842015-06-26 16:08:21 -0700253void grpc_subchannel_add_interested_party(grpc_subchannel *c,
254 grpc_pollset *pollset) {
255 grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
256}
257
258void grpc_subchannel_del_interested_party(grpc_subchannel *c,
259 grpc_pollset *pollset) {
260 grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
261}
262
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700263grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
264 grpc_subchannel_args *args) {
265 grpc_subchannel *c = gpr_malloc(sizeof(*c));
266 memset(c, 0, sizeof(*c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700267 c->refs = 1;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700268 c->connector = connector;
269 grpc_connector_ref(c->connector);
Craig Tiller5945ee12015-06-27 10:36:09 -0700270 c->num_filters = args->filter_count;
271 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700272 memcpy(c->filters, args->filters,
Craig Tiller5945ee12015-06-27 10:36:09 -0700273 sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700274 c->addr = gpr_malloc(args->addr_len);
275 memcpy(c->addr, args->addr, args->addr_len);
276 c->addr_len = args->addr_len;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700277 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700278 c->mdctx = args->mdctx;
Craig Tiller98465032015-06-29 14:36:42 -0700279 c->master = args->master;
Craig Tiller5f84c842015-06-26 16:08:21 -0700280 grpc_mdctx_ref(c->mdctx);
Craig Tillerff54c922015-06-26 16:57:20 -0700281 grpc_pollset_set_init(&c->pollset_set);
282 grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700283 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700284 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700285 return c;
286}
287
Craig Tillerff3ae682015-06-29 17:44:04 -0700288static void continue_connect(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700289 grpc_connect_in_args args;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700290
Craig Tiller4ab82d22015-06-29 09:40:33 -0700291 args.interested_parties = &c->pollset_set;
292 args.addr = c->addr;
293 args.addr_len = c->addr_len;
294 args.deadline = compute_connect_deadline(c);
295 args.channel_args = c->args;
296 args.metadata_context = c->mdctx;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700297
Craig Tiller4ab82d22015-06-29 09:40:33 -0700298 grpc_connector_connect(c->connector, &args, &c->connecting_result,
299 &c->connected);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700300}
301
Craig Tillerff3ae682015-06-29 17:44:04 -0700302static void start_connect(grpc_subchannel *c) {
Craig Tillera25ca0b2015-07-07 13:54:12 -0700303 gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerff3ae682015-06-29 17:44:04 -0700304 c->next_attempt = now;
Craig Tiller58bbc862015-07-13 09:51:17 -0700305 c->backoff_delta = gpr_time_from_seconds(1, GPR_TIMESPAN);
Craig Tillerff3ae682015-06-29 17:44:04 -0700306
307 continue_connect(c);
308}
309
Craig Tillerdf91ba52015-06-29 10:55:46 -0700310static void continue_creating_call(void *arg, int iomgr_success) {
311 waiting_for_connect *w4c = arg;
Craig Tillerabf36382015-06-29 16:13:27 -0700312 grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
Craig Tillerf62d6fc2015-06-29 10:55:59 -0700313 w4c->notify);
Craig Tillerc3967532015-06-29 14:59:38 -0700314 GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700315 gpr_free(w4c);
316}
317
Craig Tillerabf36382015-06-29 16:13:27 -0700318void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
Craig Tillereb3b12e2015-06-26 14:42:49 -0700319 grpc_subchannel_call **target,
320 grpc_iomgr_closure *notify) {
321 connection *con;
322 gpr_mu_lock(&c->mu);
323 if (c->active != NULL) {
324 con = c->active;
Craig Tillerc3967532015-06-29 14:59:38 -0700325 CONNECTION_REF_LOCKED(con, "call");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700326 gpr_mu_unlock(&c->mu);
327
Craig Tillerabf36382015-06-29 16:13:27 -0700328 *target = create_call(con);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700329 notify->cb(notify->cb_arg, 1);
330 } else {
Craig Tiller5f84c842015-06-26 16:08:21 -0700331 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
332 w4c->next = c->waiting;
333 w4c->notify = notify;
Craig Tillerabf36382015-06-29 16:13:27 -0700334 w4c->pollset = pollset;
Craig Tiller5f84c842015-06-26 16:08:21 -0700335 w4c->target = target;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700336 w4c->subchannel = c;
Craig Tiller98465032015-06-29 14:36:42 -0700337 /* released when clearing w4c */
Craig Tillerc3967532015-06-29 14:59:38 -0700338 SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700339 grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700340 c->waiting = w4c;
Craig Tillerabf36382015-06-29 16:13:27 -0700341 grpc_subchannel_add_interested_party(c, pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700342 if (!c->connecting) {
343 c->connecting = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700344 connectivity_state_changed_locked(c);
Craig Tiller98465032015-06-29 14:36:42 -0700345 /* released by connection */
Craig Tillerc3967532015-06-29 14:59:38 -0700346 SUBCHANNEL_REF_LOCKED(c, "connecting");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700347 gpr_mu_unlock(&c->mu);
348
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700349 start_connect(c);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700350 } else {
351 gpr_mu_unlock(&c->mu);
352 }
353 }
354}
355
Craig Tiller5f84c842015-06-26 16:08:21 -0700356grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
357 grpc_connectivity_state state;
358 gpr_mu_lock(&c->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700359 state = grpc_connectivity_state_check(&c->state_tracker);
Craig Tiller5f84c842015-06-26 16:08:21 -0700360 gpr_mu_unlock(&c->mu);
361 return state;
362}
363
364void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
365 grpc_connectivity_state *state,
366 grpc_iomgr_closure *notify) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700367 int do_connect = 0;
Craig Tiller5f84c842015-06-26 16:08:21 -0700368 gpr_mu_lock(&c->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700369 if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
370 notify)) {
371 do_connect = 1;
Craig Tiller5f84c842015-06-26 16:08:21 -0700372 c->connecting = 1;
Craig Tiller98465032015-06-29 14:36:42 -0700373 /* released by connection */
Craig Tillerc3967532015-06-29 14:59:38 -0700374 SUBCHANNEL_REF_LOCKED(c, "connecting");
Craig Tiller11bf14e2015-06-29 16:35:41 -0700375 connectivity_state_changed_locked(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700376 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700377 gpr_mu_unlock(&c->mu);
Craig Tiller5f84c842015-06-26 16:08:21 -0700378 if (do_connect) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700379 start_connect(c);
Craig Tiller5f84c842015-06-26 16:08:21 -0700380 }
381}
382
Craig Tiller4ab82d22015-06-29 09:40:33 -0700383void grpc_subchannel_process_transport_op(grpc_subchannel *c,
384 grpc_transport_op *op) {
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700385 connection *con = NULL;
386 grpc_subchannel *destroy;
Craig Tillerff3ae682015-06-29 17:44:04 -0700387 int cancel_alarm = 0;
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700388 gpr_mu_lock(&c->mu);
389 if (op->disconnect) {
390 c->disconnected = 1;
Craig Tiller11bf14e2015-06-29 16:35:41 -0700391 connectivity_state_changed_locked(c);
Craig Tillerff3ae682015-06-29 17:44:04 -0700392 if (c->have_alarm) {
393 cancel_alarm = 1;
394 }
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700395 }
396 if (c->active != NULL) {
397 con = c->active;
398 CONNECTION_REF_LOCKED(con, "transport-op");
399 }
400 gpr_mu_unlock(&c->mu);
401
402 if (con != NULL) {
403 grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
Craig Tiller079a11b2015-06-30 10:07:15 -0700404 grpc_channel_element *top_elem =
405 grpc_channel_stack_element(channel_stack, 0);
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700406 top_elem->filter->start_transport_op(top_elem, op);
407
408 gpr_mu_lock(&c->mu);
409 destroy = CONNECTION_UNREF_LOCKED(con, "transport-op");
410 gpr_mu_unlock(&c->mu);
411 if (destroy) {
412 subchannel_destroy(destroy);
413 }
414 }
Craig Tillerff3ae682015-06-29 17:44:04 -0700415
416 if (cancel_alarm) {
417 grpc_alarm_cancel(&c->alarm);
418 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700419}
420
421static void on_state_changed(void *p, int iomgr_success) {
422 state_watcher *sw = p;
423 grpc_subchannel *c = sw->subchannel;
424 gpr_mu *mu = &c->mu;
425 int destroy;
426 grpc_transport_op op;
427 grpc_channel_element *elem;
428 connection *destroy_connection = NULL;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700429
430 gpr_mu_lock(mu);
431
432 /* if we failed or there is a version number mismatch, just leave
433 this closure */
434 if (!iomgr_success || sw->subchannel->active_version != sw->version) {
435 goto done;
436 }
437
438 switch (sw->connectivity_state) {
439 case GRPC_CHANNEL_CONNECTING:
440 case GRPC_CHANNEL_READY:
441 case GRPC_CHANNEL_IDLE:
442 /* all is still good: keep watching */
443 memset(&op, 0, sizeof(op));
444 op.connectivity_state = &sw->connectivity_state;
445 op.on_connectivity_state_change = &sw->closure;
Craig Tillerf62d6fc2015-06-29 10:55:59 -0700446 elem = grpc_channel_stack_element(
447 CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700448 elem->filter->start_transport_op(elem, &op);
449 /* early out */
450 gpr_mu_unlock(mu);
451 return;
452 case GRPC_CHANNEL_FATAL_FAILURE:
Craig Tiller49924e02015-06-29 22:42:33 -0700453 case GRPC_CHANNEL_TRANSIENT_FAILURE:
Craig Tillerdf91ba52015-06-29 10:55:46 -0700454 /* things have gone wrong, deactivate and enter idle */
455 if (sw->subchannel->active->refs == 0) {
456 destroy_connection = sw->subchannel->active;
457 }
458 sw->subchannel->active = NULL;
Craig Tiller11bf14e2015-06-29 16:35:41 -0700459 grpc_connectivity_state_set(&c->state_tracker,
460 GRPC_CHANNEL_TRANSIENT_FAILURE);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700461 break;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700462 }
463
464done:
Craig Tiller11bf14e2015-06-29 16:35:41 -0700465 connectivity_state_changed_locked(c);
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700466 destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700467 gpr_free(sw);
468 gpr_mu_unlock(mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700469 if (destroy) {
470 subchannel_destroy(c);
471 }
472 if (destroy_connection != NULL) {
473 connection_destroy(destroy_connection);
474 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700475}
476
Craig Tillerff54c922015-06-26 16:57:20 -0700477static void publish_transport(grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700478 size_t channel_stack_size;
479 connection *con;
480 grpc_channel_stack *stk;
481 size_t num_filters;
482 const grpc_channel_filter **filters;
483 waiting_for_connect *w4c;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700484 grpc_transport_op op;
485 state_watcher *sw;
486 connection *destroy_connection = NULL;
487 grpc_channel_element *elem;
Craig Tiller5945ee12015-06-27 10:36:09 -0700488
Craig Tillerdf91ba52015-06-29 10:55:46 -0700489 /* build final filter list */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700490 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
491 filters = gpr_malloc(sizeof(*filters) * num_filters);
492 memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
493 memcpy(filters + c->num_filters, c->connecting_result.filters,
494 sizeof(*filters) * c->connecting_result.num_filters);
495 filters[num_filters - 1] = &grpc_connected_channel_filter;
Craig Tiller5945ee12015-06-27 10:36:09 -0700496
Craig Tillerdf91ba52015-06-29 10:55:46 -0700497 /* construct channel stack */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700498 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
499 con = gpr_malloc(sizeof(connection) + channel_stack_size);
500 stk = (grpc_channel_stack *)(con + 1);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700501 con->refs = 0;
502 con->subchannel = c;
Craig Tiller079a11b2015-06-30 10:07:15 -0700503 grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx,
504 stk);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700505 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
Craig Tiller98465032015-06-29 14:36:42 -0700506 gpr_free(c->connecting_result.filters);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700507 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700508
Craig Tillerdf91ba52015-06-29 10:55:46 -0700509 /* initialize state watcher */
510 sw = gpr_malloc(sizeof(*sw));
511 grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
512 sw->subchannel = c;
513 sw->connectivity_state = GRPC_CHANNEL_READY;
514
Craig Tiller4ab82d22015-06-29 09:40:33 -0700515 gpr_mu_lock(&c->mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700516
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700517 if (c->disconnected) {
518 gpr_mu_unlock(&c->mu);
519 gpr_free(sw);
520 gpr_free(filters);
521 grpc_channel_stack_destroy(stk);
522 return;
523 }
524
Craig Tillerdf91ba52015-06-29 10:55:46 -0700525 /* publish */
526 if (c->active != NULL && c->active->refs == 0) {
527 destroy_connection = c->active;
528 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700529 c->active = con;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700530 c->active_version++;
531 sw->version = c->active_version;
Craig Tiller4ab82d22015-06-29 09:40:33 -0700532 c->connecting = 0;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700533
534 /* watch for changes; subchannel ref for connecting is donated
535 to the state watcher */
536 memset(&op, 0, sizeof(op));
537 op.connectivity_state = &sw->connectivity_state;
538 op.on_connectivity_state_change = &sw->closure;
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700539 SUBCHANNEL_REF_LOCKED(c, "state_watcher");
540 GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
Craig Tillerf62d6fc2015-06-29 10:55:59 -0700541 elem =
542 grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700543 elem->filter->start_transport_op(elem, &op);
544
545 /* signal completion */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700546 connectivity_state_changed_locked(c);
547 while ((w4c = c->waiting)) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700548 c->waiting = w4c->next;
549 grpc_iomgr_add_callback(&w4c->continuation);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700550 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700551
Craig Tiller4ab82d22015-06-29 09:40:33 -0700552 gpr_mu_unlock(&c->mu);
Craig Tiller5945ee12015-06-27 10:36:09 -0700553
Craig Tiller4ab82d22015-06-29 09:40:33 -0700554 gpr_free(filters);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700555
Craig Tillerdf91ba52015-06-29 10:55:46 -0700556 if (destroy_connection != NULL) {
557 connection_destroy(destroy_connection);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700558 }
559}
Craig Tillerff54c922015-06-26 16:57:20 -0700560
Craig Tillerff3ae682015-06-29 17:44:04 -0700561static void on_alarm(void *arg, int iomgr_success) {
562 grpc_subchannel *c = arg;
563 gpr_mu_lock(&c->mu);
564 c->have_alarm = 0;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700565 if (c->disconnected) {
566 iomgr_success = 0;
567 }
Craig Tiller87cc0842015-06-30 08:15:55 -0700568 connectivity_state_changed_locked(c);
Craig Tillerff3ae682015-06-29 17:44:04 -0700569 gpr_mu_unlock(&c->mu);
570 if (iomgr_success) {
571 continue_connect(c);
572 } else {
573 GRPC_SUBCHANNEL_UNREF(c, "connecting");
574 }
575}
576
Craig Tillerff54c922015-06-26 16:57:20 -0700577static void subchannel_connected(void *arg, int iomgr_success) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700578 grpc_subchannel *c = arg;
Craig Tiller11bf14e2015-06-29 16:35:41 -0700579 if (c->connecting_result.transport != NULL) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700580 publish_transport(c);
581 } else {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700582 gpr_mu_lock(&c->mu);
Craig Tiller87cc0842015-06-30 08:15:55 -0700583 connectivity_state_changed_locked(c);
Craig Tillerff3ae682015-06-29 17:44:04 -0700584 GPR_ASSERT(!c->have_alarm);
585 c->have_alarm = 1;
586 c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta);
587 c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta);
Craig Tillera25ca0b2015-07-07 13:54:12 -0700588 grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_REALTIME));
Craig Tiller4ab82d22015-06-29 09:40:33 -0700589 gpr_mu_unlock(&c->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700590 }
Craig Tillerff54c922015-06-26 16:57:20 -0700591}
592
Craig Tiller5f84c842015-06-26 16:08:21 -0700593static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
Craig Tiller87cc0842015-06-30 08:15:55 -0700594 return gpr_time_add(c->next_attempt, c->backoff_delta);
Craig Tiller5f84c842015-06-26 16:08:21 -0700595}
596
597static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700598 if (c->disconnected) {
599 return GRPC_CHANNEL_FATAL_FAILURE;
600 }
Craig Tiller5f84c842015-06-26 16:08:21 -0700601 if (c->connecting) {
Craig Tiller87cc0842015-06-30 08:15:55 -0700602 if (c->have_alarm) {
603 return GRPC_CHANNEL_TRANSIENT_FAILURE;
604 }
Craig Tiller5f84c842015-06-26 16:08:21 -0700605 return GRPC_CHANNEL_CONNECTING;
606 }
607 if (c->active) {
608 return GRPC_CHANNEL_READY;
609 }
610 return GRPC_CHANNEL_IDLE;
611}
612
613static void connectivity_state_changed_locked(grpc_subchannel *c) {
614 grpc_connectivity_state current = compute_connectivity_locked(c);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700615 grpc_connectivity_state_set(&c->state_tracker, current);
Craig Tiller5f84c842015-06-26 16:08:21 -0700616}
617
Craig Tiller2595ab72015-06-25 15:26:00 -0700618/*
619 * grpc_subchannel_call implementation
620 */
621
Craig Tiller079a11b2015-06-30 10:07:15 -0700622void grpc_subchannel_call_ref(
623 grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
624 gpr_ref(&c->refs);
Craig Tillerc3967532015-06-29 14:59:38 -0700625}
Craig Tiller2595ab72015-06-25 15:26:00 -0700626
Craig Tiller079a11b2015-06-30 10:07:15 -0700627void grpc_subchannel_call_unref(
628 grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tillerca3e9d32015-06-27 18:37:27 -0700629 if (gpr_unref(&c->refs)) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700630 gpr_mu *mu = &c->connection->subchannel->mu;
631 grpc_subchannel *destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700632 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700633 gpr_mu_lock(mu);
Craig Tillerc3967532015-06-29 14:59:38 -0700634 destroy = CONNECTION_UNREF_LOCKED(c->connection, "call");
Craig Tillerd7b68e72015-06-28 11:41:09 -0700635 gpr_mu_unlock(mu);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700636 gpr_free(c);
Craig Tillerc3967532015-06-29 14:59:38 -0700637 if (destroy != NULL) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700638 subchannel_destroy(destroy);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700639 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700640 }
641}
642
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700643char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call) {
644 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
645 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
646 return top_elem->filter->get_peer(top_elem);
647}
648
Craig Tiller2595ab72015-06-25 15:26:00 -0700649void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
650 grpc_transport_stream_op *op) {
651 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
652 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
653 top_elem->filter->start_transport_stream_op(top_elem, op);
654}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700655
Craig Tillerabf36382015-06-29 16:13:27 -0700656grpc_subchannel_call *create_call(connection *con) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700657 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
658 grpc_subchannel_call *call =
659 gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700660 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
661 call->connection = con;
662 gpr_ref_init(&call->refs, 1);
Craig Tillerabf36382015-06-29 16:13:27 -0700663 grpc_call_stack_init(chanstk, NULL, NULL, callstk);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700664 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700665}