blob: 9a7c1c77ca1a90e2068f3401722f5233684bd8be [file] [log] [blame]
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/client_channel.h"
#include <stdio.h>
#include <string.h>
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/surface/channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
/* Client channel implementation */
/* Forward declaration of the per-call state so that it can be referred to
   by pointer before struct call_data is defined further down in this file. */
typedef struct call_data call_data;
/** Per-channel state of the client channel filter; this is what
    elem->channel_data points at for every grpc_channel_element handled in
    this file. */
typedef struct
{
/** metadata context for this channel */
grpc_mdctx *mdctx;
/** resolver for this channel; installed by grpc_client_channel_set_resolver
    and reset to NULL on disconnect or when resolution stops */
grpc_resolver *resolver;
/** have we started resolving this channel (set to 1 immediately before the
    first grpc_resolver_next call; written while holding mu_config) */
int started_resolving;
/** master channel - the grpc_channel instance that ultimately owns
this channel_data via its channel stack.
We occasionally use this to bump the refcount on the master channel
to keep ourselves alive through an asynchronous operation. */
grpc_channel *master;
/** mutex protecting client configuration, including all
variables below in this data structure */
gpr_mu mu_config;
/** currently active load balancer - guarded by mu_config */
grpc_lb_policy *lb_policy;
/** incoming configuration - set by resolver.next
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures;
/** resolver callback (bound to cc_on_config_changed in init_channel_elem) */
grpc_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
int exit_idle_when_lb_policy_arrives;
/** pollset_set of interested parties in a new connection */
grpc_pollset_set pollset_set;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct
{
/** channel whose state_tracker is updated when the watched policy changes */
channel_data *chand;
/** closure handed to grpc_lb_policy_notify_on_state_change */
grpc_closure on_changed;
/** seeded with the state known at watch time; its address is passed to
    grpc_lb_policy_notify_on_state_change and the value is read back in the
    callback */
grpc_connectivity_state state;
/** policy being watched; compared with chand->lb_policy to discard
    notifications from a policy that has since been replaced */
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
/** Lifecycle of a call as driven by perform_transport_stream_op and the
    asynchronous callbacks picked_target / started_call. */
typedef enum
{
/* initial state assigned by init_call_elem; no op seen yet */
CALL_CREATED,
/* an op arrived without send_ops; it is buffered in waiting_op until send
   ops (or a cancellation) show up */
CALL_WAITING_FOR_SEND,
/* no lb_policy is available yet; the call is parked on
   waiting_for_config_closures until configuration arrives */
CALL_WAITING_FOR_CONFIG,
/* grpc_lb_policy_pick has been issued; picked_target runs on completion */
CALL_WAITING_FOR_PICK,
/* grpc_subchannel_create_call has been issued; started_call runs on
   completion */
CALL_WAITING_FOR_CALL,
/* subchannel_call is set; ops are forwarded straight to it */
CALL_ACTIVE,
/* the call was cancelled or could not be set up; ops are completed locally
   by handle_op_after_cancellation */
CALL_CANCELLED
} call_state;
/** Per-call state of the client channel filter (elem->call_data). */
struct call_data
{
/* owning element */
grpc_call_element *elem;
/* protects state; taken around every state inspection/transition in this
   file */
gpr_mu mu_state;
/* current position in the call_state lifecycle */
call_state state;
/* initialised to gpr_inf_future (GPR_CLOCK_REALTIME) by init_call_elem */
gpr_timespec deadline;
/* output slot filled in by grpc_lb_policy_pick; NULL means the pick failed */
grpc_subchannel *picked_channel;
/* closure reused for the two asynchronous setup steps: first bound to
   picked_target, later re-bound to started_call */
grpc_closure async_setup_task;
/* ops buffered while the call is not yet CALL_ACTIVE */
grpc_transport_stream_op waiting_op;
/* our child call stack */
grpc_subchannel_call *subchannel_call;
/* storage for the "grpc-status" / "grpc-message" metadata elements
   synthesised by handle_op_after_cancellation */
grpc_linked_mdelem status;
grpc_linked_mdelem details;
};
/* Folds new_op into the call's buffered waiting_op. Returns the on_consumed
   closure that the merge displaced (NULL if none); callers in this file hand
   the result on for scheduling, hence GRPC_MUST_USE_RESULT. */
static grpc_closure *
merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op)
GRPC_MUST_USE_RESULT;
/* Completes OP locally for a call that is (or has just become) cancelled.
   - send_ops: owned objects are released and on_done_send is invoked with 0.
   - recv_ops: a metadata batch carrying "grpc-status" = CANCELLED and
     "grpc-message" = "Cancelled" is appended, the stream is reported as
     GRPC_STREAM_CLOSED and on_done_recv is invoked with 1.
   - on_consumed: invoked with 0.
   The callbacks are invoked directly, in the order listed above. */
static void handle_op_after_cancellation (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
{
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;

  if (op->send_ops != NULL)
    {
      grpc_stream_ops_unref_owned_objects (op->send_ops->ops, op->send_ops->nops);
      op->on_done_send->cb (exec_ctx, op->on_done_send->cb_arg, 0);
    }

  if (op->recv_ops != NULL)
    {
      grpc_metadata_batch batch;
      char status_text[GPR_LTOA_MIN_BUFSIZE];

      gpr_ltoa (GRPC_STATUS_CANCELLED, status_text);
      calld->status.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-status", status_text);
      calld->details.md = grpc_mdelem_from_strings (chand->mdctx, "grpc-message", "Cancelled");

      /* wire up the two-element list: status <-> details */
      calld->status.prev = NULL;
      calld->status.next = &calld->details;
      calld->details.prev = &calld->status;
      calld->details.next = NULL;

      batch.list.head = &calld->status;
      batch.list.tail = &calld->details;
      batch.garbage.head = NULL;
      batch.garbage.tail = NULL;
      batch.deadline = gpr_inf_future (GPR_CLOCK_REALTIME);

      grpc_sopb_add_metadata (op->recv_ops, batch);
      *op->recv_state = GRPC_STREAM_CLOSED;
      op->on_done_recv->cb (exec_ctx, op->on_done_recv->cb_arg, 1);
    }

  if (op->on_consumed != NULL)
    {
      op->on_consumed->cb (exec_ctx, op->on_consumed->cb_arg, 0);
    }
}
/* Heap-allocated record for a call parked on
   chand->waiting_for_config_closures: the closure runs continue_with_pick,
   which replays the call's waiting_op and then frees this record. */
typedef struct
{
/* closure queued on the wait list; its argument is this record */
grpc_closure closure;
/* call element to resume */
grpc_call_element *elem;
} waiting_call;
/* Forward declaration: continue_with_pick and picked_target re-enter the op
   state machine with continuation == 1 before it is defined. */
static void perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation);
/* Closure callback for a call that was parked waiting for configuration.
   ARG is the waiting_call record created by
   add_to_lb_policy_wait_queue_locked_state_config; the call's buffered op is
   replayed as a continuation and the record is then released. */
static void
continue_with_pick (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
  waiting_call *parked = arg;
  grpc_call_element *elem = parked->elem;
  call_data *calld = elem->call_data;

  perform_transport_stream_op (exec_ctx, elem, &calld->waiting_op, 1);
  gpr_free (parked);
}
/* Parks ELEM on the channel's waiting_for_config_closures list.
   The only caller in this file holds chand->mu_config (and the call's
   mu_state) when calling this. The allocated record is freed by
   continue_with_pick. */
static void
add_to_lb_policy_wait_queue_locked_state_config (grpc_call_element * elem)
{
  channel_data *chand = elem->channel_data;
  waiting_call *parked = gpr_malloc (sizeof (*parked));

  parked->elem = elem;
  grpc_closure_init (&parked->closure, continue_with_pick, parked);
  grpc_closure_list_add (&chand->waiting_for_config_closures, &parked->closure, 1);
}
/* Returns 1 when the LEN bytes starting at P are all zero, 0 otherwise.
   A LEN of zero (or less) yields 1. */
static int
is_empty (void *p, int len)
{
  const char *bytes = p;
  int remaining;

  for (remaining = len; remaining > 0; remaining--)
    {
      if (*bytes++ != 0)
        {
          return 0;
        }
    }
  return 1;
}
/* Completion callback for grpc_subchannel_create_call (bound in
   picked_target). ARG is the call_data.
   - CALL_WAITING_FOR_CALL with a subchannel call: become CALL_ACTIVE and
     forward any buffered op.
   - CALL_WAITING_FOR_CALL without one: become CALL_CANCELLED and complete
     any buffered op locally.
   - CALL_CANCELLED with a subchannel call: send a cancellation down to it.
   - anything else must be CALL_CANCELLED (asserted). */
static void
started_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
  call_data *calld = arg;

  gpr_mu_lock (&calld->mu_state);

  if (calld->state == CALL_WAITING_FOR_CALL)
    {
      /* an all-zero waiting_op means nothing was buffered */
      int op_is_buffered = !is_empty (&calld->waiting_op, sizeof (calld->waiting_op));

      if (calld->subchannel_call == NULL)
        {
          calld->state = CALL_CANCELLED;
          gpr_mu_unlock (&calld->mu_state);
          if (op_is_buffered)
            {
              handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op);
            }
        }
      else
        {
          calld->state = CALL_ACTIVE;
          gpr_mu_unlock (&calld->mu_state);
          if (op_is_buffered)
            {
              grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &calld->waiting_op);
            }
        }
      return;
    }

  if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL)
    {
      grpc_transport_stream_op cancel_op;

      memset (&cancel_op, 0, sizeof (cancel_op));
      cancel_op.cancel_with_status = GRPC_STATUS_CANCELLED;
      gpr_mu_unlock (&calld->mu_state);
      grpc_subchannel_call_process_op (exec_ctx, calld->subchannel_call, &cancel_op);
      return;
    }

  GPR_ASSERT (calld->state == CALL_CANCELLED);
  gpr_mu_unlock (&calld->mu_state);
}
/* Completion callback for grpc_lb_policy_pick (bound in
   perform_transport_stream_op). ARG is the call_data.
   A NULL picked_channel is turned into a GRPC_STATUS_UNAVAILABLE
   cancellation of the buffered op. Otherwise, unless the call was cancelled
   in the meantime, the call moves to CALL_WAITING_FOR_CALL and subchannel
   call creation is started with started_call as its completion. */
