Start fixing refcounting
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index a7dd967..2829134 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -469,6 +469,9 @@
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
+
+ grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -476,8 +479,6 @@
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
-
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
@@ -485,6 +486,7 @@
}
op->transport_private.args[0] = elem;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->transport_private.closure,
@@ -670,44 +672,24 @@
GRPC_ERROR_UNREF(error);
}
-typedef struct {
- grpc_transport_stream_op **ops;
- size_t nops;
- grpc_subchannel_call *call;
-} retry_ops_args;
-
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
- retry_ops_args *a = args;
- size_t i;
- for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
- }
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
- gpr_free(a->ops);
- gpr_free(a);
-}
-
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
if (calld->waiting_ops_count == 0) {
return;
}
- retry_ops_args *a = gpr_malloc(sizeof(*a));
- a->ops = calld->waiting_ops;
- a->nops = calld->waiting_ops_count;
- a->call = GET_CALL(calld);
- if (a->call == CANCELLED_CALL) {
- gpr_free(a);
+ grpc_subchannel_call *call = GET_CALL(calld);
+ grpc_transport_stream_op **ops = calld->waiting_ops;
+ size_t nops = calld->waiting_ops_count;
+ if (call == CANCELLED_CALL) {
fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
return;
}
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
- GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_closure_sched(
- exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ for (size_t i = 0; i < nops; i++) {
+ grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
+ }
}
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -978,6 +960,8 @@
}
/* nothing to be done but wait */
add_waiting_locked(calld, op);
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "start_transport_stream_op");
GPR_TIMER_END("cc_start_transport_stream_op", 0);
}
@@ -1008,6 +992,7 @@
return;
}
/* we failed; lock and figure out what to do */
+ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
op->transport_private.args[0] = elem;
grpc_closure_sched(
exec_ctx,
@@ -1109,6 +1094,8 @@
// do not yet have service config data, then the timer may be reset
// later.
grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "initial_read_service_config");
}
/* Constructor for call_data */
@@ -1132,6 +1119,7 @@
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
calld->owning_call = args->call_stack;
calld->pollent = NULL;
+ GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config");
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&calld->read_service_config,
@@ -1204,6 +1192,7 @@
&chand->on_resolver_result_changed);
}
}
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
@@ -1212,6 +1201,7 @@
grpc_connectivity_state out;
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
grpc_closure_sched(
exec_ctx,
grpc_closure_create(try_to_connect_locked, chand,