| /* |
| * |
| * Copyright 2015, Google Inc. |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| * |
| */ |
| |
| #include "src/core/client_config/subchannel.h" |
| |
| #include <string.h> |
| |
| #include <grpc/support/alloc.h> |
| |
| #include "src/core/channel/channel_args.h" |
| #include "src/core/channel/connected_channel.h" |
| |
| typedef struct { |
| gpr_refcount refs; |
| grpc_subchannel *subchannel; |
| } connection; |
| |
| typedef struct waiting_for_connect { |
| struct waiting_for_connect *next; |
| grpc_iomgr_closure *notify; |
| grpc_transport_stream_op *initial_op; |
| grpc_subchannel_call **target; |
| } waiting_for_connect; |
| |
| typedef struct connectivity_state_watcher { |
| struct connectivity_state_watcher *next; |
| grpc_iomgr_closure *notify; |
| grpc_connectivity_state *current; |
| } connectivity_state_watcher; |
| |
| struct grpc_subchannel { |
| gpr_refcount refs; |
| grpc_connector *connector; |
| |
| /** non-transport related channel filters */ |
| const grpc_channel_filter **filters; |
| size_t filter_count; |
| /** channel arguments */ |
| grpc_channel_args *args; |
| /** address to connect to */ |
| struct sockaddr *addr; |
| size_t addr_len; |
| /** metadata context */ |
| grpc_mdctx *mdctx; |
| |
| /** set during connection */ |
| grpc_connect_out_args connecting_result; |
| |
| /** callback for connection finishing */ |
| grpc_iomgr_closure connected; |
| |
| /** pollset_set tracking who's interested in a connection |
| being setup */ |
| grpc_pollset_set pollset_set; |
| |
| /** mutex protecting remaining elements */ |
| gpr_mu mu; |
| |
| /** active connection */ |
| connection *active; |
| /** are we connecting */ |
| int connecting; |
| /** things waiting for a connection */ |
| waiting_for_connect *waiting; |
| /** things watching the connectivity state */ |
| connectivity_state_watcher *watchers; |
| }; |
| |
| struct grpc_subchannel_call { |
| connection *connection; |
| gpr_refcount refs; |
| }; |
| |
| #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) |
| #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) |
| |
| static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); |
| static void connectivity_state_changed_locked(grpc_subchannel *c); |
| static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); |
| static gpr_timespec compute_connect_deadline(grpc_subchannel *c); |
| static void subchannel_connected(void *subchannel, int iomgr_success); |
| |
| /* |
| * grpc_subchannel implementation |
| */ |
| |
| void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); } |
| |
| void grpc_subchannel_unref(grpc_subchannel *c) { |
| if (gpr_unref(&c->refs)) { |
| gpr_free(c->filters); |
| grpc_channel_args_destroy(c->args); |
| gpr_free(c->addr); |
| grpc_mdctx_unref(c->mdctx); |
| grpc_pollset_set_destroy(&c->pollset_set); |
| gpr_free(c); |
| } |
| } |
| |
| void grpc_subchannel_add_interested_party(grpc_subchannel *c, |
| grpc_pollset *pollset) { |
| grpc_pollset_set_add_pollset(&c->pollset_set, pollset); |
| } |
| |
| void grpc_subchannel_del_interested_party(grpc_subchannel *c, |
| grpc_pollset *pollset) { |
| grpc_pollset_set_del_pollset(&c->pollset_set, pollset); |
| } |
| |
| grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, |
| grpc_subchannel_args *args) { |
| grpc_subchannel *c = gpr_malloc(sizeof(*c)); |
| memset(c, 0, sizeof(*c)); |
| gpr_ref_init(&c->refs, 1); |
| c->connector = connector; |
| grpc_connector_ref(c->connector); |
| c->filter_count = args->filter_count + 1; |
| c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->filter_count); |
| memcpy(c->filters, args->filters, |
| sizeof(grpc_channel_filter *) * args->filter_count); |
| c->filters[c->filter_count - 1] = &grpc_connected_channel_filter; |
| c->addr = gpr_malloc(args->addr_len); |
| memcpy(c->addr, args->addr, args->addr_len); |
| c->addr_len = args->addr_len; |
| c->args = grpc_channel_args_copy(args->args); |
| c->mdctx = args->mdctx; |
| grpc_mdctx_ref(c->mdctx); |
| grpc_pollset_set_init(&c->pollset_set); |
| grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); |
| gpr_mu_init(&c->mu); |
| return c; |
| } |
| |
| static void start_connect(grpc_subchannel *c) { |
| grpc_connect_in_args args; |
| |
| args.interested_parties = &c->pollset_set; |
| args.addr = c->addr; |
| args.addr_len = c->addr_len; |
| args.deadline = compute_connect_deadline(c); |
| args.channel_args = c->args; |
| args.metadata_context = c->mdctx; |
| |
| grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected); |
| } |
| |
| void grpc_subchannel_create_call(grpc_subchannel *c, |
| grpc_transport_stream_op *initial_op, |
| grpc_subchannel_call **target, |
| grpc_iomgr_closure *notify) { |
| connection *con; |
| gpr_mu_lock(&c->mu); |
| if (c->active != NULL) { |
| con = c->active; |
| gpr_ref(&con->refs); |
| gpr_mu_unlock(&c->mu); |
| |
| *target = create_call(con, initial_op); |
| notify->cb(notify->cb_arg, 1); |
| } else { |
| waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); |
| w4c->next = c->waiting; |
| w4c->notify = notify; |
| w4c->initial_op = initial_op; |
| w4c->target = target; |
| c->waiting = w4c; |
| grpc_subchannel_add_interested_party(c, initial_op->bind_pollset); |
| if (!c->connecting) { |
| c->connecting = 1; |
| connectivity_state_changed_locked(c); |
| grpc_subchannel_ref(c); |
| gpr_mu_unlock(&c->mu); |
| |
| start_connect(c); |
| } else { |
| gpr_mu_unlock(&c->mu); |
| } |
| } |
| } |
| |
| grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { |
| grpc_connectivity_state state; |
| gpr_mu_lock(&c->mu); |
| state = compute_connectivity_locked(c); |
| gpr_mu_unlock(&c->mu); |
| return state; |
| } |
| |
| void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, |
| grpc_connectivity_state *state, |
| grpc_iomgr_closure *notify) { |
| grpc_connectivity_state current; |
| int do_connect = 0; |
| connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); |
| w->current = state; |
| w->notify = notify; |
| gpr_mu_lock(&c->mu); |
| current = compute_connectivity_locked(c); |
| if (current == GRPC_CHANNEL_IDLE) { |
| current = GRPC_CHANNEL_CONNECTING; |
| c->connecting = 1; |
| do_connect = 1; |
| grpc_subchannel_ref(c); |
| connectivity_state_changed_locked(c); |
| } |
| if (current != *state) { |
| gpr_mu_unlock(&c->mu); |
| *state = current; |
| grpc_iomgr_add_callback(notify); |
| gpr_free(w); |
| } else { |
| w->next = c->watchers; |
| c->watchers = w; |
| gpr_mu_unlock(&c->mu); |
| } |
| if (do_connect) { |
| start_connect(c); |
| } |
| } |
| |
| static void publish_transport(grpc_subchannel *c) { |
| size_t channel_stack_size = grpc_channel_stack_size(c->filters, c->filter_count); |
| connection *con = gpr_malloc(sizeof(connection) + channel_stack_size); |
| grpc_channel_stack *stk = (grpc_channel_stack *)(con + 1); |
| waiting_for_connect *w4c; |
| gpr_ref_init(&con->refs, 1); |
| con->subchannel = c; |
| grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk); |
| grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); |
| memset(&c->connecting_result, 0, sizeof(c->connecting_result)); |
| |
| gpr_mu_lock(&c->mu); |
| GPR_ASSERT(c->active == NULL); |
| c->active = con; |
| c->connecting = 0; |
| connectivity_state_changed_locked(c); |
| while ((w4c = c->waiting)) { |
| abort(); /* not implemented */ |
| } |
| gpr_mu_unlock(&c->mu); |
| } |
| |
| static void subchannel_connected(void *arg, int iomgr_success) { |
| grpc_subchannel *c = arg; |
| if (c->connecting_result.transport) { |
| publish_transport(c); |
| } else { |
| grpc_subchannel_unref(c); |
| /* TODO(ctiller): retry after sleeping */ |
| abort(); |
| } |
| } |
| |
| static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { |
| return gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); |
| } |
| |
| static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { |
| if (c->connecting) { |
| return GRPC_CHANNEL_CONNECTING; |
| } |
| if (c->active) { |
| return GRPC_CHANNEL_READY; |
| } |
| return GRPC_CHANNEL_IDLE; |
| } |
| |
| static void connectivity_state_changed_locked(grpc_subchannel *c) { |
| grpc_connectivity_state current = compute_connectivity_locked(c); |
| connectivity_state_watcher *new = NULL; |
| connectivity_state_watcher *w; |
| while ((w = c->watchers)) { |
| c->watchers = w->next; |
| |
| if (current != *w->current) { |
| *w->current = current; |
| grpc_iomgr_add_callback(w->notify); |
| gpr_free(w); |
| } else { |
| w->next = new; |
| new = w; |
| } |
| } |
| c->watchers = new; |
| } |
| |
| /* |
| * grpc_subchannel_call implementation |
| */ |
| |
| void grpc_subchannel_call_ref(grpc_subchannel_call *call) { |
| gpr_ref(&call->refs); |
| } |
| |
| void grpc_subchannel_call_unref(grpc_subchannel_call *call) { |
| if (gpr_unref(&call->refs)) { |
| grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call)); |
| if (gpr_unref(&call->connection->refs)) { |
| gpr_free(call->connection); |
| } |
| gpr_free(call); |
| } |
| } |
| |
| void grpc_subchannel_call_process_op(grpc_subchannel_call *call, |
| grpc_transport_stream_op *op) { |
| grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); |
| grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); |
| top_elem->filter->start_transport_stream_op(top_elem, op); |
| } |
| |
| grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) { |
| grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); |
| grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); |
| grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); |
| call->connection = con; |
| gpr_ref_init(&call->refs, 1); |
| grpc_call_stack_init(chanstk, NULL, initial_op, callstk); |
| return call; |
| } |