Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 12c8483..7a8962f 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -165,13 +165,12 @@
#ifndef NDEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
-#define PI_UNREF(exec_ctx, p, r) \
- pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
+#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)
#else
#define PI_ADD_REF(p, r) pi_add_ref((p))
-#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
+#define PI_UNREF(p, r) pi_unref((p))
#endif
@@ -270,7 +269,7 @@
static __thread polling_island* g_current_thread_polling_island;
/* Forward declaration */
-static void polling_island_delete(grpc_exec_ctx* exec_ctx, polling_island* pi);
+static void polling_island_delete(polling_island* pi);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -284,7 +283,7 @@
#endif /* defined(GRPC_TSAN) */
static void pi_add_ref(polling_island* pi);
-static void pi_unref(grpc_exec_ctx* exec_ctx, polling_island* pi);
+static void pi_unref(polling_island* pi);
#ifndef NDEBUG
static void pi_add_ref_dbg(polling_island* pi, const char* reason,
@@ -299,8 +298,8 @@
pi_add_ref(pi);
}
-static void pi_unref_dbg(grpc_exec_ctx* exec_ctx, polling_island* pi,
- const char* reason, const char* file, int line) {
+static void pi_unref_dbg(polling_island* pi, const char* reason,
+ const char* file, int line) {
if (grpc_polling_trace.enabled()) {
gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
gpr_log(GPR_DEBUG,
@@ -308,7 +307,7 @@
" (%s) - (%s, %d)",
pi, old_cnt, (old_cnt - 1), reason, file, line);
}
- pi_unref(exec_ctx, pi);
+ pi_unref(pi);
}
#endif
@@ -316,7 +315,7 @@
gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
-static void pi_unref(grpc_exec_ctx* exec_ctx, polling_island* pi) {
+static void pi_unref(polling_island* pi) {
/* If ref count went to zero, delete the polling island.
Note that this deletion need not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
@@ -327,9 +326,9 @@
*/
if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
polling_island* next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(exec_ctx, pi);
+ polling_island_delete(pi);
if (next != nullptr) {
- PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
+ PI_UNREF(next, "pi_delete"); /* Recursive call */
}
}
}
@@ -465,8 +464,7 @@
}
/* Might return NULL in case of an error */
-static polling_island* polling_island_create(grpc_exec_ctx* exec_ctx,
- grpc_fd* initial_fd,
+static polling_island* polling_island_create(grpc_fd* initial_fd,
grpc_error** error) {
polling_island* pi = nullptr;
const char* err_desc = "polling_island_create";
@@ -482,7 +480,7 @@
gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0);
- gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
+ gpr_atm_rel_store(&pi->merged_to, (gpr_atm) nullptr);
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
@@ -497,13 +495,13 @@
done:
if (*error != GRPC_ERROR_NONE) {
- polling_island_delete(exec_ctx, pi);
+ polling_island_delete(pi);
pi = nullptr;
}
return pi;
}
-static void polling_island_delete(grpc_exec_ctx* exec_ctx, polling_island* pi) {
+static void polling_island_delete(polling_island* pi) {
GPR_ASSERT(pi->fd_cnt == 0);
if (pi->epoll_fd >= 0) {
@@ -862,8 +860,7 @@
return ret_fd;
}
-static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
- grpc_closure* on_done, int* release_fd,
+static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
polling_island* unref_pi = nullptr;
@@ -902,7 +899,7 @@
fd->orphaned = true;
- GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -911,7 +908,7 @@
The polling island owns a workqueue which owns an fd, and unreffing
inside the lock can cause an eventual lock loop that makes TSAN very
unhappy. */
- PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
+ PI_UNREF(unref_pi, "fd_orphan");
}
if (error != GRPC_ERROR_NONE) {
const char* msg = grpc_error_string(error);
@@ -920,8 +917,7 @@
GRPC_ERROR_UNREF(error);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
- grpc_fd* fd) {
+static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
return (grpc_pollset*)notifier;
}
@@ -931,22 +927,20 @@
}
/* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) {
- if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) {
+static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
+ if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
- fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why));
+ fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
-static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
- grpc_closure* closure) {
- fd->read_closure->NotifyOn(exec_ctx, closure);
+static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
+ fd->read_closure->NotifyOn(closure);
}
-static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
- grpc_closure* closure) {
- fd->write_closure->NotifyOn(exec_ctx, closure);
+static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
+ fd->write_closure->NotifyOn(closure);
}
/*******************************************************************************
@@ -1028,11 +1022,11 @@
}
/* p->mu must be held before calling this function */
-static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* p,
+static grpc_error* pollset_kick(grpc_pollset* p,
grpc_pollset_worker* specific_worker) {
GPR_TIMER_BEGIN("pollset_kick", 0);
grpc_error* error = GRPC_ERROR_NONE;
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
+ GRPC_STATS_INC_POLLSET_KICK();
const char* err_desc = "Kick Failure";
grpc_pollset_worker* worker = specific_worker;
if (worker != nullptr) {
@@ -1096,10 +1090,9 @@
pollset->shutdown_done = nullptr;
}
-static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx,
- grpc_millis millis) {
+static int poll_deadline_to_millis_timeout(grpc_millis millis) {
if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
- grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
+ grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
if (delta > INT_MAX)
return INT_MAX;
else if (delta < 0)
@@ -1108,9 +1101,8 @@
return (int)delta;
}
-static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
- grpc_pollset* notifier) {
- fd->read_closure->SetReady(exec_ctx);
+static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
+ fd->read_closure->SetReady();
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1121,39 +1113,34 @@
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
-static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
- fd->write_closure->SetReady(exec_ctx);
-}
+static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
-static void pollset_release_polling_island(grpc_exec_ctx* exec_ctx,
- grpc_pollset* ps,
+static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
- PI_UNREF(exec_ctx, ps->po.pi, reason);
+ PI_UNREF(ps->po.pi, reason);
}
ps->po.pi = nullptr;
}
-static void finish_shutdown_locked(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset) {
+static void finish_shutdown_locked(grpc_pollset* pollset) {
/* The pollset cannot have any workers if we are at this stage */
GPR_ASSERT(!pollset_has_workers(pollset));
pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->po.pi to NULL */
- pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ pollset_release_polling_island(pollset, "ps_shutdown");
+ GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
-static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_closure* closure) {
+static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
GPR_TIMER_BEGIN("pollset_shutdown", 0);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = true;
pollset->shutdown_done = closure;
- pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST);
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
/* If the pollset has any workers, we cannot call finish_shutdown_locked()
because it would release the underlying polling island. In such a case, we
@@ -1161,7 +1148,7 @@
if (!pollset_has_workers(pollset)) {
GPR_ASSERT(!pollset->finish_shutdown_called);
GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
+ finish_shutdown_locked(pollset);
}
GPR_TIMER_END("pollset_shutdown", 0);
}
@@ -1169,15 +1156,14 @@
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
* than destroying the mutexes, there is nothing special that needs to be done
* here */
-static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) {
+static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
gpr_mu_destroy(&pollset->po.mu);
}
#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
-static void pollset_work_and_unlock(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset,
+static void pollset_work_and_unlock(grpc_pollset* pollset,
grpc_pollset_worker* worker, int timeout_ms,
sigset_t* sig_mask, grpc_error** error) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
@@ -1199,7 +1185,7 @@
this function (i.e. pollset_work_and_unlock()) is called */
if (pollset->po.pi == nullptr) {
- pollset->po.pi = polling_island_create(exec_ctx, nullptr, error);
+ pollset->po.pi = polling_island_create(nullptr, error);
if (pollset->po.pi == nullptr) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
@@ -1219,7 +1205,7 @@
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
- PI_UNREF(exec_ctx, pollset->po.pi, "ps");
+ PI_UNREF(pollset->po.pi, "ps");
pollset->po.pi = pi;
}
@@ -1233,10 +1219,10 @@
g_current_thread_polling_island = pi;
GRPC_SCHEDULING_START_BLOCKING_REGION;
- GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
+ GRPC_STATS_INC_SYSCALL_POLL();
ep_rv =
epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
- GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_asprintf(&err_msg,
@@ -1274,10 +1260,10 @@
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write_ev = ep_ev[i].events & EPOLLOUT;
if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
+ fd_become_readable(fd, pollset);
}
if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
+ fd_become_writable(fd);
}
}
}
@@ -1292,7 +1278,7 @@
that we got before releasing the polling island lock). This is because
pollset->po.pi pointer might get updated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
- PI_UNREF(exec_ctx, pi, "ps_work");
+ PI_UNREF(pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
@@ -1301,12 +1287,12 @@
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
-static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+static grpc_error* pollset_work(grpc_pollset* pollset,
grpc_pollset_worker** worker_hdl,
grpc_millis deadline) {
GPR_TIMER_BEGIN("pollset_work", 0);
grpc_error* error = GRPC_ERROR_NONE;
- int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline);
+ int timeout_ms = poll_deadline_to_millis_timeout(deadline);
sigset_t new_mask;
@@ -1364,9 +1350,9 @@
push_front_worker(pollset, &worker); /* Add worker to pollset */
- pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
- &g_orig_sigmask, &error);
- grpc_exec_ctx_flush(exec_ctx);
+ pollset_work_and_unlock(pollset, &worker, timeout_ms, &g_orig_sigmask,
+ &error);
+ grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(&pollset->po.mu);
@@ -1386,10 +1372,10 @@
if (pollset->shutting_down && !pollset_has_workers(pollset) &&
!pollset->finish_shutdown_called) {
GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
+ finish_shutdown_locked(pollset);
gpr_mu_unlock(&pollset->po.mu);
- grpc_exec_ctx_flush(exec_ctx);
+ grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(&pollset->po.mu);
}
@@ -1404,9 +1390,8 @@
return error;
}
-static void add_poll_object(grpc_exec_ctx* exec_ctx, poll_obj* bag,
- poll_obj_type bag_type, poll_obj* item,
- poll_obj_type item_type) {
+static void add_poll_object(poll_obj* bag, poll_obj_type bag_type,
+ poll_obj* item, poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
#ifndef NDEBUG
@@ -1456,7 +1441,7 @@
keeping TSAN happy outweigh any performance advantage we might have
by keeping the lock held. */
gpr_mu_unlock(&item->mu);
- pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
+ pi_new = polling_island_create(FD_FROM_PO(item), &error);
gpr_mu_lock(&item->mu);
/* Need to reverify any assumptions made between the initial lock and
@@ -1475,11 +1460,11 @@
/* Ref and unref so that the polling island gets deleted during unref
*/
PI_ADD_REF(pi_new, "dance_of_destruction");
- PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
+ PI_UNREF(pi_new, "dance_of_destruction");
goto retry;
}
} else {
- pi_new = polling_island_create(exec_ctx, nullptr, &error);
+ pi_new = polling_island_create(nullptr, &error);
}
GRPC_POLLING_TRACE(
@@ -1533,7 +1518,7 @@
if (item->pi != pi_new) {
PI_ADD_REF(pi_new, poll_obj_string(item_type));
if (item->pi != nullptr) {
- PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
+ PI_UNREF(item->pi, poll_obj_string(item_type));
}
item->pi = pi_new;
}
@@ -1541,7 +1526,7 @@
if (bag->pi != pi_new) {
PI_ADD_REF(pi_new, poll_obj_string(bag_type));
if (bag->pi != nullptr) {
- PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
+ PI_UNREF(bag->pi, poll_obj_string(bag_type));
}
bag->pi = pi_new;
}
@@ -1553,10 +1538,8 @@
GPR_TIMER_END("add_poll_object", 0);
}
-static void pollset_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_fd* fd) {
- add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
- POLL_OBJ_FD);
+static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
+ add_poll_object(&pollset->po, POLL_OBJ_POLLSET, &fd->po, POLL_OBJ_FD);
}
/*******************************************************************************
@@ -1573,48 +1556,39 @@
return pss;
}
-static void pollset_set_destroy(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pss) {
+static void pollset_set_destroy(grpc_pollset_set* pss) {
gpr_mu_destroy(&pss->po.mu);
if (pss->po.pi != nullptr) {
- PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
+ PI_UNREF(pss->po.pi, "pss_destroy");
}
gpr_free(pss);
}
-static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
- grpc_fd* fd) {
- add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
- POLL_OBJ_FD);
+static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
+ add_poll_object(&pss->po, POLL_OBJ_POLLSET_SET, &fd->po, POLL_OBJ_FD);
}
-static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
- grpc_fd* fd) {
+static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
/* Nothing to do */
}
-static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pss, grpc_pollset* ps) {
- add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
- POLL_OBJ_POLLSET);
+static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
+ add_poll_object(&pss->po, POLL_OBJ_POLLSET_SET, &ps->po, POLL_OBJ_POLLSET);
}
-static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pss, grpc_pollset* ps) {
+static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
/* Nothing to do */
}
-static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* bag,
+static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
grpc_pollset_set* item) {
- add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
+ add_poll_object(&bag->po, POLL_OBJ_POLLSET_SET, &item->po,
POLL_OBJ_POLLSET_SET);
}
-static void pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* bag,
+static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
grpc_pollset_set* item) {
/* Nothing to do */
}
@@ -1760,7 +1734,7 @@
bool explicit_request) {
gpr_log(GPR_ERROR,
"Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined.");
- return NULL;
+ return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */