Tweak: trigger next poller before exec_ctx flush
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 27ed978..b1127d3 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -594,14 +594,21 @@
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
- if (worker->kick_state == KICKED_FOR_POLL) {
+ if (worker_hdl != NULL) *worker_hdl = NULL;
+ if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
GPR_ASSERT(!pollset->seen_inactive);
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker);
if (worker->next != worker) {
assert(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
+ gpr_log(GPR_DEBUG, "Picked sibling worker %p for poller", worker);
worker->next->kick_state = KICKED_FOR_POLL;
gpr_cv_signal(&worker->next->cv);
+ if (grpc_exec_ctx_has_work(exec_ctx)) {
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ }
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
@@ -648,6 +655,7 @@
((size_t)cur_neighbourhood_idx + 1) % g_num_neighbourhoods;
neighbourhood = &g_neighbourhoods[new_neighbourhood_idx];
} while (!found_worker && neighbourhood != pollset->neighbourhood);
+ grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
}
}
@@ -673,20 +681,18 @@
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
- grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
- gpr_tls_set(&g_current_thread_pollset, 0);
gpr_tls_set(&g_current_thread_worker, 0);
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
+ gpr_tls_set(&g_current_thread_pollset, 0);
return error;
}
@@ -705,10 +711,13 @@
&g_active_poller)) {
root_worker->kick_state = KICKED;
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
- } else {
+ } else if (next_worker->kick_state == UNKICKED) {
+ GPR_ASSERT(next_worker->initialized_cv);
next_worker->kick_state = KICKED;
gpr_cv_signal(&next_worker->cv);
return GRPC_ERROR_NONE;
+ } else {
+ return GRPC_ERROR_NONE;
}
} else {
return GRPC_ERROR_NONE;
@@ -723,10 +732,13 @@
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
specific_worker->kick_state = KICKED;
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
- } else {
+ } else if (specific_worker->initialized_cv) {
specific_worker->kick_state = KICKED;
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
+ } else {
+ specific_worker->kick_state = KICKED;
+ return GRPC_ERROR_NONE;
}
}