static void
picked_target (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
  call_data *calld = arg;
  grpc_pollset *bind_pollset;

  if (calld->picked_channel == NULL)
    {
      /* treat this like a cancellation */
      calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
      perform_transport_stream_op (exec_ctx, calld->elem, &calld->waiting_op, 1);
      return;
    }

  gpr_mu_lock (&calld->mu_state);
  if (calld->state == CALL_CANCELLED)
    {
      gpr_mu_unlock (&calld->mu_state);
      handle_op_after_cancellation (exec_ctx, calld->elem, &calld->waiting_op);
      return;
    }

  GPR_ASSERT (calld->state == CALL_WAITING_FOR_PICK);
  calld->state = CALL_WAITING_FOR_CALL;
  bind_pollset = calld->waiting_op.bind_pollset;
  gpr_mu_unlock (&calld->mu_state);

  grpc_closure_init (&calld->async_setup_task, started_call, calld);
  grpc_subchannel_create_call (exec_ctx, calld->picked_channel, bind_pollset, &calld->subchannel_call, &calld->async_setup_task);
}
/* Folds NEW_OP into the call's buffered waiting_op.
   At most one of the two ops may carry send_ops, and likewise recv_ops
   (asserted). A non-OK cancel status in NEW_OP overrides the buffered one.
   When NEW_OP carries an on_consumed closure it replaces the buffered one,
   and the replaced closure (NULL if there was none) is returned so the
   caller can complete it; otherwise NULL is returned. */
