Add rich closure debug mode
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index d6877f6..b0559ad 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -126,7 +126,7 @@
/* Destroy per call data.
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
- \a then_schedule_closure that should be passed to grpc_closure_sched when
+ \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when
destruction is complete. \a final_info contains data about the completed
call, mainly for reporting purposes. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index a351e98..2cb83f4 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -190,7 +190,7 @@
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
- grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, &mgr->on_handshake_done, error);
mgr->shutdown = true;
} else {
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
@@ -245,13 +245,13 @@
grpc_slice_buffer_init(mgr->args.read_buffer);
// Initialize state needed for calling handshakers.
mgr->acceptor = acceptor;
- grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr,
+ GRPC_CLOSURE_INIT(&mgr->call_next_handshaker, call_next_handshaker, mgr,
grpc_schedule_on_exec_ctx);
- grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args,
+ GRPC_CLOSURE_INIT(&mgr->on_handshake_done, on_handshake_done, &mgr->args,
grpc_schedule_on_exec_ctx);
// Start deadline timer, which owns a ref.
gpr_ref(&mgr->refs);
- grpc_closure_init(&mgr->on_timeout, on_timeout, mgr,
+ GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 492eb5a..1b7e2cf 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -90,7 +90,7 @@
grpc_error *error) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
- grpc_closure_sched(exec_ctx, req->on_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != NULL) {
grpc_resolved_addresses_destroy(req->addresses);
@@ -213,7 +213,7 @@
return;
}
addr = &req->addresses->addrs[req->next_address++];
- grpc_closure_init(&req->connected, on_connected, req,
+ GRPC_CLOSURE_INIT(&req->connected, on_connected, req,
grpc_schedule_on_exec_ctx);
grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
@@ -256,8 +256,8 @@
req->pollent = pollent;
req->overall_error = GRPC_ERROR_NONE;
req->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_init(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&req->done_write, done_write, req,
+ GRPC_CLOSURE_INIT(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&req->done_write, done_write, req,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&req->incoming);
grpc_slice_buffer_init(&req->outgoing);
@@ -271,7 +271,7 @@
grpc_resolve_address(
exec_ctx, request->host, req->handshaker->default_port,
req->context->pollset_set,
- grpc_closure_create(on_resolved, req, grpc_schedule_on_exec_ctx),
+ GRPC_CLOSURE_CREATE(on_resolved, req, grpc_schedule_on_exec_ctx),
&req->addresses);
}
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 6500192..34a77c3 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -85,7 +85,7 @@
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
}
- grpc_closure_sched(exec_ctx, on_peer_checked, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 72a1b20..719e2e8 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -24,14 +24,26 @@
#include "src/core/lib/profiling/timers.h"
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_init(const char *file, int line,
+ grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
+#else
grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg,
grpc_closure_scheduler *scheduler) {
+#endif
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->scheduler = scheduler;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
closure->scheduled = false;
+ closure->file_initiated = NULL;
+ closure->line_initiated = 0;
+ closure->run = false;
+ closure->file_created = file;
+ closure->line_created = line;
#endif
return closure;
}
@@ -100,19 +112,39 @@
cb(exec_ctx, cb_arg, error);
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_create(const char *file, int line,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
+#else
grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
grpc_closure_scheduler *scheduler) {
+#endif
wrapped_closure *wc = gpr_malloc(sizeof(*wc));
wc->cb = cb;
wc->cb_arg = cb_arg;
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+ grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler);
+#else
grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler);
+#endif
return &wc->wrapper;
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_run(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *c, grpc_error *error) {
+#else
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
+#endif
GPR_TIMER_BEGIN("grpc_closure_run", 0);
if (c != NULL) {
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = true;
+#endif
assert(c->cb);
c->scheduler->vtable->run(exec_ctx, c, error);
} else {
@@ -121,13 +153,21 @@
GPR_TIMER_END("grpc_closure_run", 0);
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_sched(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *c, grpc_error *error) {
+#else
void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
+#endif
GPR_TIMER_BEGIN("grpc_closure_sched", 0);
if (c != NULL) {
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
GPR_ASSERT(!c->scheduled);
c->scheduled = true;
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = false;
#endif
assert(c->cb);
c->scheduler->vtable->sched(exec_ctx, c, error);
@@ -137,13 +177,21 @@
GPR_TIMER_END("grpc_closure_sched", 0);
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_list_sched(const char *file, int line,
+ grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+#else
void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+#endif
grpc_closure *c = list->head;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
GPR_ASSERT(!c->scheduled);
c->scheduled = true;
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = false;
#endif
assert(c->cb);
c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 25086fa..3ecf9e9 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -59,6 +59,8 @@
const grpc_closure_scheduler_vtable *vtable;
};
+// #define GRPC_CLOSURE_RICH_DEBUG
+
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
/** Once queued, next indicates the next queued closure; before then, scratch
@@ -85,19 +87,47 @@
uintptr_t scratch;
} error_data;
-#ifndef NDEBUG
+// extra tracing and debugging for grpc_closure. This incurs a decent amount of
+// overhead per closure, so it must be enabled at compile time.
+#ifdef GRPC_CLOSURE_RICH_DEBUG
bool scheduled;
+ bool run; // true = run, false = scheduled
+ const char *file_created;
+ int line_created;
+ const char *file_initiated;
+ int line_initiated;
#endif
};
/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_init(const char *file, int line,
+ grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
+ grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler)
+#else
grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg,
grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
+ grpc_closure_init(closure, cb, cb_arg, scheduler)
+#endif
/* Create a heap allocated closure: try to avoid except for very rare events */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_create(const char *file, int line,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
+ grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler)
+#else
grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
+ grpc_closure_create(cb, cb_arg, scheduler)
+#endif
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
@@ -123,16 +153,44 @@
/** Run a closure directly. Caller ensures that no locks are being held above.
* Note that calling this at the end of a closure callback function itself is
* by definition safe. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_run(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \
+ grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error)
+#else
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \
+ grpc_closure_run(exec_ctx, closure, error)
+#endif
/** Schedule a closure to be run. Does not need to be run from a safe point. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_sched(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \
+ grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error)
+#else
void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \
+ grpc_closure_sched(exec_ctx, closure, error)
+#endif
/** Schedule all closures in a list to be run. Does not need to be run from a
* safe point. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_list_sched(const char *file, int line,
+ grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *closure_list);
+#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \
+ grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list)
+#else
void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx,
grpc_closure_list *closure_list);
+#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \
+ grpc_closure_list_sched(exec_ctx, closure_list)
+#endif
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index f6976c5..750ff10 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -81,7 +81,7 @@
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler);
+ GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -196,7 +196,7 @@
static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock));
- grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
@@ -247,7 +247,7 @@
GPR_TIMER_BEGIN("combiner.exec1", 0);
grpc_closure *cl = (grpc_closure *)n;
grpc_error *cl_err = cl->error_data.error;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
cl->scheduled = false;
#endif
cl->cb(exec_ctx, cl->cb_arg, cl_err);
@@ -264,7 +264,7 @@
gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
c->scheduled = false;
#endif
c->cb(exec_ctx, c->cb_arg, error);
@@ -332,8 +332,8 @@
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(enqueue_finally, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(enqueue_finally, closure,
grpc_combiner_scheduler(lock)),
error);
GPR_TIMER_END("combiner.execute_finally", 0);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index e626845..1ce916f 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -149,7 +149,7 @@
grpc_error_create(__FILE__, __LINE__, grpc_slice_from_copied_string(desc), \
errs, count)
-//#define GRPC_ERROR_REFCOUNT_DEBUG
+// #define GRPC_ERROR_REFCOUNT_DEBUG
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line,
const char *func);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 63f6962..e0c0545 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -237,7 +237,7 @@
close(fd->fd);
}
- grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
@@ -427,7 +427,7 @@
grpc_pollset *pollset) {
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
pollset->begin_refs == 0) {
- grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index d7ae0d3..761e2ed 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -965,7 +965,7 @@
fd->po.pi = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1250,7 +1250,7 @@
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 8692b5b..0ab3594 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -515,7 +515,7 @@
fd->eps = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->mu);
@@ -716,7 +716,7 @@
/* Release the ref and set pollset->eps to NULL */
pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->mu lock must be held by the caller before calling this */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index c3627dc..abcf7b4 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -269,7 +269,7 @@
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd,
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
} else {
@@ -367,7 +367,7 @@
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->orphaned_mu);
gpr_mu_unlock(&fd->pollable.po.mu);
@@ -667,7 +667,7 @@
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
- grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
@@ -966,8 +966,8 @@
pollable_add_fd(&pollset->pollable, had_fd);
pollable_add_fd(&pollset->pollable, fd);
}
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(unref_fd_no_longer_poller, had_fd,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 29a86f7..fa4b4e8 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -893,7 +893,7 @@
fd->po.pi = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1147,7 +1147,7 @@
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 6145c9e..d1a27b8 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -386,7 +386,7 @@
if (!fd->released) {
close(fd->fd);
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@@ -445,7 +445,7 @@
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_closure_sched(exec_ctx, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
@@ -453,7 +453,7 @@
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -476,7 +476,7 @@
return 0;
} else {
/* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
+ GRPC_CLOSURE_SCHED(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -834,7 +834,7 @@
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error **composite, grpc_error *error) {
@@ -883,7 +883,7 @@
if (!pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
- grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
goto done;
}
/* If we're shutting down then we don't execute any extended work */
@@ -1056,7 +1056,7 @@
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -1075,7 +1075,7 @@
pollset->shutdown_done = closure;
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset_has_workers(pollset)) {
- grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
}
if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
pollset->called_shutdown = 1;
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 6699be3..51c3621 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -62,7 +62,7 @@
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
did_something = true;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
c->scheduled = false;
#endif
c->cb(exec_ctx, c->cb_arg, error);
@@ -85,7 +85,7 @@
static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
closure->scheduled = false;
#endif
closure->cb(exec_ctx, closure->cb_arg, error);
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 7621a7f..fda274e 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -58,7 +58,7 @@
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
c->scheduled = false;
#endif
c->cb(exec_ctx, c->cb_arg, error);
diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c
index 6fa285b5f..c2ceecb 100644
--- a/src/core/lib/iomgr/lockfree_event.c
+++ b/src/core/lib/iomgr/lockfree_event.c
@@ -112,7 +112,7 @@
closure when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
return; /* Successful. Return */
}
@@ -125,7 +125,7 @@
schedule the closure with the shutdown error */
if ((curr & FD_SHUTDOWN_BIT) > 0) {
grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
- grpc_closure_sched(exec_ctx, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return;
@@ -177,7 +177,7 @@
happens-after on that edge), and a release to pair with anything
loading the shutdown state. */
if (gpr_atm_full_cas(state, curr, new_state)) {
- grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
+ GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return true;
@@ -226,7 +226,7 @@
spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) {
- grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
return;
}
/* else the state changed again (only possible by either a racing
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 8ff647f..07c7c04 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -88,7 +88,7 @@
// kick the loop once
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
}
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
}
void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index ab1a327..d5a0373 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -95,7 +95,7 @@
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@@ -143,7 +143,7 @@
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index a78de66..35dedc2 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -154,7 +154,7 @@
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
grpc_error *error) {
request *r = rp;
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, r->on_done,
grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
@@ -175,13 +175,13 @@
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r,
+ GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addrs_out = addrs;
- grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index f1ea516..45de289 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -124,7 +124,7 @@
/* Either no retry was attempted, or the retry failed. Either way, the
original error probably has more interesting information */
error = handle_addrinfo_result(status, res, r->addresses);
- grpc_closure_sched(&exec_ctx, r->on_done, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
gpr_free(r);
@@ -225,7 +225,7 @@
int s;
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, on_done, err);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, err);
return;
}
r = gpr_malloc(sizeof(request));
@@ -252,7 +252,7 @@
err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed");
err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR,
grpc_slice_from_static_string(uv_strerror(s)));
- grpc_closure_sched(exec_ctx, on_done, err);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, err);
gpr_free(r);
gpr_free(req);
gpr_free(hints);
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 83adfd3..45cfd72 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -139,7 +139,7 @@
} else {
GRPC_ERROR_REF(error);
}
- grpc_closure_sched(exec_ctx, r->on_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -158,13 +158,13 @@
grpc_closure *on_done,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r,
+ GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addresses = addresses;
- grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 7c3fb93..f2cc1be 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -259,7 +259,7 @@
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_quota->rq_step_closure,
GRPC_ERROR_NONE);
}
@@ -305,7 +305,7 @@
}
if (resource_user->free_pool >= 0) {
resource_user->allocating = false;
- grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
} else {
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
@@ -363,7 +363,7 @@
resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = NULL;
- grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_NONE);
return true;
}
@@ -444,7 +444,7 @@
resource_user->new_reclaimers[destructive] = NULL;
GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@@ -485,9 +485,9 @@
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
@@ -501,9 +501,9 @@
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
@@ -525,7 +525,7 @@
slice_allocator->length));
}
}
- grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
}
/*******************************************************************************
@@ -579,9 +579,9 @@
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota,
+ GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
grpc_combiner_finally_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
+ GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
rq_reclamation_done, resource_quota,
grpc_combiner_scheduler(resource_quota->combiner));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
@@ -633,8 +633,8 @@
a->size = (int64_t)size;
gpr_atm_no_barrier_store(&resource_quota->last_size,
(gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
- grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
- grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -686,19 +686,19 @@
grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user));
resource_user->resource_quota =
grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
+ GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->add_to_free_pool_closure,
+ GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
&ru_add_to_free_pool, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->post_reclaimer_closure[0],
+ GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
&ru_post_benign_reclaimer, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->post_reclaimer_closure[1],
+ GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
&ru_post_destructive_reclaimer, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
+ GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
@@ -739,7 +739,7 @@
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
- grpc_closure_sched(exec_ctx, &resource_user->destroy_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->destroy_closure,
GRPC_ERROR_NONE);
}
}
@@ -756,9 +756,9 @@
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_create(
+ GRPC_CLOSURE_CREATE(
ru_shutdown, resource_user,
grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
GRPC_ERROR_NONE);
@@ -781,11 +781,11 @@
GRPC_ERROR_NONE);
if (!resource_user->allocating) {
resource_user->allocating = true;
- grpc_closure_sched(exec_ctx, &resource_user->allocate_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->allocate_closure,
GRPC_ERROR_NONE);
}
} else {
- grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
}
@@ -804,7 +804,7 @@
if (is_bigger_than_zero && was_zero_or_negative &&
!resource_user->added_to_free_pool) {
resource_user->added_to_free_pool = true;
- grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->add_to_free_pool_closure,
GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
@@ -817,7 +817,7 @@
grpc_closure *closure) {
GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
resource_user->new_reclaimers[destructive] = closure;
- grpc_closure_sched(exec_ctx,
+ GRPC_CLOSURE_SCHED(exec_ctx,
&resource_user->post_reclaimer_closure[destructive],
GRPC_ERROR_NONE);
}
@@ -828,7 +828,7 @@
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure,
GRPC_ERROR_NONE);
}
@@ -836,9 +836,9 @@
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
- grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices,
+ GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
slice_allocator, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&slice_allocator->on_done, cb, p,
+ GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
grpc_schedule_on_exec_ctx);
slice_allocator->resource_user = resource_user;
}
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index f73e33d..a0d731b 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -116,7 +116,7 @@
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@@ -139,7 +139,7 @@
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
- grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index abad3c2..21e320a 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -234,7 +234,7 @@
grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
- grpc_closure_sched(exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@@ -263,7 +263,7 @@
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
@@ -272,7 +272,7 @@
addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
return;
}
@@ -290,13 +290,13 @@
if (err >= 0) {
*ep =
grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
@@ -311,7 +311,7 @@
addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
- grpc_closure_init(&ac->write_closure, on_writable, ac,
+ GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
@@ -321,7 +321,7 @@
}
gpr_mu_lock(&ac->mu);
- grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index f408433..ab68329 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -107,7 +107,7 @@
if (done) {
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
- grpc_closure_sched(&exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, closure, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -150,7 +150,7 @@
uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
(const struct sockaddr *)resolved_addr->addr,
uv_tc_on_connect);
- grpc_closure_init(&connect->on_alarm, uv_tc_on_alarm, connect,
+ GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &connect->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index e0913cf..fc62105 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -116,7 +116,7 @@
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- grpc_closure_sched(exec_ctx, on_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@@ -201,9 +201,9 @@
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->channel_args = grpc_channel_args_copy(channel_args);
- grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect);
@@ -222,7 +222,7 @@
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_closure_sched(exec_ctx, on_done, final_error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, final_error);
}
// overridden by api_fuzzer.c
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index a2404f9..66e81bf 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -221,7 +221,7 @@
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- grpc_closure_run(exec_ctx, cb, error);
+ GRPC_CLOSURE_RUN(exec_ctx, cb, error);
}
#define MAX_READ_IOVEC 4
@@ -348,7 +348,7 @@
tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
}
}
@@ -465,7 +465,7 @@
gpr_log(GPR_DEBUG, "write: %s", str);
}
- grpc_closure_run(exec_ctx, cb, error);
+ GRPC_CLOSURE_RUN(exec_ctx, cb, error);
TCP_UNREF(exec_ctx, tcp, "write");
}
}
@@ -491,7 +491,7 @@
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
@@ -515,7 +515,7 @@
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write: %s", str);
}
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
GPR_TIMER_END("tcp_write", 0);
@@ -616,9 +616,9 @@
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp,
+ GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp,
grpc_schedule_on_exec_ctx);
- grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp,
+ GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 7bb8392..f304642 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -121,7 +121,7 @@
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -163,7 +163,7 @@
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
@@ -503,7 +503,7 @@
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
- grpc_closure_init(&sp->read_closure, on_read, sp,
+ GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
@@ -513,7 +513,7 @@
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- grpc_closure_init(&sp->read_closure, on_read, sp,
+ GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
@@ -540,7 +540,7 @@
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 632a232..2de0ea9 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -117,7 +117,7 @@
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
while (s->head) {
@@ -171,7 +171,7 @@
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(&local_exec_ctx, &s->shutdown_starting);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index e8343a9..0162afc 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -134,10 +134,10 @@
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
- grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s,
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(destroy_server, s,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
@@ -176,7 +176,7 @@
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
@@ -437,7 +437,7 @@
sp->new_socket = INVALID_SOCKET;
sp->port = port;
sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
*listener = sp;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index a37a75c..ab5bb9f 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -161,7 +161,7 @@
// nread < 0: Error
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed");
}
- grpc_closure_sched(&exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -183,7 +183,7 @@
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
grpc_slice_from_static_string(uv_strerror(status)));
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
const char *str = grpc_error_string(error);
@@ -210,7 +210,7 @@
gpr_free(tcp->write_buffers);
grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
- grpc_closure_sched(&exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -236,7 +236,7 @@
}
if (tcp->shutting_down) {
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TCP socket is shutting down"));
return;
}
@@ -247,7 +247,7 @@
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
return;
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index dbae42e..161b397 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -179,7 +179,7 @@
tcp->read_cb = NULL;
TCP_UNREF(exec_ctx, tcp, "read");
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -193,7 +193,7 @@
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
@@ -220,7 +220,7 @@
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -233,7 +233,7 @@
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_closure_sched(exec_ctx, &tcp->on_read,
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
@@ -265,7 +265,7 @@
}
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
/* Initiates a write. */
@@ -283,7 +283,7 @@
size_t len;
if (tcp->shutting_down) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
@@ -317,7 +317,7 @@
grpc_error *error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -335,7 +335,7 @@
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
@@ -426,8 +426,8 @@
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
- grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 69b3cfd..bf73d2c 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -230,7 +230,7 @@
if (!g_shared_mutables.initialized) {
timer->pending = false;
- grpc_closure_sched(exec_ctx, timer->closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to create timer before initialization"));
return;
@@ -240,7 +240,7 @@
timer->pending = true;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->pending = false;
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
/* early out */
return;
@@ -310,7 +310,7 @@
timer->pending ? "true" : "false");
}
if (timer->pending) {
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -400,7 +400,7 @@
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index f814f0e..4f204cf 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -44,7 +44,7 @@
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(timer->pending);
timer->pending = 0;
- grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -57,7 +57,7 @@
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->pending = 0;
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
timer->pending = 1;
@@ -76,7 +76,7 @@
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (timer->pending) {
timer->pending = 0;
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
}
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 200a722..54e7f41 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -156,7 +156,7 @@
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -201,13 +201,13 @@
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
if (!sp->orphan_notified) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
- grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+ GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
@@ -240,7 +240,7 @@
struct shutdown_fd_args *args = gpr_malloc(sizeof(*args));
args->fd = sp->emfd;
args->server_mu = &s->mu;
- grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, args,
+ GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
@@ -525,11 +525,11 @@
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- grpc_closure_init(&sp->read_closure, on_read, sp,
+ GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
- grpc_closure_init(&sp->write_closure, on_write, sp,
+ GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index 15288e1..3cbb399 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -123,8 +123,8 @@
if (c->is_async) {
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(on_simulated_token_fetch_done,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done,
cb_arg, grpc_executor_scheduler),
GRPC_ERROR_NONE);
} else {
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index aaa7d97..a2a8e28 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -115,7 +115,7 @@
grpc_httpcli_get(
exec_ctx, &context, &detector.pollent, resource_quota, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
- grpc_closure_create(on_compute_engine_detection_http_response, &detector,
+ GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector,
grpc_schedule_on_exec_ctx),
&detector.response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
@@ -140,7 +140,7 @@
gpr_mu_unlock(g_polling_mu);
grpc_httpcli_context_destroy(exec_ctx, &context);
- grpc_closure_init(&destroy_closure, destroy_pollset,
+ GRPC_CLOSURE_INIT(&destroy_closure, destroy_pollset,
grpc_polling_entity_pollset(&detector.pollent),
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(exec_ctx,
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index f6db670..8c74708 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -668,7 +668,7 @@
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
+ GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
&ctx->responses[HTTP_RESPONSE_KEYS]);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_json_destroy(json);
@@ -771,7 +771,7 @@
gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss);
}
http_cb =
- grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx);
rsp_idx = HTTP_RESPONSE_KEYS;
} else {
req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss);
@@ -783,7 +783,7 @@
gpr_asprintf(&req.http.path, "/%s%s", path_prefix,
GRPC_OPENID_CONFIG_URL_SUFFIX);
}
- http_cb = grpc_closure_create(on_openid_config_retrieved, ctx,
+ http_cb = GRPC_CLOSURE_CREATE(on_openid_config_retrieved, ctx,
grpc_schedule_on_exec_ctx);
rsp_idx = HTTP_RESPONSE_OPENID;
}
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index acf4c37..9de561b 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -300,7 +300,7 @@
grpc_resource_quota_create("oauth2_credentials");
grpc_httpcli_get(
exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline,
- grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
+ GRPC_CLOSURE_CREATE(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
&metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
@@ -360,7 +360,7 @@
grpc_httpcli_post(
exec_ctx, httpcli_context, pollent, resource_quota, &request, body,
strlen(body), deadline,
- grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
+ GRPC_CLOSURE_CREATE(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
&metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
gpr_free(body);
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 643e057..140cf29 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -132,7 +132,7 @@
}
}
ep->read_buffer = NULL;
- grpc_closure_sched(exec_ctx, ep->read_cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, ep->read_cb, error);
SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
@@ -317,7 +317,7 @@
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer);
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
@@ -399,7 +399,7 @@
grpc_slice_buffer_init(&ep->output_buffer);
grpc_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
- grpc_closure_init(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 519e253..30a7430 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -122,7 +122,7 @@
grpc_auth_context **auth_context,
grpc_closure *on_peer_checked) {
if (sc == NULL) {
- grpc_closure_sched(exec_ctx, on_peer_checked,
+ GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"cannot check peer -- no security connector"));
tsi_peer_destruct(&peer);
@@ -340,7 +340,7 @@
*auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
- grpc_closure_sched(exec_ctx, on_peer_checked, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@@ -602,7 +602,7 @@
? c->overridden_target_name
: c->target_name,
&peer, auth_context);
- grpc_closure_sched(exec_ctx, on_peer_checked, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@@ -612,7 +612,7 @@
grpc_closure *on_peer_checked) {
grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context);
tsi_peer_destruct(&peer);
- grpc_closure_sched(exec_ctx, on_peer_checked, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error);
}
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index f39d10c..239a211 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -124,7 +124,7 @@
h->shutdown = true;
}
// Invoke callback.
- grpc_closure_sched(exec_ctx, h->on_handshake_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error);
}
static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -173,7 +173,7 @@
grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
grpc_channel_args_destroy(exec_ctx, tmp_args);
// Invoke callback.
- grpc_closure_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE);
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
h->shutdown = true;
@@ -408,13 +408,13 @@
gpr_ref_init(&h->refs, 1);
h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
- grpc_closure_init(&h->on_handshake_data_sent_to_peer,
+ GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer,
on_handshake_data_sent_to_peer, h,
grpc_schedule_on_exec_ctx);
- grpc_closure_init(&h->on_handshake_data_received_from_peer,
+ GRPC_CLOSURE_INIT(&h->on_handshake_data_received_from_peer,
on_handshake_data_received_from_peer, h,
grpc_schedule_on_exec_ctx);
- grpc_closure_init(&h->on_peer_checked, on_peer_checked, h,
+ GRPC_CLOSURE_INIT(&h->on_peer_checked, on_peer_checked, h,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&h->outgoing);
return &h->base;
@@ -440,7 +440,7 @@
grpc_tcp_server_acceptor *acceptor,
grpc_closure *on_handshake_done,
grpc_handshaker_args *args) {
- grpc_closure_sched(exec_ctx, on_handshake_done,
+ GRPC_CLOSURE_SCHED(exec_ctx, on_handshake_done,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to create security handshaker"));
}
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index eb7b635..4e6914b 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -113,7 +113,7 @@
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- grpc_closure_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
} else {
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
@@ -128,7 +128,7 @@
&exec_ctx, calld->transport_op->payload->send_message.send_message);
calld->transport_op->payload->send_message.send_message = NULL;
}
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
&exec_ctx, calld->on_done_recv,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
GRPC_ERROR_INT_GRPC_STATUS, status));
@@ -151,7 +151,7 @@
return;
}
}
- grpc_closure_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
@@ -193,7 +193,7 @@
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem,
+ GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem,
grpc_schedule_on_exec_ctx);
if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) {
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 4cd73a3..ef8405c 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -50,7 +50,7 @@
alarm->tag = tag;
grpc_cq_begin_op(cq, tag);
- grpc_closure_init(&alarm->on_alarm, alarm_cb, alarm,
+ GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index fd4ed72..b499219 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -520,7 +520,7 @@
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
- grpc_closure_init(&c->release_call, release_call, c,
+ GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
GPR_TIMER_END("destroy_call", 0);
}
@@ -634,7 +634,7 @@
GRPC_CALL_INTERNAL_REF(c, "termination");
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
- grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
execute_op(exec_ctx, c, op);
@@ -1170,7 +1170,7 @@
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = NULL;
- grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error);
+ GRPC_CLOSURE_RUN(exec_ctx, bctl->completion_data.notify_tag.tag, error);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
@@ -1275,7 +1275,7 @@
} else {
*call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
}
- grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl,
+ GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
grpc_schedule_on_exec_ctx);
continue_receiving_slices(exec_ctx, bctl);
}
@@ -1390,11 +1390,11 @@
call->has_initial_md_been_received = true;
if (call->saved_receiving_stream_ready_bctlp != NULL) {
- grpc_closure *saved_rsr_closure = grpc_closure_create(
+ grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE(
receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
grpc_schedule_on_exec_ctx);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
finish_batch_step(exec_ctx, bctl);
@@ -1436,7 +1436,7 @@
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
} else {
- grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify_tag, GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;
@@ -1644,7 +1644,7 @@
call->received_initial_metadata = true;
call->buffered_metadata[0] =
op->data.recv_initial_metadata.recv_initial_metadata;
- grpc_closure_init(&call->receiving_initial_metadata_ready,
+ GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
stream_op->recv_initial_metadata = true;
@@ -1668,7 +1668,7 @@
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
bctl, grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
@@ -1734,7 +1734,7 @@
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 4de3a8a..80eb80a 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -56,7 +56,7 @@
GPR_ASSERT(reserved == NULL);
pr->tag = tag;
pr->cq = cq;
- grpc_closure_init(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 0728805..1a5c721 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -113,7 +113,7 @@
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
- grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = NULL;
}
@@ -146,7 +146,7 @@
GPR_ASSERT(closure != NULL);
p->shutdown = closure;
if (p->root == NULL) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
non_polling_worker *w = p->root;
do {
@@ -417,7 +417,7 @@
cqd->outstanding_tag_count = 0;
#endif
cq_event_queue_init(&cqd->queue);
- grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
+ GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
grpc_schedule_on_exec_ctx);
GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index c9f498c..a079108 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -105,17 +105,17 @@
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
*op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- grpc_closure_sched(exec_ctx, op->on_connectivity_state_change,
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change,
GRPC_ERROR_NONE);
}
if (op->send_ping != NULL) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, op->send_ping,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
if (op->on_consumed != NULL) {
- grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
}
@@ -128,7 +128,7 @@
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *then_schedule_closure) {
- grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 6cccd0d..8a2616b 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -269,7 +269,7 @@
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
bool send_goaway, grpc_error *send_disconnect) {
struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc,
+ GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
grpc_schedule_on_exec_ctx);
grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem;
@@ -337,11 +337,11 @@
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(
+ GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
}
}
@@ -432,7 +432,7 @@
orphan_channel(chand);
server_ref(chand->server);
maybe_finish_shutdown(exec_ctx, chand->server);
- grpc_closure_init(&chand->finish_destroy_channel_closure,
+ GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
if (GRPC_TRACER_ON(grpc_server_channel_trace) && error != GRPC_ERROR_NONE) {
@@ -497,11 +497,11 @@
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(
+ GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_REF(error));
return;
}
@@ -546,9 +546,9 @@
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem,
+ GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
return;
}
@@ -563,7 +563,7 @@
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &calld->payload;
- grpc_closure_init(&calld->publish, publish_new_rpc, elem,
+ GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
&calld->publish);
@@ -740,7 +740,7 @@
GRPC_ERROR_UNREF(src_error);
}
- grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error);
}
static void server_mutate_op(grpc_call_element *elem,
@@ -779,9 +779,9 @@
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem,
+ GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
@@ -819,7 +819,7 @@
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata.recv_initial_metadata =
&calld->initial_metadata;
- grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem,
+ GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
&calld->got_initial_metadata);
@@ -855,7 +855,7 @@
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
- grpc_closure_init(&calld->server_on_recv_initial_metadata,
+ GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
@@ -895,7 +895,7 @@
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
- grpc_closure_init(&chand->channel_connectivity_changed,
+ GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand,
grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
@@ -1075,7 +1075,7 @@
server_ref(server);
server->starting = true;
- grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server,
+ GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server,
grpc_executor_scheduler),
GRPC_ERROR_NONE);
@@ -1255,7 +1255,7 @@
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
- grpc_closure_init(&l->destroy_done, listener_destroy_done, server,
+ GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
grpc_schedule_on_exec_ctx);
l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
}
@@ -1349,11 +1349,11 @@
gpr_mu_lock(&calld->mu_state);
if (calld->state == ZOMBIED) {
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(
+ GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else {
GPR_ASSERT(calld->state == PENDING);
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index f1bbfc0..6fe40af 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -67,7 +67,7 @@
error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
}
- grpc_closure_sched(exec_ctx, w->notify, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, w->notify, error);
gpr_free(w);
}
GRPC_ERROR_UNREF(tracker->current_error);
@@ -125,7 +125,7 @@
if (current == NULL) {
grpc_connectivity_state_watcher *w = tracker->watchers;
if (w != NULL && w->notify == notify) {
- grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_CANCELLED);
tracker->watchers = w->next;
gpr_free(w);
return false;
@@ -133,7 +133,7 @@
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == notify) {
- grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_CANCELLED);
w->next = w->next->next;
gpr_free(rm_candidate);
return false;
@@ -144,7 +144,7 @@
} else {
if (cur != *current) {
*current = cur;
- grpc_closure_sched(exec_ctx, notify,
+ GRPC_CLOSURE_SCHED(exec_ctx, notify,
GRPC_ERROR_REF(tracker->current_error));
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
@@ -197,7 +197,7 @@
gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
w->notify);
}
- grpc_closure_sched(exec_ctx, w->notify,
+ GRPC_CLOSURE_SCHED(exec_ctx, w->notify,
GRPC_ERROR_REF(tracker->current_error));
gpr_free(w);
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index abc289c..c2afede 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -65,7 +65,7 @@
there. */
refcount->destroy.scheduler = grpc_executor_scheduler;
}
- grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -112,7 +112,7 @@
grpc_iomgr_cb_func cb, void *cb_arg) {
#endif
gpr_ref_init(&refcount->refs, initial_refs);
- grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}
@@ -202,16 +202,16 @@
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
grpc_error *error) {
if (op->recv_message) {
- grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
if (op->recv_initial_metadata) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
- grpc_closure_sched(exec_ctx, op->on_complete, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error);
if (op->cancel_stream) {
GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error);
}
@@ -226,13 +226,13 @@
static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_op *op = arg;
- grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
made_transport_op *op = gpr_malloc(sizeof(*op));
- grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op,
+ GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
@@ -252,14 +252,14 @@
made_transport_stream_op *op = arg;
grpc_closure *c = op->inner_on_complete;
gpr_free(op);
- grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op_batch *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
made_transport_stream_op *op = gpr_zalloc(sizeof(*op));
op->op.payload = &op->payload;
- grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
+ GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
op, grpc_schedule_on_exec_ctx);
op->inner_on_complete = on_complete;
op->op.on_complete = &op->outer_on_complete;