Most of the way to auto-cleanup subchannels
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 559ad0a..d2f6a90 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -106,6 +106,7 @@
const grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *channel_args,
+ const char *name,
grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
@@ -117,7 +118,7 @@
stack->count = filter_count;
GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
- "CHANNEL_STACK");
+ name);
elems = CHANNEL_ELEMS_FROM_STACK(stack);
user_data =
((char *)elems) +
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 766f543..bb7081b 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -183,6 +183,7 @@
grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
+ const char *name,
grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 5fec87c..5ad2e07 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -260,10 +260,6 @@
}
lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
- }
-
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
@@ -282,11 +278,6 @@
grpc_resolver_shutdown(exec_ctx, destroy_resolver);
GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
}
-
- if (lb_policy) {
- grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
- }
}
typedef struct {
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index c0f1d3f..d83f371 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -185,7 +185,6 @@
int iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
- grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
grpc_connected_subchannel *exclude_subchannel;
@@ -199,12 +198,6 @@
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
- if (grpc_subchannel_get_connected_subchannel(subchannels[i]) !=
- exclude_subchannel) {
- memset(&op, 0, sizeof(op));
- op.disconnect = 1;
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
- }
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
@@ -323,41 +316,6 @@
gpr_mu_unlock(&p->mu);
}
-static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_transport_op *op) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- size_t i;
- size_t n;
- grpc_subchannel **subchannels;
- grpc_connected_subchannel *selected;
-
- gpr_mu_lock(&p->mu);
- n = p->num_subchannels;
- subchannels = gpr_malloc(n * sizeof(*subchannels));
- selected = p->selected;
- if (selected) {
- GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
- }
- for (i = 0; i < n; i++) {
- subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
- }
- gpr_mu_unlock(&p->mu);
-
- for (i = 0; i < n; i++) {
- if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) {
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
- }
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
- }
- if (p->selected) {
- grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected,
- "pf_broadcast_to_selected");
- }
- gpr_free(subchannels);
-}
-
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -380,7 +338,7 @@
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
- pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+ pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 10688b3..16afd8c 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -451,29 +451,6 @@
}
}
-static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_transport_op *op) {
- round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
- size_t n;
- grpc_subchannel **subchannels;
-
- gpr_mu_lock(&p->mu);
- n = p->num_subchannels;
- subchannels = gpr_malloc(n * sizeof(*subchannels));
- for (i = 0; i < n; i++) {
- subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast");
- }
- gpr_mu_unlock(&p->mu);
-
- for (i = 0; i < n; i++) {
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast");
- }
- gpr_free(subchannels);
-}
-
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -497,7 +474,7 @@
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
- rr_broadcast, rr_check_connectivity, rr_notify_on_state_change};
+ rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 8d9287c..2b874da 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -61,6 +61,7 @@
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
#endif
if (gpr_unref(&policy->refs)) {
+ grpc_pollset_set_destroy(&policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
}
@@ -83,11 +84,6 @@
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
-void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op) {
- policy->vtable->broadcast(exec_ctx, policy, op);
-}
-
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 985c966..96b2bdf 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -66,10 +66,6 @@
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** broadcast a transport op to all subchannels */
- void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op);
-
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy);
@@ -118,9 +114,6 @@
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
-void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op);
-
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 1c66a73..434a37c 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -238,7 +238,7 @@
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
- if ((old_refs & STRONG_REF_MASK) == 0) {
+ if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
disconnect(exec_ctx, c);
}
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
@@ -351,7 +351,7 @@
do_connect = 1;
c->connecting = 1;
/* released by connection */
- GRPC_SUBCHANNEL_REF(c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
}
gpr_mu_unlock(&c->mu);
@@ -369,40 +369,6 @@
gpr_mu_unlock(&c->mu);
}
-void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_transport_op *op) {
- grpc_connected_subchannel *con;
- int cancel_alarm = 0;
- gpr_mu_lock(&c->mu);
- con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
- if (con != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op");
- }
- if (op->disconnect) {
- c->disconnected = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
- if (c->have_alarm) {
- cancel_alarm = 1;
- }
- }
- gpr_mu_unlock(&c->mu);
-
- if (con != NULL) {
- grpc_connected_subchannel_process_transport_op(exec_ctx, con, op);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op");
- }
-
- if (cancel_alarm) {
- grpc_timer_cancel(exec_ctx, &c->alarm);
- }
-
- if (op->disconnect) {
- grpc_connector_shutdown(exec_ctx, c->connector);
- }
-}
-
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_transport_op *op) {
@@ -488,7 +454,7 @@
con = gpr_malloc(channel_stack_size);
stk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
- num_filters, c->args, stk);
+ num_filters, c->args, "CONNECTED_SUBCHANNEL", stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free((void *)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@@ -507,7 +473,8 @@
gpr_free(sw_subchannel);
gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ gpr_free(con);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
return;
}
@@ -519,7 +486,7 @@
for connecting is donated
to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, con, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
@@ -588,17 +555,18 @@
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
grpc_subchannel *c = arg;
+
if (c->connecting_result.transport != NULL) {
publish_transport(exec_ctx, c);
} else if (c->disconnected) {
- /* do nothing */
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index b64a265..66c1399 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -105,9 +105,6 @@
grpc_pollset *pollset);
/** process a transport level op */
-void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_transport_op *op);
void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel,
grpc_transport_op *op);
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 7ff80e6..81c19ca 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -43,6 +43,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#define CLOSURE_NOT_READY ((grpc_closure *)0)
@@ -158,7 +159,10 @@
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
- grpc_iomgr_register_object(&r->iomgr_object, name);
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index f29ef7c..7f0b34c 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -52,7 +52,7 @@
size_t i;
gpr_mu_destroy(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
}
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets);
@@ -74,7 +74,7 @@
pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
} else {
grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
pollset_set->fds[j++] = pollset_set->fds[i];
@@ -107,12 +107,13 @@
gpr_mu_lock(&bag->mu);
if (bag->pollset_set_count == bag->pollset_set_capacity) {
bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
- bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ bag->pollset_sets = gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
}
bag->pollset_sets[bag->pollset_set_count++] = item;
for (i = 0, j = 0; i < bag->fd_count; i++) {
if (grpc_fd_is_orphaned(bag->fds[i])) {
- GRPC_FD_UNREF(bag->fds[i], "pollset");
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
} else {
grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
bag->fds[j++] = bag->fds[i];
@@ -130,7 +131,9 @@
for (i = 0; i < bag->pollset_set_count; i++) {
if (bag->pollset_sets[i] == item) {
bag->pollset_set_count--;
- GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]);
+ GPR_SWAP(grpc_pollset_set *,
+ bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
break;
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a78fd0a..92fd3da 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -152,7 +152,7 @@
}
grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
- num_filters, args,
+ num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 08f34ff..4de72d7 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -50,8 +50,6 @@
for a stream. */
typedef struct grpc_stream grpc_stream;
-#define GRPC_STREAM_REFCOUNT_DEBUG
-
typedef struct grpc_stream_refcount {
gpr_refcount refs;
grpc_closure destroy;
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index cab31bc..f1bb37c 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -116,7 +116,7 @@
channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1));
grpc_channel_stack_init(&exec_ctx, 1, free_channel, channel_stack, &filters,
- 1, &chan_args, channel_stack);
+ 1, &chan_args, "test", channel_stack);
GPR_ASSERT(channel_stack->count == 1);
channel_elem = grpc_channel_stack_element(channel_stack, 0);
channel_data = (int *)channel_elem->channel_data;
diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c
index 2554025..0c062d7 100644
--- a/test/core/end2end/fixtures/h2_uchannel.c
+++ b/test/core/end2end/fixtures/h2_uchannel.c
@@ -233,11 +233,12 @@
}
grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE;
+grpc_pollset_set g_interested_parties;
static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) {
if (g_state != GRPC_CHANNEL_READY) {
grpc_subchannel_notify_on_state_change(
- exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg));
+ exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg));
}
}
@@ -247,12 +248,11 @@
static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) {
grpc_pollset pollset;
- grpc_pollset_set interested_parties;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_init(&pollset);
- grpc_pollset_set_add_pollset(&exec_ctx, &interested_parties, &pollset);
- grpc_subchannel_add_interested_parties(&exec_ctx, c, &interested_parties);
- grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state,
+ grpc_pollset_set_init(&g_interested_parties);
+ grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset);
+ grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state,
grpc_closure_create(state_changed, c));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&pollset));
@@ -267,8 +267,8 @@
}
grpc_pollset_shutdown(&exec_ctx, &pollset,
grpc_closure_create(destroy_pollset, &pollset));
+ grpc_pollset_set_destroy(&g_interested_parties);
gpr_mu_unlock(GRPC_POLLSET_MU(&pollset));
- grpc_subchannel_del_interested_parties(&exec_ctx, c, &interested_parties);
grpc_exec_ctx_finish(&exec_ctx);
return grpc_subchannel_get_connected_subchannel(c);
}