static grpc_closure *
merge_into_waiting_op (grpc_call_element * elem, grpc_transport_stream_op * new_op)
{
  call_data *calld = elem->call_data;
  grpc_transport_stream_op *waiting_op = &calld->waiting_op;
  grpc_closure *displaced = NULL;

  GPR_ASSERT ((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
  GPR_ASSERT ((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);

  if (new_op->cancel_with_status != GRPC_STATUS_OK)
    {
      waiting_op->cancel_with_status = new_op->cancel_with_status;
    }

  if (new_op->on_consumed != NULL)
    {
      /* stays NULL when nothing was buffered */
      displaced = waiting_op->on_consumed;
      waiting_op->on_consumed = new_op->on_consumed;
    }

  if (new_op->recv_ops != NULL)
    {
      waiting_op->on_done_recv = new_op->on_done_recv;
      waiting_op->recv_state = new_op->recv_state;
      waiting_op->recv_ops = new_op->recv_ops;
    }

  if (new_op->send_ops != NULL)
    {
      waiting_op->on_done_send = new_op->on_done_send;
      waiting_op->is_last_send = new_op->is_last_send;
      waiting_op->send_ops = new_op->send_ops;
    }

  return displaced;
}
/* get_peer entry of the filter.
   For a CALL_ACTIVE call the answer comes from the subchannel call (a
   temporary "get_peer" ref keeps it alive while mu_state is not held);
   in every other state the master channel's target is returned. */
static char *
cc_get_peer (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
{
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_subchannel_call *active_call;
  char *peer;

  gpr_mu_lock (&calld->mu_state);
  if (calld->state != CALL_ACTIVE)
    {
      gpr_mu_unlock (&calld->mu_state);
      return grpc_channel_get_target (chand->master);
    }

  active_call = calld->subchannel_call;
  GRPC_SUBCHANNEL_CALL_REF (active_call, "get_peer");
  gpr_mu_unlock (&calld->mu_state);

  peer = grpc_subchannel_call_get_peer (exec_ctx, active_call);
  GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, active_call, "get_peer");
  return peer;
}
/* Per-call op state machine of the client channel.

   OP is either a fresh op from the layer above (CONTINUATION == 0) or the
   call's own buffered waiting_op being replayed by continue_with_pick /
   picked_target (CONTINUATION == 1).

   Behaviour by call state (mu_state is taken on entry and released on every
   path before any callback or downstream call is made):
   - CALL_ACTIVE: forward to the subchannel call.
   - CALL_CANCELLED: complete the op locally.
   - CALL_WAITING_FOR_SEND: merge into waiting_op; if there is still nothing
     to send and no cancellation, keep waiting, otherwise take the merged op
     and proceed as a continuation.
   - CALL_WAITING_FOR_CONFIG / _PICK / _CALL: a fresh op is either a
     cancellation (the call becomes CALL_CANCELLED and both the new and the
     buffered op are completed locally) or is merged into waiting_op;
     a continuation falls through to the CALL_CREATED handling.
   - CALL_CREATED: cancel, or buffer the op and start the pick / resolution.

   Closures displaced by merge_into_waiting_op are scheduled on EXEC_CTX.
   NOTE(review): merge_into_waiting_op may return NULL; this relies on
   grpc_exec_ctx_enqueue ignoring a NULL closure - confirm. */
static void
perform_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op, int continuation)
{
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_subchannel_call *subchannel_call;
  grpc_lb_policy *lb_policy;
  grpc_transport_stream_op op2;
  GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
  GRPC_CALL_LOG_OP (GPR_INFO, elem, op);
  gpr_mu_lock (&calld->mu_state);
  switch (calld->state)
    {
    case CALL_ACTIVE:
      GPR_ASSERT (!continuation);
      subchannel_call = calld->subchannel_call;
      gpr_mu_unlock (&calld->mu_state);
      grpc_subchannel_call_process_op (exec_ctx, subchannel_call, op);
      break;
    case CALL_CANCELLED:
      gpr_mu_unlock (&calld->mu_state);
      handle_op_after_cancellation (exec_ctx, elem, op);
      break;
    case CALL_WAITING_FOR_SEND:
      GPR_ASSERT (!continuation);
      /* schedule whatever on_consumed closure the merge displaced */
      grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1);
      if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK)
        {
          /* still nothing to send and not cancelled: keep waiting */
          gpr_mu_unlock (&calld->mu_state);
          break;
        }
      /* take ownership of the merged op and restart as if it had just
         arrived on a freshly created call */
      *op = calld->waiting_op;
      memset (&calld->waiting_op, 0, sizeof (calld->waiting_op));
      continuation = 1;
      /* fall through */
    case CALL_WAITING_FOR_CONFIG:
    case CALL_WAITING_FOR_PICK:
    case CALL_WAITING_FOR_CALL:
      if (!continuation)
        {
          if (op->cancel_with_status != GRPC_STATUS_OK)
            {
              calld->state = CALL_CANCELLED;
              op2 = calld->waiting_op;
              memset (&calld->waiting_op, 0, sizeof (calld->waiting_op));
              /* keep exactly one on_consumed closure buffered in waiting_op
                 (the new op's takes precedence); it is detached from the op
                 it came from so it is not invoked by the calls below */
              if (op->on_consumed)
                {
                  calld->waiting_op.on_consumed = op->on_consumed;
                  op->on_consumed = NULL;
                }
              else if (op2.on_consumed)
                {
                  calld->waiting_op.on_consumed = op2.on_consumed;
                  op2.on_consumed = NULL;
                }
              gpr_mu_unlock (&calld->mu_state);
              handle_op_after_cancellation (exec_ctx, elem, op);
              handle_op_after_cancellation (exec_ctx, elem, &op2);
            }
          else
            {
              grpc_exec_ctx_enqueue (exec_ctx, merge_into_waiting_op (elem, op), 1);
              gpr_mu_unlock (&calld->mu_state);
            }
          break;
        }
      /* fall through */
    case CALL_CREATED:
      if (op->cancel_with_status != GRPC_STATUS_OK)
        {
          calld->state = CALL_CANCELLED;
          gpr_mu_unlock (&calld->mu_state);
          handle_op_after_cancellation (exec_ctx, elem, op);
        }
      else
        {
          calld->waiting_op = *op;
          if (op->send_ops == NULL)
            {
              /* need to have some send ops before we can select the
                 lb target */
              calld->state = CALL_WAITING_FOR_SEND;
              gpr_mu_unlock (&calld->mu_state);
            }
          else
            {
              gpr_mu_lock (&chand->mu_config);
              lb_policy = chand->lb_policy;
              if (lb_policy)
                {
                  /* from here on work with the buffered copy, which
                     outlives this function call */
                  grpc_transport_stream_op *waiting_op = &calld->waiting_op;
                  grpc_pollset *bind_pollset = waiting_op->bind_pollset;
                  grpc_metadata_batch *initial_metadata = &waiting_op->send_ops->ops[0].data.metadata;
                  /* the "pick" ref keeps the policy alive once mu_config
                     is dropped */
                  GRPC_LB_POLICY_REF (lb_policy, "pick");
                  gpr_mu_unlock (&chand->mu_config);
                  calld->state = CALL_WAITING_FOR_PICK;
                  GPR_ASSERT (waiting_op->bind_pollset);
                  GPR_ASSERT (waiting_op->send_ops);
                  GPR_ASSERT (waiting_op->send_ops->nops >= 1);
                  GPR_ASSERT (waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
                  gpr_mu_unlock (&calld->mu_state);
                  grpc_closure_init (&calld->async_setup_task, picked_target, calld);
                  grpc_lb_policy_pick (exec_ctx, lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, &calld->async_setup_task);
                  GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "pick");
                }
              else if (chand->resolver != NULL)
                {
                  /* no policy yet: park the call and make sure resolution
                     has been kicked off */
                  calld->state = CALL_WAITING_FOR_CONFIG;
                  add_to_lb_policy_wait_queue_locked_state_config (elem);
                  if (!chand->started_resolving && chand->resolver != NULL)
                    {
                      GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
                      chand->started_resolving = 1;
                      grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
                    }
                  gpr_mu_unlock (&chand->mu_config);
                  gpr_mu_unlock (&calld->mu_state);
                }
              else
                {
                  /* neither a policy nor a resolver: the call cannot
                     proceed */
                  calld->state = CALL_CANCELLED;
                  gpr_mu_unlock (&chand->mu_config);
                  gpr_mu_unlock (&calld->mu_state);
                  handle_op_after_cancellation (exec_ctx, elem, op);
                }
            }
        }
      break;
    }
}
/* start_transport_stream_op entry of the filter: feeds a fresh op from the
   layer above into the per-call state machine (never as a continuation). */
