blob: 3db2fae593458ad9dda3b598c853fcc6b04eb47e [file] [log] [blame]
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/client_config/subchannel.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/transport/connectivity_state.h"
#include "src/core/surface/channel.h"
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
typedef struct
{
/* all fields protected by subchannel->mu */
/** refcount */
int refs;
/** parent subchannel */
grpc_subchannel *subchannel;
} connection;
typedef struct
{
grpc_closure closure;
size_t version;
grpc_subchannel *subchannel;
grpc_connectivity_state connectivity_state;
} state_watcher;
typedef struct waiting_for_connect
{
struct waiting_for_connect *next;
grpc_closure *notify;
grpc_pollset *pollset;
grpc_subchannel_call **target;
grpc_subchannel *subchannel;
grpc_closure continuation;
} waiting_for_connect;
struct grpc_subchannel
{
grpc_connector *connector;
/** non-transport related channel filters */
const grpc_channel_filter **filters;
size_t num_filters;
/** channel arguments */
grpc_channel_args *args;
/** address to connect to */
struct sockaddr *addr;
size_t addr_len;
/** metadata context */
grpc_mdctx *mdctx;
/** master channel - the grpc_channel instance that ultimately owns
this channel_data via its channel stack.
We occasionally use this to bump the refcount on the master channel
to keep ourselves alive through an asynchronous operation. */
grpc_channel *master;
/** have we seen a disconnection? */
int disconnected;
/** set during connection */
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
grpc_closure connected;
/** pollset_set tracking who's interested in a connection
being setup - owned by the master channel (in particular the
client_channel
filter there-in) */
grpc_pollset_set *pollset_set;
/** mutex protecting remaining elements */
gpr_mu mu;
/** active connection */
connection *active;
/** version number for the active connection */
size_t active_version;
/** refcount */
int refs;
/** are we connecting */
int connecting;
/** things waiting for a connection */
waiting_for_connect *waiting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
/** next connect attempt time */
gpr_timespec next_attempt;
/** amount to backoff each failure */
gpr_timespec backoff_delta;
/** do we have an active alarm? */
int have_alarm;
/** our alarm */
grpc_alarm alarm;
/** current random value */
gpr_uint32 random;
};
struct grpc_subchannel_call
{
connection *connection;
gpr_refcount refs;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
static grpc_subchannel_call *create_call (grpc_exec_ctx * exec_ctx, connection * con);
static void connectivity_state_changed_locked (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, const char *reason);
static grpc_connectivity_state compute_connectivity_locked (grpc_subchannel * c);
static gpr_timespec compute_connect_deadline (grpc_subchannel * c);
static void subchannel_connected (grpc_exec_ctx * exec_ctx, void *subchannel, int iomgr_success);
static void subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static int
subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
GRPC_MUST_USE_RESULT;
static void connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static grpc_subchannel *connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void subchannel_destroy (grpc_exec_ctx * exec_ctx, grpc_subchannel * c);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(p, r, cl) \
connection_unref_locked((p), (cl), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(p, r, cl) connection_unref_locked((p), (cl))
#define REF_PASS_ARGS
#define REF_LOG(name, p) \
do { \
} while (0)
#define UNREF_LOG(name, p) \
do { \
} while (0)
#endif
/*
* connection implementation
*/
static void connection_destroy (grpc_exec_ctx * exec_ctx, connection * c)
{
GPR_ASSERT (c->refs == 0);
grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CONNECTION (exec_ctx, c));
gpr_free (c);
}
static void
connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
REF_LOG ("CONNECTION", c);
subchannel_ref_locked (c->subchannel REF_PASS_ARGS);
++c->refs;
}
static grpc_subchannel *
connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
grpc_subchannel *destroy = NULL;
UNREF_LOG ("CONNECTION", c);
if (subchannel_unref_locked (c->subchannel REF_PASS_ARGS))
{
destroy = c->subchannel;
}
if (--c->refs == 0 && c->subchannel->active != c)
{
connection_destroy (exec_ctx, c);
}
return destroy;
}
/*
* grpc_subchannel implementation
*/
static void
subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
REF_LOG ("SUBCHANNEL", c);
++c->refs;
}
static int
subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
UNREF_LOG ("SUBCHANNEL", c);
return --c->refs == 0;
}
void
grpc_subchannel_ref (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
gpr_mu_lock (&c->mu);
subchannel_ref_locked (c REF_PASS_ARGS);
gpr_mu_unlock (&c->mu);
}
void
grpc_subchannel_unref (grpc_subchannel * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
int destroy;
gpr_mu_lock (&c->mu);
destroy = subchannel_unref_locked (c REF_PASS_ARGS);
gpr_mu_unlock (&c->mu);
if (destroy)
subchannel_destroy (exec_ctx, c);
}
static void
subchannel_destroy (grpc_exec_ctx * exec_ctx, grpc_subchannel * c)
{
if (c->active != NULL)
{
connection_destroy (exec_ctx, c->active);
}
gpr_free (c->filters);
grpc_channel_args_destroy (c->args);
gpr_free (c->addr);
grpc_mdctx_unref (c->mdctx);
grpc_connectivity_state_destroy (exec_ctx, &c->state_tracker);
grpc_connector_unref (exec_ctx, c->connector);
gpr_free (c);
}
void
grpc_subchannel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset)
{
grpc_pollset_set_add_pollset (exec_ctx, c->pollset_set, pollset);
}
void
grpc_subchannel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset)
{
grpc_pollset_set_del_pollset (exec_ctx, c->pollset_set, pollset);
}
static gpr_uint32
random_seed ()
{
return (gpr_uint32) (gpr_time_to_millis (gpr_now (GPR_CLOCK_MONOTONIC)));
}
grpc_subchannel *
grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args)
{
grpc_subchannel *c = gpr_malloc (sizeof (*c));
grpc_channel_element *parent_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (args->master));
memset (c, 0, sizeof (*c));
c->refs = 1;
c->connector = connector;
grpc_connector_ref (c->connector);
c->num_filters = args->filter_count;
c->filters = gpr_malloc (sizeof (grpc_channel_filter *) * c->num_filters);
memcpy (c->filters, args->filters, sizeof (grpc_channel_filter *) * c->num_filters);
c->addr = gpr_malloc (args->addr_len);
memcpy (c->addr, args->addr, args->addr_len);
c->addr_len = args->addr_len;
c->args = grpc_channel_args_copy (args->args);
c->mdctx = args->mdctx;
c->master = args->master;
c->pollset_set = grpc_client_channel_get_connecting_pollset_set (parent_elem);
c->random = random_seed ();
grpc_mdctx_ref (c->mdctx);
grpc_closure_init (&c->connected, subchannel_connected, c);
grpc_connectivity_state_init (&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel");
gpr_mu_init (&c->mu);
return c;
}
static void
continue_connect (grpc_exec_ctx * exec_ctx, grpc_subchannel * c)
{
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline (c);
args.channel_args = c->args;
grpc_connector_connect (exec_ctx, c->connector, &args, &c->connecting_result, &c->connected);
}
static void
start_connect (grpc_exec_ctx * exec_ctx, grpc_subchannel * c)
{
c->backoff_delta = gpr_time_from_seconds (GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta);
continue_connect (exec_ctx, c);
}
static void
continue_creating_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party (exec_ctx, w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call (exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
GRPC_SUBCHANNEL_UNREF (exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free (w4c);
}
void
grpc_subchannel_create_call (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify)
{
connection *con;
gpr_mu_lock (&c->mu);
if (c->active != NULL)
{
con = c->active;
CONNECTION_REF_LOCKED (con, "call");
gpr_mu_unlock (&c->mu);
*target = create_call (exec_ctx, con);
notify->cb (exec_ctx, notify->cb_arg, 1);
}
else
{
waiting_for_connect *w4c = gpr_malloc (sizeof (*w4c));
w4c->next = c->waiting;
w4c->notify = notify;
w4c->pollset = pollset;
w4c->target = target;
w4c->subchannel = c;
/* released when clearing w4c */
SUBCHANNEL_REF_LOCKED (c, "waiting_for_connect");
grpc_closure_init (&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
grpc_subchannel_add_interested_party (exec_ctx, c, pollset);
if (!c->connecting)
{
c->connecting = 1;
connectivity_state_changed_locked (exec_ctx, c, "create_call");
/* released by connection */
SUBCHANNEL_REF_LOCKED (c, "connecting");
GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting");
gpr_mu_unlock (&c->mu);
start_connect (exec_ctx, c);
}
else
{
gpr_mu_unlock (&c->mu);
}
}
}
grpc_connectivity_state
grpc_subchannel_check_connectivity (grpc_subchannel * c)
{
grpc_connectivity_state state;
gpr_mu_lock (&c->mu);
state = grpc_connectivity_state_check (&c->state_tracker);
gpr_mu_unlock (&c->mu);
return state;
}
void
grpc_subchannel_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_connectivity_state * state, grpc_closure * notify)
{
int do_connect = 0;
gpr_mu_lock (&c->mu);
if (grpc_connectivity_state_notify_on_state_change (exec_ctx, &c->state_tracker, state, notify))
{
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED (c, "connecting");
GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting");
connectivity_state_changed_locked (exec_ctx, c, "state_change");
}
gpr_mu_unlock (&c->mu);
if (do_connect)
{
start_connect (exec_ctx, c);
}
}
void
grpc_subchannel_process_transport_op (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_transport_op * op)
{
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
gpr_mu_lock (&c->mu);
if (c->active != NULL)
{
con = c->active;
CONNECTION_REF_LOCKED (con, "transport-op");
}
if (op->disconnect)
{
c->disconnected = 1;
connectivity_state_changed_locked (exec_ctx, c, "disconnect");
if (c->have_alarm)
{
cancel_alarm = 1;
}
}
gpr_mu_unlock (&c->mu);
if (con != NULL)
{
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION (con);
grpc_channel_element *top_elem = grpc_channel_stack_element (channel_stack, 0);
top_elem->filter->start_transport_op (exec_ctx, top_elem, op);
gpr_mu_lock (&c->mu);
destroy = CONNECTION_UNREF_LOCKED (exec_ctx, con, "transport-op");
gpr_mu_unlock (&c->mu);
if (destroy)
{
subchannel_destroy (exec_ctx, destroy);
}
}
if (cancel_alarm)
{
grpc_alarm_cancel (exec_ctx, &c->alarm);
}
if (op->disconnect)
{
grpc_connector_shutdown (exec_ctx, c->connector);
}
}
static void
on_state_changed (grpc_exec_ctx * exec_ctx, void *p, int iomgr_success)
{
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
int destroy;
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
gpr_mu_lock (mu);
/* if we failed or there is a version number mismatch, just leave
this closure */
if (!iomgr_success || sw->subchannel->active_version != sw->version)
{
goto done;
}
switch (sw->connectivity_state)
{
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
case GRPC_CHANNEL_IDLE:
/* all is still good: keep watching */
memset (&op, 0, sizeof (op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0);
elem->filter->start_transport_op (exec_ctx, elem, &op);
/* early out */
gpr_mu_unlock (mu);
return;
case GRPC_CHANNEL_FATAL_FAILURE:
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things have gone wrong, deactivate and enter idle */
if (sw->subchannel->active->refs == 0)
{
destroy_connection = sw->subchannel->active;
}
sw->subchannel->active = NULL;
grpc_connectivity_state_set (exec_ctx, &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed");
break;
}
done:
connectivity_state_changed_locked (exec_ctx, c, "transport_state_changed");
destroy = SUBCHANNEL_UNREF_LOCKED (c, "state_watcher");
gpr_free (sw);
gpr_mu_unlock (mu);
if (destroy)
{
subchannel_destroy (exec_ctx, c);
}
if (destroy_connection != NULL)
{
connection_destroy (exec_ctx, destroy_connection);
}
}
static void
publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c)
{
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
size_t num_filters;
const grpc_channel_filter **filters;
waiting_for_connect *w4c;
grpc_transport_op op;
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc (sizeof (*filters) * num_filters);
memcpy (filters, c->filters, sizeof (*filters) * c->num_filters);
memcpy (filters + c->num_filters, c->connecting_result.filters, sizeof (*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
/* construct channel stack */
channel_stack_size = grpc_channel_stack_size (filters, num_filters);
con = gpr_malloc (sizeof (connection) + channel_stack_size);
stk = (grpc_channel_stack *) (con + 1);
con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init (exec_ctx, filters, num_filters, c->master, c->args, c->mdctx, stk);
grpc_connected_channel_bind_transport (stk, c->connecting_result.transport);
gpr_free (c->connecting_result.filters);
memset (&c->connecting_result, 0, sizeof (c->connecting_result));
/* initialize state watcher */
sw = gpr_malloc (sizeof (*sw));
grpc_closure_init (&sw->closure, on_state_changed, sw);
sw->subchannel = c;
sw->connectivity_state = GRPC_CHANNEL_READY;
gpr_mu_lock (&c->mu);
if (c->disconnected)
{
gpr_mu_unlock (&c->mu);
gpr_free (sw);
gpr_free (filters);
grpc_channel_stack_destroy (exec_ctx, stk);
GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF (exec_ctx, c, "connecting");
return;
}
/* publish */
if (c->active != NULL && c->active->refs == 0)
{
destroy_connection = c->active;
}
c->active = con;
c->active_version++;
sw->version = c->active_version;
c->connecting = 0;
/* watch for changes; subchannel ref for connecting is donated
to the state watcher */
memset (&op, 0, sizeof (op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
op.bind_pollset_set = c->pollset_set;
SUBCHANNEL_REF_LOCKED (c, "state_watcher");
GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting");
GPR_ASSERT (!SUBCHANNEL_UNREF_LOCKED (c, "connecting"));
elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0);
elem->filter->start_transport_op (exec_ctx, elem, &op);
/* signal completion */
connectivity_state_changed_locked (exec_ctx, c, "connected");
w4c = c->waiting;
c->waiting = NULL;
gpr_mu_unlock (&c->mu);
while (w4c != NULL)
{
waiting_for_connect *next = w4c->next;
grpc_closure_list_add (closure_list, &w4c->continuation, 1);
w4c = next;
}
gpr_free (filters);
if (destroy_connection != NULL)
{
connection_destroy (exec_ctx, destroy_connection);
}
}
/* Generate a random number between 0 and 1. */
static double
generate_uniform_random_number (grpc_subchannel * c)
{
c->random = (1103515245 * c->random + 12345) % ((gpr_uint32) 1 << 31);
return c->random / (double) ((gpr_uint32) 1 << 31);
}
/* Update backoff_delta and next_attempt in subchannel */
static void
update_reconnect_parameters (grpc_subchannel * c)
{
gpr_int32 backoff_delta_millis, jitter;
gpr_int32 max_backoff_millis = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
double jitter_range;
backoff_delta_millis = (gpr_int32) (gpr_time_to_millis (c->backoff_delta) * GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
if (backoff_delta_millis > max_backoff_millis)
{
backoff_delta_millis = max_backoff_millis;
}
c->backoff_delta = gpr_time_from_millis (backoff_delta_millis, GPR_TIMESPAN);
c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta);
jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
jitter = (gpr_int32) ((2 * generate_uniform_random_number (c) - 1) * jitter_range);
c->next_attempt = gpr_time_add (c->next_attempt, gpr_time_from_millis (jitter, GPR_TIMESPAN));
}
static void
on_alarm (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
grpc_subchannel *c = arg;
gpr_mu_lock (&c->mu);
c->have_alarm = 0;
if (c->disconnected)
{
iomgr_success = 0;
}
connectivity_state_changed_locked (exec_ctx, c, "alarm");
gpr_mu_unlock (&c->mu);
if (iomgr_success)
{
update_reconnect_parameters (c);
continue_connect (exec_ctx, c);
}
else
{
GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF (exec_ctx, c, "connecting");
}
}
static void
subchannel_connected (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL)
{
publish_transport (exec_ctx, c);
}
else
{
gpr_timespec now = gpr_now (GPR_CLOCK_MONOTONIC);
gpr_mu_lock (&c->mu);
GPR_ASSERT (!c->have_alarm);
c->have_alarm = 1;
connectivity_state_changed_locked (exec_ctx, c, "connect_failed");
grpc_alarm_init (exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock (&c->mu);
}
}
static gpr_timespec
compute_connect_deadline (grpc_subchannel * c)
{
gpr_timespec current_deadline = gpr_time_add (c->next_attempt, c->backoff_delta);
gpr_timespec min_deadline = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds (GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
GPR_TIMESPAN));
return gpr_time_cmp (current_deadline, min_deadline) > 0 ? current_deadline : min_deadline;
}
static grpc_connectivity_state
compute_connectivity_locked (grpc_subchannel * c)
{
if (c->disconnected)
{
return GRPC_CHANNEL_FATAL_FAILURE;
}
if (c->connecting)
{
if (c->have_alarm)
{
return GRPC_CHANNEL_TRANSIENT_FAILURE;
}
return GRPC_CHANNEL_CONNECTING;
}
if (c->active)
{
return GRPC_CHANNEL_READY;
}
return GRPC_CHANNEL_IDLE;
}
static void
connectivity_state_changed_locked (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, const char *reason)
{
grpc_connectivity_state current = compute_connectivity_locked (c);
grpc_connectivity_state_set (exec_ctx, &c->state_tracker, current, reason);
}
/*
* grpc_subchannel_call implementation
*/
void
grpc_subchannel_call_ref (grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
gpr_ref (&c->refs);
}
void
grpc_subchannel_call_unref (grpc_subchannel_call * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
{
if (gpr_unref (&c->refs))
{
gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy;
grpc_call_stack_destroy (SUBCHANNEL_CALL_TO_CALL_STACK (exec_ctx, c));
gpr_mu_lock (mu);
destroy = CONNECTION_UNREF_LOCKED (exec_ctx, c->connection, "call");
gpr_mu_unlock (mu);
gpr_free (c);
if (destroy != NULL)
{
subchannel_destroy (exec_ctx, destroy);
}
}
}
char *
grpc_subchannel_call_get_peer (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * call)
{
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call);
grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0);
return top_elem->filter->get_peer (exec_ctx, top_elem);
}
void
grpc_subchannel_call_process_op (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * call, grpc_transport_stream_op * op)
{
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call);
grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0);
top_elem->filter->start_transport_stream_op (exec_ctx, top_elem, op);
}
static grpc_subchannel_call *
create_call (grpc_exec_ctx * exec_ctx, connection * con)
{
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION (con);
grpc_subchannel_call *call = gpr_malloc (sizeof (grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK (call);
call->connection = con;
gpr_ref_init (&call->refs, 1);
grpc_call_stack_init (exec_ctx, chanstk, NULL, NULL, callstk);
return call;
}