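Try to avoid contention on neighbourhood mutexes in end_worker: scan with gpr_mu_trylock first, then block only on the neighbourhoods that were busy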
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index a25168f..27f448f 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -314,6 +314,9 @@
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
+/* Scratch flags, one per neighbourhood (g_num_neighbourhoods entries).
+   NOTE(review): file-scope, and not guarded by a lock of its own. */
+static bool *g_neighbour_scan_state;
static size_t g_num_neighbourhoods;
/* Return true if first in list */
@@ -368,6 +369,10 @@
g_num_neighbourhoods = GPR_MAX(1, gpr_cpu_num_cores());
g_neighbourhoods =
gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
+ /* One flag per neighbourhood. Allocated with gpr_malloc rather than
+    gpr_zalloc, so presumably uninitialised until first written. */
+ g_neighbour_scan_state =
+ gpr_malloc(sizeof(*g_neighbour_scan_state) * g_num_neighbourhoods);
for (size_t i = 0; i < g_num_neighbourhoods; i++) {
gpr_mu_init(&g_neighbourhoods[i].mu);
g_neighbourhoods[i].seen_inactive = true;
@@ -383,6 +386,8 @@
gpr_mu_destroy(&g_neighbourhoods[i].mu);
}
gpr_free(g_neighbourhoods);
+ /* Release the per-neighbourhood flag array along with g_neighbourhoods. */
+ gpr_free(g_neighbour_scan_state);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -591,6 +595,55 @@
pollset->shutdown_closure == NULL;
}
+/* Look through neighbourhood's active pollsets for a worker, other than
+   avoid_worker, that could take over as the poller.
+   Returns true as soon as a pollset with such a worker is seen; that worker
+   is offered the g_active_poller slot via CAS from 0 and, only if the CAS
+   succeeds, set to KICKED_FOR_POLL and signalled if its cv is initialised.
+   Returns false once active_root is NULL, after setting
+   neighbourhood->seen_inactive. Both callers hold neighbourhood->mu across
+   the call; each inspected pollset's own mu is taken and released here. */
+static bool check_neighbourhood_for_available_poller(
+ pollset_neighbourhood *neighbourhood, grpc_pollset_worker *avoid_worker) {
+ bool found_worker = false;
+ do {
+ grpc_pollset *inspect = neighbourhood->active_root;
+ if (inspect == NULL) {
+ break;
+ }
+ gpr_mu_lock(&inspect->mu);
+ GPR_ASSERT(!inspect->seen_inactive);
+ grpc_pollset_worker *inspect_worker = inspect->root_worker;
+ // Step past avoid_worker; if ->next leads straight back to it there is no
+ // other candidate in this pollset.
+ if (inspect_worker == avoid_worker) inspect_worker = inspect_worker->next;
+ if (inspect_worker == avoid_worker) inspect_worker = NULL;
+ if (inspect_worker != NULL) {
+ if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
+ (gpr_atm)inspect_worker)) {
+ inspect_worker->kick_state = KICKED_FOR_POLL;
+ if (inspect_worker->initialized_cv) {
+ gpr_cv_signal(&inspect_worker->cv);
+ }
+ }
+ // even if we didn't win the cas, there's a worker, we can stop
+ found_worker = true;
+ } else {
+ // No usable worker: flag the pollset and hand it to
+ // move_pollset_to_neighbourhood_list (active root -> inactive root), then
+ // re-read active_root on the next iteration.
+ inspect->seen_inactive = true;
+ move_pollset_to_neighbourhood_list(inspect, &neighbourhood->active_root,
+ &neighbourhood->inactive_root);
+ }
+ gpr_mu_unlock(&inspect->mu);
+ } while (!found_worker);
+ if (!found_worker) {
+ neighbourhood->seen_inactive = true;
+ }
+ return found_worker;
+}
+
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
@@ -610,49 +650,46 @@
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
- pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+ // Take the starting index while pollset->mu is still held, as the replaced
+ // code did when it read pollset->neighbourhood.
+ size_t poller_neighbourhood_idx =
+ (size_t)(pollset->neighbourhood - g_neighbourhoods);
gpr_mu_unlock(&pollset->mu);
bool found_worker = false;
- do {
- gpr_mu_lock(&neighbourhood->mu);
- do {
- grpc_pollset *inspect = neighbourhood->active_root;
- if (inspect == NULL) {
- break;
- }
- gpr_mu_lock(&inspect->mu);
- GPR_ASSERT(!inspect->seen_inactive);
- grpc_pollset_worker *inspect_worker = inspect->root_worker;
- if (inspect_worker == worker) inspect_worker = worker->next;
- if (inspect_worker == worker) inspect_worker = NULL;
- if (inspect_worker != NULL) {
- if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
- (gpr_atm)inspect_worker)) {
- inspect_worker->kick_state = KICKED_FOR_POLL;
- if (inspect_worker->initialized_cv) gpr_cv_signal(&inspect_worker->cv);
- }
- // even if we didn't win the cas, there's a worker, we can stop
- found_worker = true;
- } else {
- inspect->seen_inactive = true;
- move_pollset_to_neighbourhood_list(inspect,
- &neighbourhood->active_root,
- &neighbourhood->inactive_root);
- }
- gpr_mu_unlock(&inspect->mu);
- } while (!found_worker);
- if (!found_worker) {
- neighbourhood->seen_inactive = true;
+ // Which offsets pass 1 managed to scan. Kept on this thread's stack, not in
+ // g_neighbour_scan_state: g_active_poller is already 0 here and nothing
+ // visible stops another thread from running this same scan concurrently,
+ // which would overwrite a shared array between our two passes.
+ // Offsets past the array's end are not recorded; pass 2 just scans them again.
+ bool scan_state[1024];
+ const size_t num_tracked = sizeof(scan_state) / sizeof(scan_state[0]);
+ // Pass 1: only take neighbourhood locks that are free right now.
+ for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
+ pollset_neighbourhood *neighbourhood =
+ &g_neighbourhoods[(poller_neighbourhood_idx + i) %
+ g_num_neighbourhoods];
+ if (gpr_mu_trylock(&neighbourhood->mu)) {
+ found_worker =
+ check_neighbourhood_for_available_poller(neighbourhood, worker);
+ gpr_mu_unlock(&neighbourhood->mu);
+ if (i < num_tracked) scan_state[i] = true;
+ } else if (i < num_tracked) {
+ scan_state[i] = false;
}
- gpr_mu_unlock(&neighbourhood->mu);
- ssize_t cur_neighbourhood_idx = neighbourhood - g_neighbourhoods;
- GPR_ASSERT(cur_neighbourhood_idx >= 0);
- GPR_ASSERT(g_num_neighbourhoods < INTPTR_MAX);
- GPR_ASSERT(cur_neighbourhood_idx < (ssize_t)g_neighbourhoods);
- size_t new_neighbourhood_idx =
- ((size_t)cur_neighbourhood_idx + 1) % g_num_neighbourhoods;
- neighbourhood = &g_neighbourhoods[new_neighbourhood_idx];
- } while (!found_worker && neighbourhood != pollset->neighbourhood);
+ }
+ if (!found_worker) {
+ // Pass 2: block on whatever pass 1 could not lock.
+ for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
+ if (i < num_tracked && scan_state[i]) continue;
+ pollset_neighbourhood *neighbourhood =
+ &g_neighbourhoods[(poller_neighbourhood_idx + i) %
+ g_num_neighbourhoods];
+ gpr_mu_lock(&neighbourhood->mu);
+ found_worker =
+ check_neighbourhood_for_available_poller(neighbourhood, worker);
+ gpr_mu_unlock(&neighbourhood->mu);
+ }
+ }
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
}