static void
cc_start_transport_stream_op (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, grpc_transport_stream_op * op)
{
  const int is_continuation = 0;

  perform_transport_stream_op (exec_ctx, elem, op, is_continuation);
}
static void watch_lb_policy (channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state, grpc_closure_list * cl);
static void
on_lb_policy_state_changed_locked (lb_policy_connectivity_watcher * w, grpc_closure_list * cl)
{
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy)
return;
grpc_connectivity_state_set (&w->chand->state_tracker, w->state, "lb_changed", cl);
if (w->state != GRPC_CHANNEL_FATAL_FAILURE)
{
watch_lb_policy (w->chand, w->lb_policy, w->state, cl);
}
}
/* Closure callback registered by watch_lb_policy. ARG is the watcher.
   Processes the change under chand->mu_config, then drops the
   "watch_lb_policy" ref taken on the master channel when the watcher was
   created and frees the watcher. */
static void
on_lb_policy_state_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
  lb_policy_connectivity_watcher *watcher = arg;
  channel_data *chand = watcher->chand;

  gpr_mu_lock (&chand->mu_config);
  on_lb_policy_state_changed_locked (exec_ctx, watcher);
  gpr_mu_unlock (&chand->mu_config);

  GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->master, "watch_lb_policy");
  gpr_free (watcher);
}
/* Registers a one-shot watcher for the next connectivity change of
   LB_POLICY, starting from CURRENT_STATE. The watcher holds a
   "watch_lb_policy" ref on the master channel and is released by
   on_lb_policy_state_changed. */
