Fix several races
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 1284891..fcccccc 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -106,6 +106,8 @@
   gpr_cv cv;
 };
 
+#define MAX_NEIGHBOURHOODS 1024
+
 typedef struct pollset_neighbourhood {
   gpr_mu mu;
   grpc_pollset *active_root;
@@ -121,6 +123,7 @@
   bool shutting_down;          /* Is the pollset shutting down ? */
   bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
   grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
+  int begin_refs;
 
   grpc_pollset *next;
   grpc_pollset *prev;
@@ -312,7 +315,6 @@
 GPR_TLS_DECL(g_current_thread_worker);
 static gpr_atm g_active_poller;
 static pollset_neighbourhood *g_neighbourhoods;
-static bool *g_neighbour_scan_state;
 static size_t g_num_neighbourhoods;
 
 /* Return true if first in list */
@@ -352,6 +354,10 @@
   }
 }
 
+static size_t choose_neighbourhood(void) {
+  return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
+}
+
 static grpc_error *pollset_global_init(void) {
   gpr_tls_init(&g_current_thread_pollset);
   gpr_tls_init(&g_current_thread_worker);
@@ -364,11 +370,9 @@
   if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
-  g_num_neighbourhoods = GPR_MAX(1, gpr_cpu_num_cores());
+  g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
   g_neighbourhoods =
       gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
-  g_neighbour_scan_state =
-      gpr_malloc(sizeof(*g_neighbour_scan_state) * g_num_neighbourhoods);
   for (size_t i = 0; i < g_num_neighbourhoods; i++) {
     gpr_mu_init(&g_neighbourhoods[i].mu);
   }
@@ -383,26 +387,27 @@
     gpr_mu_destroy(&g_neighbourhoods[i].mu);
   }
   gpr_free(g_neighbourhoods);
-  gpr_free(g_neighbour_scan_state);
 }
 
 static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
   gpr_mu_init(&pollset->mu);
   *mu = &pollset->mu;
-  pollset->neighbourhood = &g_neighbourhoods[gpr_cpu_current_cpu()];
+  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
   pollset->seen_inactive = true;
   pollset->next = pollset->prev = pollset;
 }
 
 static void pollset_destroy(grpc_pollset *pollset) {
-  gpr_mu_lock(&pollset->neighbourhood->mu);
-  pollset->prev->next = pollset->next;
-  pollset->next->prev = pollset->prev;
-  if (pollset == pollset->neighbourhood->active_root) {
-    pollset->neighbourhood->active_root =
-        pollset->next == pollset ? NULL : pollset->next;
+  if (!pollset->seen_inactive) {
+    gpr_mu_lock(&pollset->neighbourhood->mu);
+    pollset->prev->next = pollset->next;
+    pollset->next->prev = pollset->prev;
+    if (pollset == pollset->neighbourhood->active_root) {
+      pollset->neighbourhood->active_root =
+          pollset->next == pollset ? NULL : pollset->next;
+    }
+    gpr_mu_unlock(&pollset->neighbourhood->mu);
   }
-  gpr_mu_unlock(&pollset->neighbourhood->mu);
   gpr_mu_destroy(&pollset->mu);
 }
 
@@ -428,7 +433,8 @@
 
 static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *pollset) {
-  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
+  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
+      pollset->begin_refs == 0) {
     grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
     pollset->shutdown_closure = NULL;
   }
@@ -532,14 +538,16 @@
   if (worker_hdl != NULL) *worker_hdl = worker;
   worker->initialized_cv = false;
   worker->kick_state = UNKICKED;
+  pollset->begin_refs++;
 
   if (pollset->seen_inactive) {
     // pollset has been observed to be inactive, we need to move back to the
     // active list
-    pollset_neighbourhood *neighbourhood = pollset->neighbourhood = &g_neighbourhoods[gpr_cpu_current_cpu()];
+    pollset_neighbourhood *neighbourhood = pollset->neighbourhood =
+        &g_neighbourhoods[choose_neighbourhood()];
     gpr_mu_unlock(&pollset->mu);
-    // pollset unlocked: state may change (even worker->kick_state)
-retry_lock_neighbourhood:
+  // pollset unlocked: state may change (even worker->kick_state)
+  retry_lock_neighbourhood:
     gpr_mu_lock(&neighbourhood->mu);
     gpr_mu_lock(&pollset->mu);
     if (pollset->seen_inactive) {
@@ -564,16 +572,18 @@
     gpr_mu_unlock(&neighbourhood->mu);
   }
   worker_insert(pollset, worker);
+  pollset->begin_refs--;
   if (worker->kick_state == UNKICKED) {
     GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
     worker->initialized_cv = true;
     gpr_cv_init(&worker->cv);
-    do {
+    while (worker->kick_state == UNKICKED &&
+           pollset->shutdown_closure == NULL) {
       if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
           worker->kick_state == UNKICKED) {
         worker->kick_state = KICKED;
       }
-    } while (worker->kick_state == UNKICKED);
+    }
     *now = gpr_now(now->clock_type);
   }
 
@@ -594,17 +604,24 @@
     grpc_pollset_worker *inspect_worker = inspect->root_worker;
     if (inspect_worker != NULL) {
       do {
-        if (inspect_worker->kick_state == UNKICKED) {
-          if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
-                                     (gpr_atm)inspect_worker)) {
-            inspect_worker->kick_state = DESIGNATED_POLLER;
-            if (inspect_worker->initialized_cv) {
-              gpr_cv_signal(&inspect_worker->cv);
+        switch (inspect_worker->kick_state) {
+          case UNKICKED:
+            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
+                                       (gpr_atm)inspect_worker)) {
+              inspect_worker->kick_state = DESIGNATED_POLLER;
+              if (inspect_worker->initialized_cv) {
+                gpr_cv_signal(&inspect_worker->cv);
+              }
             }
-          }
-          // even if we didn't win the cas, there's a worker, we can stop
-          found_worker = true;
-          break;
+            // even if we didn't win the cas, there's a worker, we can stop
+            found_worker = true;
+            break;
+          case KICKED:
+            break;
+          case DESIGNATED_POLLER:
+            found_worker = true;  // ok, so someone else found the worker, but
+                                  // we'll accept that
+            break;
         }
         inspect_worker = inspect_worker->next;
       } while (inspect_worker != inspect->root_worker);
@@ -649,6 +666,7 @@
       size_t poller_neighbourhood_idx =
           (size_t)(pollset->neighbourhood - g_neighbourhoods);
       bool found_worker = false;
+      bool scan_state[MAX_NEIGHBOURHOODS];
       for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
         pollset_neighbourhood *neighbourhood =
             &g_neighbourhoods[(poller_neighbourhood_idx + i) %
@@ -657,19 +675,18 @@
           found_worker =
               check_neighbourhood_for_available_poller(neighbourhood);
           gpr_mu_unlock(&neighbourhood->mu);
-          g_neighbour_scan_state[i] = true;
+          scan_state[i] = true;
         } else {
-          g_neighbour_scan_state[i] = false;
+          scan_state[i] = false;
         }
       }
       for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
-        if (g_neighbour_scan_state[i]) continue;
+        if (scan_state[i]) continue;
         pollset_neighbourhood *neighbourhood =
             &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                               g_num_neighbourhoods];
         gpr_mu_lock(&neighbourhood->mu);
-        found_worker =
-            check_neighbourhood_for_available_poller(neighbourhood);
+        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
         gpr_mu_unlock(&neighbourhood->mu);
       }
       grpc_exec_ctx_flush(exec_ctx);