Fixes
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 3c7a16b..669a3df 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -36,7 +36,7 @@
 /* This polling engine is only relevant on linux kernels supporting epoll() */
 #ifdef GRPC_LINUX_EPOLL
 
-#include "src/core/lib/iomgr/ev_epoll_linux.h"
+#include "src/core/lib/iomgr/ev_epoll1_linux.h"
 
 #include <assert.h>
 #include <errno.h>
@@ -75,16 +75,10 @@
 struct grpc_fd {
   int fd;
 
-  /* The fd is either closed or we relinquished control of it. In either
-     cases, this indicates that the 'fd' on this structure is no longer
-     valid */
-  bool orphaned;
-
   gpr_atm read_closure;
   gpr_atm write_closure;
 
   struct grpc_fd *freelist_next;
-  grpc_closure *on_done_closure;
 
   /* The pollset that last noticed that the fd is readable. The actual type
    * stored in this is (grpc_pollset *) */
@@ -119,12 +113,12 @@
 };
 
 struct grpc_pollset {
-  grpc_pollset_worker root_worker;
-  bool kicked_without_pollers;
+  grpc_pollset_worker *root_worker; /* Ring of active workers, NULL if none */
+  bool kicked_without_poller; /* A kick arrived while no worker was present */
 
   bool shutting_down;          /* Is the pollset shutting down ? */
   bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
-  grpc_closure *shutdown_done; /* Called after after shutdown is complete */
+  grpc_closure *shutdown_closure; /* Called after shutdown is complete */
 };
 
 /*******************************************************************************
@@ -171,66 +165,6 @@
 static grpc_fd *fd_freelist = NULL;
 static gpr_mu fd_freelist_mu;
 
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
-#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
-static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
-                   int line) {
-  gpr_log(GPR_DEBUG, "FD %d %p   ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
-          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
-          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
-#else
-#define REF_BY(fd, n, reason) ref_by(fd, n)
-#define UNREF_BY(fd, n, reason) unref_by(fd, n)
-static void ref_by(grpc_fd *fd, int n) {
-#endif
-  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
-}
-
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
-                     int line) {
-  gpr_atm old;
-  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
-          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
-          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
-#else
-static void unref_by(grpc_fd *fd, int n) {
-  gpr_atm old;
-#endif
-  old = gpr_atm_full_fetch_add(&fd->refst, -n);
-  if (old == n) {
-    /* Add the fd to the freelist */
-    gpr_mu_lock(&fd_freelist_mu);
-    fd->freelist_next = fd_freelist;
-    fd_freelist = fd;
-    grpc_iomgr_unregister_object(&fd->iomgr_object);
-
-    grpc_lfev_destroy(&fd->read_closure);
-    grpc_lfev_destroy(&fd->write_closure);
-
-    gpr_mu_unlock(&fd_freelist_mu);
-  } else {
-    GPR_ASSERT(old > n);
-  }
-}
-
-/* Increment refcount by two to avoid changing the orphan bit */
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
-                   int line) {
-  ref_by(fd, 2, reason, file, line);
-}
-
-static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
-                     int line) {
-  unref_by(fd, 2, reason, file, line);
-}
-#else
-static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
-static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-#endif
-
 static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
 
 static void fd_global_shutdown(void) {
@@ -239,7 +173,6 @@
   while (fd_freelist != NULL) {
     grpc_fd *fd = fd_freelist;
     fd_freelist = fd_freelist->freelist_next;
-    gpr_mu_destroy(&fd->po.mu);
     gpr_free(fd);
   }
   gpr_mu_destroy(&fd_freelist_mu);
@@ -257,29 +190,20 @@
 
   if (new_fd == NULL) {
     new_fd = gpr_malloc(sizeof(grpc_fd));
-    gpr_mu_init(&new_fd->po.mu);
   }
 
-  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
-   * is a newly created fd (or an fd we got from the freelist), no one else
-   * would be holding a lock to it anyway. */
-  gpr_mu_lock(&new_fd->po.mu);
-  new_fd->po.pi = NULL;
-#ifdef PO_DEBUG
-  new_fd->po.obj_type = POLL_OBJ_FD;
-#endif
-
-  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
   new_fd->fd = fd;
-  new_fd->orphaned = false;
   grpc_lfev_init(&new_fd->read_closure);
   grpc_lfev_init(&new_fd->write_closure);
   gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
-
   new_fd->freelist_next = NULL;
-  new_fd->on_done_closure = NULL;
-
-  gpr_mu_unlock(&new_fd->po.mu);
+
+  /* Register last: events may reach a poller the moment epoll_ctl returns */
+  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
+                           .data.ptr = new_fd};
+  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
+  }
 
   char *fd_name;
   gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -291,26 +215,12 @@
   return new_fd;
 }
 