static void
watch_lb_policy (grpc_exec_ctx * exec_ctx, channel_data * chand, grpc_lb_policy * lb_policy, grpc_connectivity_state current_state)
{
  lb_policy_connectivity_watcher *watcher = gpr_malloc (sizeof (*watcher));

  watcher->chand = chand;
  watcher->lb_policy = lb_policy;
  watcher->state = current_state;
  grpc_closure_init (&watcher->on_changed, on_lb_policy_state_changed, watcher);

  GRPC_CHANNEL_INTERNAL_REF (chand->master, "watch_lb_policy");
  grpc_lb_policy_notify_on_state_change (exec_ctx, lb_policy, &watcher->state, &watcher->on_changed);
}
/* Resolver callback (bound to chand->on_config_changed). ARG is the
   channel_data; IOMGR_SUCCESS == 0 is treated as "the resolver is done".

   Steps:
   1. Extract the lb_policy (if any) from the incoming configuration, taking
      a long-lived "channel" ref and a temporary "config_change" ref, and
      release the configuration.
   2. Under mu_config, swap the new policy in. If a policy arrived, or the
      channel has been disconnected (no resolver), every closure parked on
      waiting_for_config_closures is scheduled on EXEC_CTX so the waiting
      calls can make progress.
   3. If the resolver is still alive, publish the new state, start watching
      the new policy and ask the resolver for the next update; otherwise
      mark the channel GRPC_CHANNEL_FATAL_FAILURE and tear the resolver down.
   4. Outside the lock: exit idle if that was requested, shut down and
      release the previous policy, drop the temporary refs, and drop the
      "resolver" ref on the master channel that covered this callback. */
