blob: 740389003a526a6b53694823420fad58221525d6 [file] [log] [blame]
Craig Tilleraf691802015-06-23 14:57:07 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/subchannel.h"
Craig Tiller2595ab72015-06-25 15:26:00 -070035
Craig Tillerf7afa1f2015-06-26 09:02:20 -070036#include <string.h>
37
Craig Tiller2595ab72015-06-25 15:26:00 -070038#include <grpc/support/alloc.h>
39
Craig Tillereb3b12e2015-06-26 14:42:49 -070040#include "src/core/channel/channel_args.h"
Craig Tiller1ada6ad2015-07-16 16:19:14 -070041#include "src/core/channel/client_channel.h"
Craig Tillerff54c922015-06-26 16:57:20 -070042#include "src/core/channel/connected_channel.h"
Craig Tillerff3ae682015-06-29 17:44:04 -070043#include "src/core/iomgr/alarm.h"
Craig Tiller08a1cf82015-06-29 09:37:52 -070044#include "src/core/transport/connectivity_state.h"
Craig Tiller1ada6ad2015-07-16 16:19:14 -070045#include "src/core/surface/channel.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070046
yang-gb4e262c2015-07-21 16:11:55 -070047#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
48#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
49#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
50#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
51#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
52
Craig Tillera82950e2015-09-22 12:33:20 -070053typedef struct {
Craig Tiller4ab82d22015-06-29 09:40:33 -070054 /* all fields protected by subchannel->mu */
55 /** refcount */
56 int refs;
57 /** parent subchannel */
Craig Tillereb3b12e2015-06-26 14:42:49 -070058 grpc_subchannel *subchannel;
59} connection;
60
Craig Tillera82950e2015-09-22 12:33:20 -070061typedef struct {
Craig Tiller33825112015-09-18 07:44:19 -070062 grpc_closure closure;
Craig Tillerdf91ba52015-06-29 10:55:46 -070063 size_t version;
64 grpc_subchannel *subchannel;
65 grpc_connectivity_state connectivity_state;
66} state_watcher;
67
Craig Tillera82950e2015-09-22 12:33:20 -070068typedef struct waiting_for_connect {
Craig Tiller5f84c842015-06-26 16:08:21 -070069 struct waiting_for_connect *next;
Craig Tiller33825112015-09-18 07:44:19 -070070 grpc_closure *notify;
Craig Tillerabf36382015-06-29 16:13:27 -070071 grpc_pollset *pollset;
Craig Tiller5f84c842015-06-26 16:08:21 -070072 grpc_subchannel_call **target;
Craig Tillerdf91ba52015-06-29 10:55:46 -070073 grpc_subchannel *subchannel;
Craig Tiller33825112015-09-18 07:44:19 -070074 grpc_closure continuation;
Craig Tiller5f84c842015-06-26 16:08:21 -070075} waiting_for_connect;
76
Craig Tillera82950e2015-09-22 12:33:20 -070077struct grpc_subchannel {
Craig Tiller91624662015-06-25 16:31:02 -070078 grpc_connector *connector;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070079
80 /** non-transport related channel filters */
81 const grpc_channel_filter **filters;
Craig Tiller5945ee12015-06-27 10:36:09 -070082 size_t num_filters;
Craig Tillerf7afa1f2015-06-26 09:02:20 -070083 /** channel arguments */
84 grpc_channel_args *args;
85 /** address to connect to */
86 struct sockaddr *addr;
87 size_t addr_len;
Craig Tiller5f84c842015-06-26 16:08:21 -070088 /** metadata context */
89 grpc_mdctx *mdctx;
Craig Tillerf0370112015-07-01 14:26:11 -070090 /** master channel - the grpc_channel instance that ultimately owns
91 this channel_data via its channel stack.
92 We occasionally use this to bump the refcount on the master channel
93 to keep ourselves alive through an asynchronous operation. */
Craig Tiller98465032015-06-29 14:36:42 -070094 grpc_channel *master;
Craig Tillerb6fbf1d2015-06-29 15:25:49 -070095 /** have we seen a disconnection? */
96 int disconnected;
Craig Tillereb3b12e2015-06-26 14:42:49 -070097
98 /** set during connection */
Craig Tiller04c5d4b2015-06-26 17:21:41 -070099 grpc_connect_out_args connecting_result;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700100
101 /** callback for connection finishing */
Craig Tiller33825112015-09-18 07:44:19 -0700102 grpc_closure connected;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700103
Craig Tiller5f84c842015-06-26 16:08:21 -0700104 /** pollset_set tracking who's interested in a connection
Craig Tiller03dc6552015-07-17 23:12:34 -0700105 being setup - owned by the master channel (in particular the
106 client_channel
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700107 filter there-in) */
108 grpc_pollset_set *pollset_set;
Craig Tiller5f84c842015-06-26 16:08:21 -0700109
Craig Tillereb3b12e2015-06-26 14:42:49 -0700110 /** mutex protecting remaining elements */
111 gpr_mu mu;
112
113 /** active connection */
114 connection *active;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700115 /** version number for the active connection */
116 size_t active_version;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700117 /** refcount */
118 int refs;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700119 /** are we connecting */
120 int connecting;
Craig Tiller5f84c842015-06-26 16:08:21 -0700121 /** things waiting for a connection */
122 waiting_for_connect *waiting;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700123 /** connectivity state tracking */
124 grpc_connectivity_state_tracker state_tracker;
Craig Tillerff3ae682015-06-29 17:44:04 -0700125
126 /** next connect attempt time */
127 gpr_timespec next_attempt;
128 /** amount to backoff each failure */
129 gpr_timespec backoff_delta;
130 /** do we have an active alarm? */
131 int have_alarm;
132 /** our alarm */
133 grpc_alarm alarm;
yang-gb4e262c2015-07-21 16:11:55 -0700134 /** current random value */
yang-gc5d3f432015-07-30 12:46:00 -0700135 gpr_uint32 random;
Craig Tiller2595ab72015-06-25 15:26:00 -0700136};
137
Craig Tillera82950e2015-09-22 12:33:20 -0700138struct grpc_subchannel_call {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700139 connection *connection;
Craig Tiller2595ab72015-06-25 15:26:00 -0700140 gpr_refcount refs;
141};
142
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700143#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
144#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
Craig Tiller2595ab72015-06-25 15:26:00 -0700145
Craig Tillera82950e2015-09-22 12:33:20 -0700146static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
147 connection *con);
148static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
149 grpc_subchannel *c,
150 const char *reason);
151static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
152static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
153static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
154 int iomgr_success);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700155
Craig Tillera82950e2015-09-22 12:33:20 -0700156static void subchannel_ref_locked(
157 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
158static int subchannel_unref_locked(
159 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
160static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
161static grpc_subchannel *connection_unref_locked(
162 grpc_exec_ctx *exec_ctx,
163 connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
164static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700165
Craig Tillerc3967532015-06-29 14:59:38 -0700166#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
Craig Tiller079a11b2015-06-30 10:07:15 -0700167#define SUBCHANNEL_REF_LOCKED(p, r) \
168 subchannel_ref_locked((p), __FILE__, __LINE__, (r))
169#define SUBCHANNEL_UNREF_LOCKED(p, r) \
170 subchannel_unref_locked((p), __FILE__, __LINE__, (r))
171#define CONNECTION_REF_LOCKED(p, r) \
172 connection_ref_locked((p), __FILE__, __LINE__, (r))
Craig Tiller8af4c332015-09-22 12:32:31 -0700173#define CONNECTION_UNREF_LOCKED(cl, p, r) \
174 connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
Craig Tillerc3967532015-06-29 14:59:38 -0700175#define REF_PASS_ARGS , file, line, reason
Craig Tiller079a11b2015-06-30 10:07:15 -0700176#define REF_LOG(name, p) \
177 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
178 (name), (p), (p)->refs, (p)->refs + 1, reason)
179#define UNREF_LOG(name, p) \
180 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
181 (name), (p), (p)->refs, (p)->refs - 1, reason)
Craig Tillerc3967532015-06-29 14:59:38 -0700182#else
183#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
184#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
Craig Tiller11bf14e2015-06-29 16:35:41 -0700185#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
Craig Tiller8af4c332015-09-22 12:32:31 -0700186#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
Craig Tillerc3967532015-06-29 14:59:38 -0700187#define REF_PASS_ARGS
Craig Tiller11bf14e2015-06-29 16:35:41 -0700188#define REF_LOG(name, p) \
189 do { \
190 } while (0)
191#define UNREF_LOG(name, p) \
192 do { \
193 } while (0)
Craig Tillerc3967532015-06-29 14:59:38 -0700194#endif
195
Craig Tiller2595ab72015-06-25 15:26:00 -0700196/*
Craig Tillerca3e9d32015-06-27 18:37:27 -0700197 * connection implementation
198 */
199
Craig Tillera82950e2015-09-22 12:33:20 -0700200static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
201 GPR_ASSERT(c->refs == 0);
202 grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
203 gpr_free(c);
Craig Tillerca3e9d32015-06-27 18:37:27 -0700204}
205
Craig Tillera82950e2015-09-22 12:33:20 -0700206static void connection_ref_locked(
207 connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
208 REF_LOG("CONNECTION", c);
209 subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700210 ++c->refs;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700211}
212
Craig Tillera82950e2015-09-22 12:33:20 -0700213static grpc_subchannel *connection_unref_locked(
214 grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700215 grpc_subchannel *destroy = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700216 UNREF_LOG("CONNECTION", c);
217 if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
218 destroy = c->subchannel;
219 }
220 if (--c->refs == 0 && c->subchannel->active != c) {
221 connection_destroy(exec_ctx, c);
222 }
Craig Tillerd7b68e72015-06-28 11:41:09 -0700223 return destroy;
Craig Tillerca3e9d32015-06-27 18:37:27 -0700224}
225
226/*
Craig Tiller2595ab72015-06-25 15:26:00 -0700227 * grpc_subchannel implementation
228 */
229
Craig Tillera82950e2015-09-22 12:33:20 -0700230static void subchannel_ref_locked(
231 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
232 REF_LOG("SUBCHANNEL", c);
Craig Tiller079a11b2015-06-30 10:07:15 -0700233 ++c->refs;
Craig Tillerc3967532015-06-29 14:59:38 -0700234}
Craig Tiller2595ab72015-06-25 15:26:00 -0700235
Craig Tillera82950e2015-09-22 12:33:20 -0700236static int subchannel_unref_locked(
237 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
238 UNREF_LOG("SUBCHANNEL", c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700239 return --c->refs == 0;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700240}
241
Craig Tillera82950e2015-09-22 12:33:20 -0700242void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
243 gpr_mu_lock(&c->mu);
244 subchannel_ref_locked(c REF_PASS_ARGS);
245 gpr_mu_unlock(&c->mu);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700246}
247
Craig Tillera82950e2015-09-22 12:33:20 -0700248void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
249 grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700250 int destroy;
Craig Tillera82950e2015-09-22 12:33:20 -0700251 gpr_mu_lock(&c->mu);
252 destroy = subchannel_unref_locked(c REF_PASS_ARGS);
253 gpr_mu_unlock(&c->mu);
254 if (destroy) subchannel_destroy(exec_ctx, c);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700255}
256
Craig Tillera82950e2015-09-22 12:33:20 -0700257static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
258 if (c->active != NULL) {
259 connection_destroy(exec_ctx, c->active);
260 }
Craig Tiller565b18b2015-09-23 10:09:42 -0700261 gpr_free((void *)c->filters);
Craig Tillera82950e2015-09-22 12:33:20 -0700262 grpc_channel_args_destroy(c->args);
263 gpr_free(c->addr);
264 grpc_mdctx_unref(c->mdctx);
265 grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
266 grpc_connector_unref(exec_ctx, c->connector);
267 gpr_free(c);
Craig Tiller2595ab72015-06-25 15:26:00 -0700268}
269
Craig Tillera82950e2015-09-22 12:33:20 -0700270void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
271 grpc_subchannel *c,
272 grpc_pollset *pollset) {
273 grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset);
Craig Tiller5f84c842015-06-26 16:08:21 -0700274}
275
Craig Tillera82950e2015-09-22 12:33:20 -0700276void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
277 grpc_subchannel *c,
278 grpc_pollset *pollset) {
279 grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset);
Craig Tiller5f84c842015-06-26 16:08:21 -0700280}
281
Craig Tillera82950e2015-09-22 12:33:20 -0700282static gpr_uint32 random_seed() {
283 return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
yang-gb4e262c2015-07-21 16:11:55 -0700284}
285
Craig Tillera82950e2015-09-22 12:33:20 -0700286grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
287 grpc_subchannel_args *args) {
288 grpc_subchannel *c = gpr_malloc(sizeof(*c));
289 grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
290 grpc_channel_get_channel_stack(args->master));
291 memset(c, 0, sizeof(*c));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700292 c->refs = 1;
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700293 c->connector = connector;
Craig Tillera82950e2015-09-22 12:33:20 -0700294 grpc_connector_ref(c->connector);
Craig Tiller5945ee12015-06-27 10:36:09 -0700295 c->num_filters = args->filter_count;
Craig Tillera82950e2015-09-22 12:33:20 -0700296 c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
Craig Tiller565b18b2015-09-23 10:09:42 -0700297 memcpy((void *)c->filters, args->filters,
Craig Tillera82950e2015-09-22 12:33:20 -0700298 sizeof(grpc_channel_filter *) * c->num_filters);
299 c->addr = gpr_malloc(args->addr_len);
300 memcpy(c->addr, args->addr, args->addr_len);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700301 c->addr_len = args->addr_len;
Craig Tillera82950e2015-09-22 12:33:20 -0700302 c->args = grpc_channel_args_copy(args->args);
Craig Tiller5f84c842015-06-26 16:08:21 -0700303 c->mdctx = args->mdctx;
Craig Tiller98465032015-06-29 14:36:42 -0700304 c->master = args->master;
Craig Tillera82950e2015-09-22 12:33:20 -0700305 c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
306 c->random = random_seed();
307 grpc_mdctx_ref(c->mdctx);
308 grpc_closure_init(&c->connected, subchannel_connected, c);
309 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
310 "subchannel");
311 gpr_mu_init(&c->mu);
Craig Tillerf7afa1f2015-06-26 09:02:20 -0700312 return c;
313}
314
Craig Tillera82950e2015-09-22 12:33:20 -0700315static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700316 grpc_connect_in_args args;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700317
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700318 args.interested_parties = c->pollset_set;
Craig Tiller4ab82d22015-06-29 09:40:33 -0700319 args.addr = c->addr;
320 args.addr_len = c->addr_len;
Craig Tillera82950e2015-09-22 12:33:20 -0700321 args.deadline = compute_connect_deadline(c);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700322 args.channel_args = c->args;
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700323
Craig Tillera82950e2015-09-22 12:33:20 -0700324 grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
325 &c->connected);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700326}
327
Craig Tillera82950e2015-09-22 12:33:20 -0700328static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
329 c->backoff_delta = gpr_time_from_seconds(
330 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
331 c->next_attempt =
332 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
333 continue_connect(exec_ctx, c);
Craig Tillerff3ae682015-06-29 17:44:04 -0700334}
335
Craig Tillera82950e2015-09-22 12:33:20 -0700336static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
337 int iomgr_success) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700338 waiting_for_connect *w4c = arg;
Craig Tillera82950e2015-09-22 12:33:20 -0700339 grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
340 grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
341 w4c->target, w4c->notify);
342 GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
343 gpr_free(w4c);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700344}
345
Craig Tillera82950e2015-09-22 12:33:20 -0700346void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
347 grpc_pollset *pollset,
348 grpc_subchannel_call **target,
349 grpc_closure *notify) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700350 connection *con;
Craig Tillera82950e2015-09-22 12:33:20 -0700351 gpr_mu_lock(&c->mu);
352 if (c->active != NULL) {
353 con = c->active;
354 CONNECTION_REF_LOCKED(con, "call");
355 gpr_mu_unlock(&c->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700356
Craig Tillera82950e2015-09-22 12:33:20 -0700357 *target = create_call(exec_ctx, con);
358 notify->cb(exec_ctx, notify->cb_arg, 1);
359 } else {
360 waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
361 w4c->next = c->waiting;
362 w4c->notify = notify;
363 w4c->pollset = pollset;
364 w4c->target = target;
365 w4c->subchannel = c;
366 /* released when clearing w4c */
367 SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
368 grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
369 c->waiting = w4c;
370 grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
371 if (!c->connecting) {
372 c->connecting = 1;
373 connectivity_state_changed_locked(exec_ctx, c, "create_call");
374 /* released by connection */
375 SUBCHANNEL_REF_LOCKED(c, "connecting");
376 GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
377 gpr_mu_unlock(&c->mu);
Craig Tiller45724b32015-09-22 10:42:19 -0700378
Craig Tillera82950e2015-09-22 12:33:20 -0700379 start_connect(exec_ctx, c);
380 } else {
381 gpr_mu_unlock(&c->mu);
Craig Tiller45724b32015-09-22 10:42:19 -0700382 }
Craig Tillera82950e2015-09-22 12:33:20 -0700383 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700384}
385
Craig Tillera82950e2015-09-22 12:33:20 -0700386grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700387 grpc_connectivity_state state;
Craig Tillera82950e2015-09-22 12:33:20 -0700388 gpr_mu_lock(&c->mu);
389 state = grpc_connectivity_state_check(&c->state_tracker);
390 gpr_mu_unlock(&c->mu);
Craig Tiller5f84c842015-06-26 16:08:21 -0700391 return state;
392}
393
Craig Tillera82950e2015-09-22 12:33:20 -0700394void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
395 grpc_subchannel *c,
396 grpc_connectivity_state *state,
397 grpc_closure *notify) {
Craig Tiller5f84c842015-06-26 16:08:21 -0700398 int do_connect = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700399 gpr_mu_lock(&c->mu);
400 if (grpc_connectivity_state_notify_on_state_change(
401 exec_ctx, &c->state_tracker, state, notify)) {
402 do_connect = 1;
403 c->connecting = 1;
404 /* released by connection */
405 SUBCHANNEL_REF_LOCKED(c, "connecting");
406 GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
407 connectivity_state_changed_locked(exec_ctx, c, "state_change");
408 }
409 gpr_mu_unlock(&c->mu);
Craig Tiller000cd8f2015-09-18 07:20:29 -0700410
Craig Tillera82950e2015-09-22 12:33:20 -0700411 if (do_connect) {
412 start_connect(exec_ctx, c);
413 }
Craig Tiller5f84c842015-06-26 16:08:21 -0700414}
415
Craig Tillera82950e2015-09-22 12:33:20 -0700416void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
417 grpc_subchannel *c,
418 grpc_transport_op *op) {
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700419 connection *con = NULL;
420 grpc_subchannel *destroy;
Craig Tillerff3ae682015-06-29 17:44:04 -0700421 int cancel_alarm = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700422 gpr_mu_lock(&c->mu);
423 if (c->active != NULL) {
424 con = c->active;
425 CONNECTION_REF_LOCKED(con, "transport-op");
426 }
427 if (op->disconnect) {
428 c->disconnected = 1;
429 connectivity_state_changed_locked(exec_ctx, c, "disconnect");
430 if (c->have_alarm) {
431 cancel_alarm = 1;
Craig Tillerff3ae682015-06-29 17:44:04 -0700432 }
Craig Tillera82950e2015-09-22 12:33:20 -0700433 }
434 gpr_mu_unlock(&c->mu);
Craig Tillerff3ae682015-06-29 17:44:04 -0700435
Craig Tillera82950e2015-09-22 12:33:20 -0700436 if (con != NULL) {
437 grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
438 grpc_channel_element *top_elem =
439 grpc_channel_stack_element(channel_stack, 0);
440 top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
Craig Tiller131f6ed2015-09-15 08:20:20 -0700441
Craig Tillera82950e2015-09-22 12:33:20 -0700442 gpr_mu_lock(&c->mu);
443 destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
444 gpr_mu_unlock(&c->mu);
445 if (destroy) {
446 subchannel_destroy(exec_ctx, destroy);
Craig Tiller45724b32015-09-22 10:42:19 -0700447 }
Craig Tillera82950e2015-09-22 12:33:20 -0700448 }
Craig Tiller45724b32015-09-22 10:42:19 -0700449
Craig Tillera82950e2015-09-22 12:33:20 -0700450 if (cancel_alarm) {
451 grpc_alarm_cancel(exec_ctx, &c->alarm);
452 }
Craig Tiller45724b32015-09-22 10:42:19 -0700453
Craig Tillera82950e2015-09-22 12:33:20 -0700454 if (op->disconnect) {
455 grpc_connector_shutdown(exec_ctx, c->connector);
456 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700457}
458
Craig Tillera82950e2015-09-22 12:33:20 -0700459static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
460 int iomgr_success) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700461 state_watcher *sw = p;
462 grpc_subchannel *c = sw->subchannel;
463 gpr_mu *mu = &c->mu;
464 int destroy;
465 grpc_transport_op op;
466 grpc_channel_element *elem;
467 connection *destroy_connection = NULL;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700468
Craig Tillera82950e2015-09-22 12:33:20 -0700469 gpr_mu_lock(mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700470
471 /* if we failed or there is a version number mismatch, just leave
472 this closure */
Craig Tillera82950e2015-09-22 12:33:20 -0700473 if (!iomgr_success || sw->subchannel->active_version != sw->version) {
474 goto done;
475 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700476
Craig Tillera82950e2015-09-22 12:33:20 -0700477 switch (sw->connectivity_state) {
Craig Tillerdf91ba52015-06-29 10:55:46 -0700478 case GRPC_CHANNEL_CONNECTING:
479 case GRPC_CHANNEL_READY:
480 case GRPC_CHANNEL_IDLE:
481 /* all is still good: keep watching */
Craig Tillera82950e2015-09-22 12:33:20 -0700482 memset(&op, 0, sizeof(op));
Craig Tillerdf91ba52015-06-29 10:55:46 -0700483 op.connectivity_state = &sw->connectivity_state;
484 op.on_connectivity_state_change = &sw->closure;
Craig Tillera82950e2015-09-22 12:33:20 -0700485 elem = grpc_channel_stack_element(
486 CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
487 elem->filter->start_transport_op(exec_ctx, elem, &op);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700488 /* early out */
Craig Tillera82950e2015-09-22 12:33:20 -0700489 gpr_mu_unlock(mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700490 return;
491 case GRPC_CHANNEL_FATAL_FAILURE:
Craig Tiller49924e02015-06-29 22:42:33 -0700492 case GRPC_CHANNEL_TRANSIENT_FAILURE:
Craig Tillerdf91ba52015-06-29 10:55:46 -0700493 /* things have gone wrong, deactivate and enter idle */
Craig Tillera82950e2015-09-22 12:33:20 -0700494 if (sw->subchannel->active->refs == 0) {
495 destroy_connection = sw->subchannel->active;
496 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700497 sw->subchannel->active = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700498 grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
499 c->disconnected
500 ? GRPC_CHANNEL_FATAL_FAILURE
501 : GRPC_CHANNEL_TRANSIENT_FAILURE,
502 "connection_failed");
Craig Tillerdf91ba52015-06-29 10:55:46 -0700503 break;
Craig Tillera82950e2015-09-22 12:33:20 -0700504 }
Craig Tillerdf91ba52015-06-29 10:55:46 -0700505
506done:
Craig Tillera82950e2015-09-22 12:33:20 -0700507 connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
508 destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
509 gpr_free(sw);
510 gpr_mu_unlock(mu);
511 if (destroy) {
512 subchannel_destroy(exec_ctx, c);
513 }
514 if (destroy_connection != NULL) {
515 connection_destroy(exec_ctx, destroy_connection);
516 }
Craig Tillerc7b5f762015-06-27 11:48:42 -0700517}
518
Craig Tillera82950e2015-09-22 12:33:20 -0700519static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700520 size_t channel_stack_size;
521 connection *con;
522 grpc_channel_stack *stk;
523 size_t num_filters;
524 const grpc_channel_filter **filters;
525 waiting_for_connect *w4c;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700526 grpc_transport_op op;
527 state_watcher *sw;
528 connection *destroy_connection = NULL;
529 grpc_channel_element *elem;
Craig Tiller5945ee12015-06-27 10:36:09 -0700530
Craig Tillerdf91ba52015-06-29 10:55:46 -0700531 /* build final filter list */
Craig Tiller4ab82d22015-06-29 09:40:33 -0700532 num_filters = c->num_filters + c->connecting_result.num_filters + 1;
Craig Tillera82950e2015-09-22 12:33:20 -0700533 filters = gpr_malloc(sizeof(*filters) * num_filters);
Craig Tiller565b18b2015-09-23 10:09:42 -0700534 memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
535 memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
Craig Tillera82950e2015-09-22 12:33:20 -0700536 sizeof(*filters) * c->connecting_result.num_filters);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700537 filters[num_filters - 1] = &grpc_connected_channel_filter;
Craig Tiller5945ee12015-06-27 10:36:09 -0700538
Craig Tillerdf91ba52015-06-29 10:55:46 -0700539 /* construct channel stack */
Craig Tillera82950e2015-09-22 12:33:20 -0700540 channel_stack_size = grpc_channel_stack_size(filters, num_filters);
541 con = gpr_malloc(sizeof(connection) + channel_stack_size);
542 stk = (grpc_channel_stack *)(con + 1);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700543 con->refs = 0;
544 con->subchannel = c;
Craig Tillera82950e2015-09-22 12:33:20 -0700545 grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
546 c->mdctx, stk);
547 grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
Craig Tiller565b18b2015-09-23 10:09:42 -0700548 gpr_free((void *)c->connecting_result.filters);
Craig Tillera82950e2015-09-22 12:33:20 -0700549 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
Craig Tillerff54c922015-06-26 16:57:20 -0700550
Craig Tillerdf91ba52015-06-29 10:55:46 -0700551 /* initialize state watcher */
Craig Tillera82950e2015-09-22 12:33:20 -0700552 sw = gpr_malloc(sizeof(*sw));
553 grpc_closure_init(&sw->closure, on_state_changed, sw);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700554 sw->subchannel = c;
555 sw->connectivity_state = GRPC_CHANNEL_READY;
556
Craig Tillera82950e2015-09-22 12:33:20 -0700557 gpr_mu_lock(&c->mu);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700558
Craig Tillera82950e2015-09-22 12:33:20 -0700559 if (c->disconnected) {
560 gpr_mu_unlock(&c->mu);
561 gpr_free(sw);
Craig Tiller565b18b2015-09-23 10:09:42 -0700562 gpr_free((void *)filters);
Craig Tillera82950e2015-09-22 12:33:20 -0700563 grpc_channel_stack_destroy(exec_ctx, stk);
564 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
565 GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
566 return;
567 }
Craig Tillerb6fbf1d2015-06-29 15:25:49 -0700568
Craig Tillerdf91ba52015-06-29 10:55:46 -0700569 /* publish */
Craig Tillera82950e2015-09-22 12:33:20 -0700570 if (c->active != NULL && c->active->refs == 0) {
571 destroy_connection = c->active;
572 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700573 c->active = con;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700574 c->active_version++;
575 sw->version = c->active_version;
Craig Tiller4ab82d22015-06-29 09:40:33 -0700576 c->connecting = 0;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700577
578 /* watch for changes; subchannel ref for connecting is donated
579 to the state watcher */
Craig Tillera82950e2015-09-22 12:33:20 -0700580 memset(&op, 0, sizeof(op));
Craig Tillerdf91ba52015-06-29 10:55:46 -0700581 op.connectivity_state = &sw->connectivity_state;
582 op.on_connectivity_state_change = &sw->closure;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700583 op.bind_pollset_set = c->pollset_set;
Craig Tillera82950e2015-09-22 12:33:20 -0700584 SUBCHANNEL_REF_LOCKED(c, "state_watcher");
585 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
586 GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
587 elem =
588 grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
589 elem->filter->start_transport_op(exec_ctx, elem, &op);
Craig Tillerdf91ba52015-06-29 10:55:46 -0700590
591 /* signal completion */
Craig Tillera82950e2015-09-22 12:33:20 -0700592 connectivity_state_changed_locked(exec_ctx, c, "connected");
Craig Tiller5795da72015-09-17 15:27:13 -0700593 w4c = c->waiting;
594 c->waiting = NULL;
Craig Tillerdf91ba52015-06-29 10:55:46 -0700595
Craig Tillera82950e2015-09-22 12:33:20 -0700596 gpr_mu_unlock(&c->mu);
Craig Tiller5795da72015-09-17 15:27:13 -0700597
Craig Tillera82950e2015-09-22 12:33:20 -0700598 while (w4c != NULL) {
599 waiting_for_connect *next = w4c->next;
600 grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
601 w4c = next;
602 }
Craig Tiller5945ee12015-06-27 10:36:09 -0700603
Craig Tiller565b18b2015-09-23 10:09:42 -0700604 gpr_free((void *)filters);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700605
Craig Tillera82950e2015-09-22 12:33:20 -0700606 if (destroy_connection != NULL) {
607 connection_destroy(exec_ctx, destroy_connection);
608 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700609}
Craig Tillerff54c922015-06-26 16:57:20 -0700610
yang-gb4e262c2015-07-21 16:11:55 -0700611/* Generate a random number between 0 and 1. */
Craig Tillera82950e2015-09-22 12:33:20 -0700612static double generate_uniform_random_number(grpc_subchannel *c) {
613 c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31);
614 return c->random / (double)((gpr_uint32)1 << 31);
yang-gb4e262c2015-07-21 16:11:55 -0700615}
616
617/* Update backoff_delta and next_attempt in subchannel */
Craig Tillera82950e2015-09-22 12:33:20 -0700618static void update_reconnect_parameters(grpc_subchannel *c) {
yang-gb4e262c2015-07-21 16:11:55 -0700619 gpr_int32 backoff_delta_millis, jitter;
Craig Tillera82950e2015-09-22 12:33:20 -0700620 gpr_int32 max_backoff_millis =
621 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
yang-gb4e262c2015-07-21 16:11:55 -0700622 double jitter_range;
Craig Tillera82950e2015-09-22 12:33:20 -0700623 backoff_delta_millis =
624 (gpr_int32)(gpr_time_to_millis(c->backoff_delta) *
625 GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
626 if (backoff_delta_millis > max_backoff_millis) {
627 backoff_delta_millis = max_backoff_millis;
628 }
629 c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
630 c->next_attempt =
631 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
yang-gb4e262c2015-07-21 16:11:55 -0700632
633 jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
Craig Tillera82950e2015-09-22 12:33:20 -0700634 jitter =
635 (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
636 c->next_attempt =
637 gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
yang-gb4e262c2015-07-21 16:11:55 -0700638}
639
Craig Tillera82950e2015-09-22 12:33:20 -0700640static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700641 grpc_subchannel *c = arg;
Craig Tillera82950e2015-09-22 12:33:20 -0700642 gpr_mu_lock(&c->mu);
Craig Tillerff3ae682015-06-29 17:44:04 -0700643 c->have_alarm = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700644 if (c->disconnected) {
645 iomgr_success = 0;
646 }
647 connectivity_state_changed_locked(exec_ctx, c, "alarm");
648 gpr_mu_unlock(&c->mu);
649 if (iomgr_success) {
650 update_reconnect_parameters(c);
651 continue_connect(exec_ctx, c);
652 } else {
653 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
654 GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
655 }
Craig Tiller45724b32015-09-22 10:42:19 -0700656}
657
Craig Tillera82950e2015-09-22 12:33:20 -0700658static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
659 int iomgr_success) {
Craig Tiller45724b32015-09-22 10:42:19 -0700660 grpc_subchannel *c = arg;
Craig Tillera82950e2015-09-22 12:33:20 -0700661 if (c->connecting_result.transport != NULL) {
662 publish_transport(exec_ctx, c);
663 } else {
664 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
665 gpr_mu_lock(&c->mu);
666 GPR_ASSERT(!c->have_alarm);
667 c->have_alarm = 1;
668 connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
669 grpc_alarm_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
670 gpr_mu_unlock(&c->mu);
671 }
Craig Tiller45724b32015-09-22 10:42:19 -0700672}
673
Craig Tillera82950e2015-09-22 12:33:20 -0700674static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
675 gpr_timespec current_deadline =
676 gpr_time_add(c->next_attempt, c->backoff_delta);
677 gpr_timespec min_deadline = gpr_time_add(
678 gpr_now(GPR_CLOCK_MONOTONIC),
679 gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
680 GPR_TIMESPAN));
681 return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
682 : min_deadline;
Craig Tiller45724b32015-09-22 10:42:19 -0700683}
684
Craig Tillera82950e2015-09-22 12:33:20 -0700685static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
686 if (c->disconnected) {
687 return GRPC_CHANNEL_FATAL_FAILURE;
688 }
689 if (c->connecting) {
690 if (c->have_alarm) {
691 return GRPC_CHANNEL_TRANSIENT_FAILURE;
Craig Tiller45724b32015-09-22 10:42:19 -0700692 }
Craig Tillera82950e2015-09-22 12:33:20 -0700693 return GRPC_CHANNEL_CONNECTING;
694 }
695 if (c->active) {
696 return GRPC_CHANNEL_READY;
697 }
Craig Tiller5f84c842015-06-26 16:08:21 -0700698 return GRPC_CHANNEL_IDLE;
699}
700
Craig Tillera82950e2015-09-22 12:33:20 -0700701static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
702 grpc_subchannel *c,
703 const char *reason) {
704 grpc_connectivity_state current = compute_connectivity_locked(c);
705 grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
Craig Tiller5f84c842015-06-26 16:08:21 -0700706}
707
Craig Tiller2595ab72015-06-25 15:26:00 -0700708/*
709 * grpc_subchannel_call implementation
710 */
711
Craig Tillera82950e2015-09-22 12:33:20 -0700712void grpc_subchannel_call_ref(
713 grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
714 gpr_ref(&c->refs);
Craig Tillerc3967532015-06-29 14:59:38 -0700715}
Craig Tiller2595ab72015-06-25 15:26:00 -0700716
Craig Tillera82950e2015-09-22 12:33:20 -0700717void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
718 grpc_subchannel_call *c
719 GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
720 if (gpr_unref(&c->refs)) {
721 gpr_mu *mu = &c->connection->subchannel->mu;
722 grpc_subchannel *destroy;
723 grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
724 gpr_mu_lock(mu);
725 destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
726 gpr_mu_unlock(mu);
727 gpr_free(c);
728 if (destroy != NULL) {
729 subchannel_destroy(exec_ctx, destroy);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700730 }
Craig Tillera82950e2015-09-22 12:33:20 -0700731 }
Craig Tiller2595ab72015-06-25 15:26:00 -0700732}
733
Craig Tillera82950e2015-09-22 12:33:20 -0700734char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
735 grpc_subchannel_call *call) {
736 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
737 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
738 return top_elem->filter->get_peer(exec_ctx, top_elem);
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700739}
740
Craig Tillera82950e2015-09-22 12:33:20 -0700741void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
742 grpc_subchannel_call *call,
743 grpc_transport_stream_op *op) {
744 grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
745 grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
746 top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
Craig Tiller2595ab72015-06-25 15:26:00 -0700747}
Craig Tillereb3b12e2015-06-26 14:42:49 -0700748
Craig Tillera82950e2015-09-22 12:33:20 -0700749static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
750 connection *con) {
751 grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
752 grpc_subchannel_call *call =
753 gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
754 grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700755 call->connection = con;
Craig Tillera82950e2015-09-22 12:33:20 -0700756 gpr_ref_init(&call->refs, 1);
757 grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk);
Craig Tiller04c5d4b2015-06-26 17:21:41 -0700758 return call;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700759}