-static int fd_wrapped_fd(grpc_fd *fd) {
-  int ret_fd = -1;
-  gpr_mu_lock(&fd->po.mu);
-  if (!fd->orphaned) {
-    ret_fd = fd->fd;
-  }
-  gpr_mu_unlock(&fd->po.mu);
-
-  return ret_fd;
-}
+static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
 
 static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                       grpc_closure *on_done, int *release_fd,
                       const char *reason) {
-  bool is_fd_closed = false;
   grpc_error *error = GRPC_ERROR_NONE;
-  polling_island *unref_pi = NULL;
-
-  gpr_mu_lock(&fd->po.mu);
-  fd->on_done_closure = on_done;
 
   /* If release_fd is not NULL, we should be relinquishing control of the file
      descriptor fd->fd (but we still own the grpc_fd structure). */
@@ -318,45 +228,26 @@
     *release_fd = fd->fd;
+    /* The descriptor stays open after being released, so the kernel will not
+       drop it from the epoll set on our behalf. Deregister it explicitly;
+       otherwise later events would still carry a pointer to this grpc_fd
+       after it has been put back on the freelist. */
+    struct epoll_event ev_unused = {.events = 0}; /* ignored by EPOLL_CTL_DEL */
+    if (epoll_ctl(g_epfd, EPOLL_CTL_DEL, fd->fd, &ev_unused) != 0) {
+      gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
+    }
   } else {
     close(fd->fd);
-    is_fd_closed = true;
   }
 
-  fd->orphaned = true;
+  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));
 
-  /* Remove the active status but keep referenced. We want this grpc_fd struct
-     to be alive (and not added to freelist) until the end of this function */
-  REF_BY(fd, 1, reason);
+  grpc_iomgr_unregister_object(&fd->iomgr_object);
+  grpc_lfev_destroy(&fd->read_closure);
+  grpc_lfev_destroy(&fd->write_closure);
 
-  /* Remove the fd from the polling island:
-     - Get a lock on the latest polling island (i.e the last island in the
-       linked list pointed by fd->po.pi). This is the island that
-       would actually contain the fd
-     - Remove the fd from the latest polling island
-     - Unlock the latest polling island
-     - Set fd->po.pi to NULL (but remove the ref on the polling island
-       before doing this.) */
-  if (fd->po.pi != NULL) {
-    polling_island *pi_latest = polling_island_lock(fd->po.pi);
-    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
-    gpr_mu_unlock(&pi_latest->mu);
-
-    unref_pi = fd->po.pi;
-    fd->po.pi = NULL;
-  }
-
-  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
-
-  gpr_mu_unlock(&fd->po.mu);
-  UNREF_BY(fd, 2, reason); /* Drop the reference */
-  if (unref_pi != NULL) {
-    /* Unref stale polling island here, outside the fd lock above.
-       The polling island owns a workqueue which owns an fd, and unreffing
-       inside the lock can cause an eventual lock loop that makes TSAN very
-       unhappy. */
-    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
-  }
-  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
-  GRPC_ERROR_UNREF(error);
+  gpr_mu_lock(&fd_freelist_mu);
+  fd->freelist_next = fd_freelist;
+  fd_freelist = fd;
+  gpr_mu_unlock(&fd_freelist_mu);
 }
 
 static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