static void
cc_on_config_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success)
{
  channel_data *chand = arg;
  grpc_lb_policy *lb_policy = NULL;
  grpc_lb_policy *old_lb_policy;
  grpc_resolver *old_resolver;
  grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  int exit_idle = 0;
  if (chand->incoming_configuration != NULL)
    {
      lb_policy = grpc_client_config_get_lb_policy (chand->incoming_configuration);
      if (lb_policy != NULL)
        {
          GRPC_LB_POLICY_REF (lb_policy, "channel");
          GRPC_LB_POLICY_REF (lb_policy, "config_change");
          state = grpc_lb_policy_check_connectivity (exec_ctx, lb_policy);
        }
      grpc_client_config_unref (exec_ctx, chand->incoming_configuration);
    }
  chand->incoming_configuration = NULL;
  gpr_mu_lock (&chand->mu_config);
  old_lb_policy = chand->lb_policy;
  chand->lb_policy = lb_policy;
  if (lb_policy != NULL || chand->resolver == NULL /* disconnected */ )
    {
      /* wake every call parked in CALL_WAITING_FOR_CONFIG: hand the whole
         wait list over to the exec_ctx for execution */
      grpc_exec_ctx_enqueue_list (exec_ctx, &chand->waiting_for_config_closures);
    }
  if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives)
    {
      /* the "exit_idle" ref keeps the policy alive until exit_idle is
         called after the lock is released */
      GRPC_LB_POLICY_REF (lb_policy, "exit_idle");
      exit_idle = 1;
      chand->exit_idle_when_lb_policy_arrives = 0;
    }
  if (iomgr_success && chand->resolver)
    {
      grpc_resolver *resolver = chand->resolver;
      GRPC_RESOLVER_REF (resolver, "channel-next");
      grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, state, "new_lb+resolver");
      if (lb_policy != NULL)
        {
          watch_lb_policy (exec_ctx, chand, lb_policy, state);
        }
      gpr_mu_unlock (&chand->mu_config);
      /* this ref covers the next invocation of this callback; the one
         covering the current invocation is dropped at the end */
      GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
      grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed);
      GRPC_RESOLVER_UNREF (exec_ctx, resolver, "channel-next");
    }
  else
    {
      old_resolver = chand->resolver;
      chand->resolver = NULL;
      grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
      gpr_mu_unlock (&chand->mu_config);
      if (old_resolver != NULL)
        {
          grpc_resolver_shutdown (exec_ctx, old_resolver);
          GRPC_RESOLVER_UNREF (exec_ctx, old_resolver, "channel");
        }
    }
  if (exit_idle)
    {
      grpc_lb_policy_exit_idle (exec_ctx, lb_policy);
      GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "exit_idle");
    }
  if (old_lb_policy != NULL)
    {
      grpc_lb_policy_shutdown (exec_ctx, old_lb_policy);
      GRPC_LB_POLICY_UNREF (exec_ctx, old_lb_policy, "channel");
    }
  if (lb_policy != NULL)
    {
      GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "config_change");
    }
  GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, chand->master, "resolver");
}
/* start_transport_op entry of the filter (channel-level ops).

   - op->on_consumed is scheduled on EXEC_CTX right away.
   - A connectivity watch request is registered on the channel's state
     tracker and then cleared from the op.
   - If anything is left in the op after that, it is broadcast to the
     current lb_policy (if any) once mu_config has been released; a
     temporary "broadcast" ref keeps the policy alive for that.
   - op->disconnect marks the channel GRPC_CHANNEL_FATAL_FAILURE, detaches
     and shuts down the resolver and the current lb_policy.

   set_accept_stream and bind_pollset are not supported here (asserted). */
static void
cc_start_transport_op (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_transport_op * op)
{
  grpc_lb_policy *lb_policy = NULL;
  channel_data *chand = elem->channel_data;
  grpc_resolver *destroy_resolver = NULL;
  grpc_exec_ctx_enqueue (exec_ctx, op->on_consumed, 1);
  GPR_ASSERT (op->set_accept_stream == NULL);
  GPR_ASSERT (op->bind_pollset == NULL);
  gpr_mu_lock (&chand->mu_config);
  if (op->on_connectivity_state_change != NULL)
    {
      grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
      /* handled here: do not pass the watch on to the lb_policy */
      op->on_connectivity_state_change = NULL;
      op->connectivity_state = NULL;
    }
  /* NOTE(review): this byte-wise emptiness test still sees op->on_consumed,
     which was scheduled above but not cleared - confirm that is intended. */
  if (!is_empty (op, sizeof (*op)))
    {
      lb_policy = chand->lb_policy;
      if (lb_policy)
        {
          GRPC_LB_POLICY_REF (lb_policy, "broadcast");
        }
    }
  if (op->disconnect && chand->resolver != NULL)
    {
      grpc_connectivity_state_set (exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
      /* the resolver is shut down after the lock is released */
      destroy_resolver = chand->resolver;
      chand->resolver = NULL;
      if (chand->lb_policy != NULL)
        {
          grpc_lb_policy_shutdown (exec_ctx, chand->lb_policy);
          GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel");
          chand->lb_policy = NULL;
        }
    }
  gpr_mu_unlock (&chand->mu_config);
  if (destroy_resolver)
    {
      grpc_resolver_shutdown (exec_ctx, destroy_resolver);
      GRPC_RESOLVER_UNREF (exec_ctx, destroy_resolver, "channel");
    }
  if (lb_policy)
    {
      grpc_lb_policy_broadcast (exec_ctx, lb_policy, op);
      GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "broadcast");
    }
}
/* Constructor for call_data.
   The client channel expects no server transport data and no initial op
   (both asserted). Only the mutex, the owning element, the state and the
   deadline are set here; the remaining fields are left as provided by the
   caller. */
static void
init_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem, const void *server_transport_data, grpc_transport_stream_op * initial_op)
{
  call_data *calld = elem->call_data;

  /* TODO(ctiller): is there something useful we can do here? */
  GPR_ASSERT (initial_op == NULL);
  GPR_ASSERT (elem->filter == &grpc_client_channel_filter);
  GPR_ASSERT (server_transport_data == NULL);

  calld->elem = elem;
  calld->state = CALL_CREATED;
  calld->deadline = gpr_inf_future (GPR_CLOCK_REALTIME);
  gpr_mu_init (&calld->mu_state);
}
/* Destructor for call_data.
   A call may only be destroyed in CALL_CREATED, CALL_CANCELLED or
   CALL_ACTIVE; being destroyed while a setup step is still pending is a
   programming error and aborts the process.
   The mutex initialised by init_call_elem is destroyed here, after it has
   been unlocked, so that platforms whose mutexes own resources do not leak
   them. */
static void
destroy_call_elem (grpc_exec_ctx * exec_ctx, grpc_call_element * elem)
{
  call_data *calld = elem->call_data;
  grpc_subchannel_call *subchannel_call;
  /* if the call got activated, we need to destroy the child stack also, and
     remove it from the in-flight requests tracked by the child_entry we
     picked */
  gpr_mu_lock (&calld->mu_state);
  switch (calld->state)
    {
    case CALL_ACTIVE:
      subchannel_call = calld->subchannel_call;
      gpr_mu_unlock (&calld->mu_state);
      gpr_mu_destroy (&calld->mu_state);
      GRPC_SUBCHANNEL_CALL_UNREF (exec_ctx, subchannel_call, "client_channel");
      break;
    case CALL_CREATED:
    case CALL_CANCELLED:
      gpr_mu_unlock (&calld->mu_state);
      gpr_mu_destroy (&calld->mu_state);
      break;
    case CALL_WAITING_FOR_PICK:
    case CALL_WAITING_FOR_CONFIG:
    case CALL_WAITING_FOR_CALL:
    case CALL_WAITING_FOR_SEND:
      gpr_log (GPR_ERROR, "should never reach here");
      abort ();
      break;
    }
}
/* Constructor for channel_data.
   Zeroes the structure, then records the metadata context and the master
   channel and initialises the config mutex, the pollset_set, the resolver
   callback closure and the connectivity tracker (starting in
   GRPC_CHANNEL_IDLE). The client channel must be the last filter in the
   stack (asserted). */