@@ -390,11 +273,24 @@
 }
 
 static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
-  gpr_mu_lock(&fd->po.mu);
-  grpc_workqueue *workqueue =
-      GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
-  gpr_mu_unlock(&fd->po.mu);
-  return workqueue;
+  return NULL; /* TODO(ctiller): add a global workqueue */
+}
+
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+                               grpc_pollset *notifier) {
+  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+
+  /* Remember which pollset noticed the fd becoming readable. pollset_epoll()
+     passes the pollset whose worker was running epoll_wait() on the shared
+     g_epfd, so the value only identifies the observer; if two updates race,
+     either one is an acceptable answer (the last store wins). Readers
+     obtain it through fd_get_read_notifier_pollset(). */
+  /* Use release store to match with acquire load in fd_get_read_notifier */
+  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
+}
+
+static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
 }
 
 /*******************************************************************************
@@ -442,6 +338,263 @@
 
 GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
+static gpr_mu g_pollset_mu;
+static grpc_pollset_worker *g_root_worker;
+
+static grpc_error *pollset_global_init(void) {
+  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
+                           .data.ptr = &global_wakeup_fd};
+  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+    return GRPC_OS_ERROR(errno, "epoll_ctl"); /* nothing to undo yet */
+  }
+  gpr_mu_init(&g_pollset_mu);
+  gpr_tls_init(&g_current_thread_pollset);
+  gpr_tls_init(&g_current_thread_worker);
+  return GRPC_ERROR_NONE;
+}
+
+static void pollset_global_shutdown(void) {
+  gpr_mu_destroy(&g_pollset_mu);
+  gpr_tls_destroy(&g_current_thread_pollset);
+  gpr_tls_destroy(&g_current_thread_worker);
+}
+
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+  *mu = &g_pollset_mu; /* every pollset is handed the same global mutex */
+}
+
+/* Wakes every worker currently linked into 'pollset'. */
+static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
+  grpc_error *error = GRPC_ERROR_NONE;
+  grpc_pollset_worker *w = pollset->root_worker;
+  if (w == NULL) return error;
+  do {
+    if (!w->initialized_cv) {
+      /* No condition variable to signal, so poke the global wakeup fd. */
+      append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
+                   "pollset_shutdown");
+    } else {
+      w->kicked = true;
+      gpr_cv_signal(&w->cv);
+    }
+    w = w->links[PWL_POLLSET].next;
+  } while (w != pollset->root_worker);
+  return error;
+}
+
+static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
+                                          grpc_pollset *pollset) {
+  /* Schedule the shutdown closure once no worker remains in the pollset. */
+  if (pollset->shutdown_closure == NULL || pollset->root_worker != NULL) return;
+  grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+  pollset->shutdown_closure = NULL;
+}
+
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+                             grpc_closure *closure) {
+  GPR_ASSERT(pollset->shutdown_closure == NULL);
+  pollset->shutdown_closure = closure;
+  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
+  pollset_maybe_finish_shutdown(exec_ctx, pollset);
+}
+
+static void pollset_destroy(grpc_pollset *pollset) {}
+
+#define MAX_EPOLL_EVENTS 100
+
+/* Turns an absolute 'deadline' into an epoll_wait() timeout in milliseconds,
+   measured from 'now': -1 for an infinite deadline, 0 if it already passed. */
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+                                           gpr_timespec now) {
+  static const gpr_timespec almost_one_ms = {
+      .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
+  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
+    return -1; /* no deadline at all */
+  }
+  if (gpr_time_cmp(deadline, now) <= 0) {
+    return 0; /* already due */
+  }
+  /* Round up to the next whole millisecond; the result is at least 1. */
+  gpr_timespec remaining = gpr_time_sub(deadline, now);
+  int millis = gpr_time_to_millis(gpr_time_add(remaining, almost_one_ms));
+  return millis < 1 ? 1 : millis;
+}
+
+/* Runs one epoll_wait() on the global epoll set and dispatches what fired. */
+static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+                                 gpr_timespec now, gpr_timespec deadline) {
+  static const char *err_desc = "pollset_poll";
+  struct epoll_event events[MAX_EPOLL_EVENTS];
+  const int timeout = poll_deadline_to_millis_timeout(deadline, now);
+  const bool may_block = timeout != 0;
+
+  if (may_block) {
+    GRPC_SCHEDULING_START_BLOCKING_REGION;
+  }
+  int nready;
+  do {
+    nready = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
+  } while (nready < 0 && errno == EINTR);
+  if (may_block) {
+    GRPC_SCHEDULING_END_BLOCKING_REGION;
+  }
+  if (nready < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+  grpc_error *error = GRPC_ERROR_NONE;
+  for (int i = 0; i < nready; i++) {
+    const uint32_t mask = events[i].events;
+    if (events[i].data.ptr == &global_wakeup_fd) {
+      /* The global wakeup fd fired: consume the timer kick and the wakeup. */
+      grpc_timer_consume_kick();
+      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+                   err_desc);
+      continue;
+    }
+    grpc_fd *fd = (grpc_fd *)events[i].data.ptr;
+    /* An error or hang-up is reported to both the read and the write side. */
+    const bool cancel = (mask & (EPOLLERR | EPOLLHUP)) != 0;
+    if (cancel || (mask & (EPOLLIN | EPOLLPRI)) != 0) {
+      fd_become_readable(exec_ctx, fd, pollset);
+    }
+    if (cancel || (mask & EPOLLOUT) != 0) {
+      fd_become_writable(exec_ctx, fd);
+    }
+  }
+
+  return error;
+}
+
+static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
+                         gpr_timespec deadline) {
+  bool do_poll = true;
+  if (worker_hdl != NULL) *worker_hdl = worker;
+  worker->initialized_cv = false;
+  worker->kicked = false;
+  /* Link into this pollset's workers, then into the global worker list. */
+  worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
+  if (!worker_insert(&g_root_worker, PWL_POLLABLE, worker)) {
+    worker->initialized_cv = true;
+    gpr_cv_init(&worker->cv);
+    while (do_poll && g_root_worker != worker) { /* wait to become the root */
+      if (gpr_cv_wait(&worker->cv, &g_pollset_mu, deadline)) {
+        do_poll = false;
+      } else if (worker->kicked) {
+        do_poll = false;
+      }
+    }
+    *now = gpr_now(now->clock_type); /* refresh: waiting may have taken time */
+  }
+
+  return do_poll && pollset->shutdown_closure == NULL;
+}
+
+static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+                       grpc_pollset_worker *worker,
+                       grpc_pollset_worker **worker_hdl) {
+  if (NEW_ROOT == worker_remove(&g_root_worker, PWL_POLLABLE, worker)) {
+    gpr_cv_signal(&g_root_worker->cv);
+  }
+  if (worker->initialized_cv) {
+    gpr_cv_destroy(&worker->cv);
+  }
+  if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
+    pollset_maybe_finish_shutdown(exec_ctx, pollset);
+  }
+  /* 'worker' is on pollset_work()'s stack: never leave the handle dangling. */
+  if (worker_hdl != NULL) *worker_hdl = NULL;
+}
+
+/* The pollset mutex (g_pollset_mu, as handed out by pollset_init()) must be
+   held on entry. pollset_work() releases it while polling and re-acquires it
+   before returning. *worker_hdl is only meaningful during the call. */
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+                                grpc_pollset_worker **worker_hdl,
+                                gpr_timespec now, gpr_timespec deadline) {
+  grpc_pollset_worker worker;
+  grpc_error *error = GRPC_ERROR_NONE;
+  if (pollset->kicked_without_poller) {
+    pollset->kicked_without_poller = false;
+    return GRPC_ERROR_NONE;
+  }
+  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
+    GPR_ASSERT(!pollset->shutdown_closure);
+    gpr_mu_unlock(&g_pollset_mu);
+    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
+                 "pollset_work");
+    grpc_exec_ctx_flush(exec_ctx);
+    gpr_mu_lock(&g_pollset_mu);
+    gpr_tls_set(&g_current_thread_pollset, 0);
+    gpr_tls_set(&g_current_thread_worker, 0);
+    pollset_maybe_finish_shutdown(exec_ctx, pollset);
+  }
+  end_worker(exec_ctx, pollset, &worker, worker_hdl);
+  return error;
+}
+
+/* Kicks 'pollset'. With a NULL 'specific_worker' the pollset as a whole is
+   kicked; otherwise only that worker is marked kicked and woken if needed. */
+static grpc_error *pollset_kick(grpc_pollset *pollset,
+                                grpc_pollset_worker *specific_worker) {
+  if (specific_worker == NULL) {
+    /* Called from inside this pollset's own worker thread: nothing to do. */
+    if (gpr_tls_get(&g_current_thread_pollset) == (intptr_t)pollset) {
+      return GRPC_ERROR_NONE;
+    }
+    if (pollset->root_worker != NULL) {
+      return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+    }
+    pollset->kicked_without_poller = true; /* consumed by pollset_work() */
+    return GRPC_ERROR_NONE;
+  }
+
+  if (specific_worker->kicked) return GRPC_ERROR_NONE; /* already kicked */
+  specific_worker->kicked = true;
+  if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+    return GRPC_ERROR_NONE; /* kicking ourselves: the flag is enough */
+  }
+  if (specific_worker == g_root_worker) {
+    /* The root worker is woken through the wakeup fd, not through its cv. */
+    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+  }
+  gpr_cv_signal(&specific_worker->cv);
+  return GRPC_ERROR_NONE;
+}
+
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+                           grpc_fd *fd) {}
+
+static grpc_error *kick_poller(void) {
+  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+}
+
+/*******************************************************************************
+ * Workqueue Definitions
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+                                     const char *file, int line,
+                                     const char *reason) {
+  return workqueue;
+}
+
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                            const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+  return workqueue;
+}
+
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+                            grpc_workqueue *workqueue) {}
+#endif
+
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+  return grpc_schedule_on_exec_ctx;
+}
 
 /*******************************************************************************
  * Pollset-set Definitions
@@ -481,7 +634,7 @@
 static void shutdown_engine(void) {
   fd_global_shutdown();
   pollset_global_shutdown();
-  polling_island_global_shutdown();
+  close(g_epfd); /* the epoll set opened by grpc_init_epoll1_linux() */
 }
 
 static const grpc_event_engine_vtable vtable = {
@@ -524,45 +676,22 @@
 
 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
  * Create a dummy epoll_fd to make sure epoll support is available */
-static bool is_epoll_available() {
-  int fd = epoll_create1(EPOLL_CLOEXEC);
-  if (fd < 0) {
-    gpr_log(
-        GPR_ERROR,
-        "epoll_create1 failed with error: %d. Not using epoll polling engine",
-        fd);
-    return false;
-  }
-  close(fd);
-  return true;
-}
-
 const grpc_event_engine_vtable *grpc_init_epoll1_linux(void) {
-  /* If use of signals is disabled, we cannot use epoll engine*/
-  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
-    return NULL;
-  }
-
   if (!grpc_has_wakeup_fd()) {
     return NULL;
   }
 
-  if (!is_epoll_available()) {
+  g_epfd = epoll_create1(EPOLL_CLOEXEC);
+  if (g_epfd < 0) {
+    gpr_log(GPR_ERROR, "epoll unavailable");
     return NULL;
   }
 
-  if (!is_grpc_wakeup_signal_initialized) {
-    grpc_use_signal(SIGRTMIN + 6);
-  }
-
   fd_global_init();
 
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
-    return NULL;
-  }
-
-  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
-                         polling_island_global_init())) {
+    close(g_epfd);
+    fd_global_shutdown();
     return NULL;
   }