static void
init_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_channel * master, const grpc_channel_args * args, grpc_mdctx * metadata_context, int is_first, int is_last)
{
  channel_data *chand = elem->channel_data;

  memset (chand, 0, sizeof (*chand));

  GPR_ASSERT (is_last);
  GPR_ASSERT (elem->filter == &grpc_client_channel_filter);

  chand->master = master;
  chand->mdctx = metadata_context;

  gpr_mu_init (&chand->mu_config);
  grpc_pollset_set_init (&chand->pollset_set);
  grpc_closure_init (&chand->on_config_changed, cc_on_config_changed, chand);
  grpc_connectivity_state_init (&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
}
/* Destructor for channel_data.
   Shuts down and releases the resolver if one is still attached, releases
   the channel's ref on the current lb_policy, and then tears down the
   connectivity tracker, the pollset_set and the config mutex. */
static void
destroy_channel_elem (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem)
{
  channel_data *chand = elem->channel_data;
  grpc_resolver *resolver = chand->resolver;

  if (resolver != NULL)
    {
      grpc_resolver_shutdown (exec_ctx, resolver);
      GRPC_RESOLVER_UNREF (exec_ctx, resolver, "channel");
    }

  /* read only now: deliberately not cached before the resolver shutdown */
  if (chand->lb_policy != NULL)
    {
      GRPC_LB_POLICY_UNREF (exec_ctx, chand->lb_policy, "channel");
    }

  grpc_connectivity_state_destroy (exec_ctx, &chand->state_tracker);
  grpc_pollset_set_destroy (&chand->pollset_set);
  gpr_mu_destroy (&chand->mu_config);
}
/* Filter definition for the client channel. The initializer is positional;
   the notes below describe what is supplied in each slot, not the field
   names of grpc_channel_filter (which is declared outside this file). */
const grpc_channel_filter grpc_client_channel_filter = {
/* per-call op entry point */
cc_start_transport_stream_op,
/* channel-level op entry point */
cc_start_transport_op,
/* size of the per-call state (elem->call_data) */
sizeof (call_data),
init_call_elem,
destroy_call_elem,
/* size of the per-channel state (elem->channel_data) */
sizeof (channel_data),
init_channel_elem,
destroy_channel_elem,
/* peer name lookup */
cc_get_peer,
/* filter name */
"client-channel",
};
/* Post-construction initialisation: attaches RESOLVER to the client channel
   found at the end of CHANNEL_STACK. The channel takes its own "channel"
   ref on the resolver; a resolver may only be set once (asserted).
   If calls are already waiting for configuration, or an exit-idle request
   is pending, resolution is started immediately. */
void
grpc_client_channel_set_resolver (grpc_exec_ctx * exec_ctx, grpc_channel_stack * channel_stack, grpc_resolver * resolver)
{
  /* post construction initialization: set the transport setup pointer */
  grpc_channel_element *last_elem = grpc_channel_stack_last_element (channel_stack);
  channel_data *chand = last_elem->channel_data;
  int work_is_pending;

  gpr_mu_lock (&chand->mu_config);
  GPR_ASSERT (!chand->resolver);
  chand->resolver = resolver;
  GRPC_RESOLVER_REF (resolver, "channel");

  work_is_pending = !grpc_closure_list_empty (chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives;
  if (work_is_pending)
    {
      chand->started_resolving = 1;
      /* keeps the master channel alive until cc_on_config_changed runs */
      GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
      grpc_resolver_next (exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed);
    }
  gpr_mu_unlock (&chand->mu_config);
}
/* Returns the channel's current connectivity state.
   When the channel is GRPC_CHANNEL_IDLE and TRY_TO_CONNECT is non-zero, the
   current lb_policy is asked to exit idle; if no policy exists yet the
   request is remembered (exit_idle_when_lb_policy_arrives) and resolution
   is started if it has not been already. */
grpc_connectivity_state
grpc_client_channel_check_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, int try_to_connect)
{
  channel_data *chand = elem->channel_data;
  grpc_connectivity_state current;

  gpr_mu_lock (&chand->mu_config);
  current = grpc_connectivity_state_check (&chand->state_tracker);

  if (current == GRPC_CHANNEL_IDLE && try_to_connect)
    {
      if (chand->lb_policy == NULL)
        {
          chand->exit_idle_when_lb_policy_arrives = 1;
          if (!chand->started_resolving && chand->resolver != NULL)
            {
              /* keeps the master channel alive until
                 cc_on_config_changed runs */
              GRPC_CHANNEL_INTERNAL_REF (chand->master, "resolver");
              chand->started_resolving = 1;
              grpc_resolver_next (exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
            }
        }
      else
        {
          grpc_lb_policy_exit_idle (exec_ctx, chand->lb_policy);
        }
    }

  gpr_mu_unlock (&chand->mu_config);
  return current;
}
/* Registers ON_COMPLETE with the channel's connectivity tracker, passing
   STATE through to grpc_connectivity_state_notify_on_state_change. The
   registration is made under mu_config. */
void
grpc_client_channel_watch_connectivity_state (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_connectivity_state * state, grpc_closure * on_complete)
{
  channel_data *chand = elem->channel_data;
  gpr_mu *config_lock = &chand->mu_config;

  gpr_mu_lock (config_lock);
  grpc_connectivity_state_notify_on_state_change (exec_ctx, &chand->state_tracker, state, on_complete);
  gpr_mu_unlock (config_lock);
}
/* Returns a pointer to the channel's pollset_set of parties interested in
   a new connection. The pointer refers to storage inside the channel's
   channel_data; no lock is taken. */
grpc_pollset_set *
grpc_client_channel_get_connecting_pollset_set (grpc_channel_element * elem)
{
  return &((channel_data *) elem->channel_data)->pollset_set;
}
/* Adds POLLSET to the channel's pollset_set of interested parties. */
void
grpc_client_channel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset)
{
  channel_data *chand = elem->channel_data;
  grpc_pollset_set *interested = &chand->pollset_set;

  grpc_pollset_set_add_pollset (exec_ctx, interested, pollset);
}
/* Removes POLLSET from the channel's pollset_set of interested parties. */
void
grpc_client_channel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_channel_element * elem, grpc_pollset * pollset)
{
  channel_data *chand = elem->channel_data;
  grpc_pollset_set *interested = &chand->pollset_set;

  grpc_pollset_set_del_pollset (exec_ctx, interested, pollset);
}