Update epollers: remove workqueue support from the epollsig poller
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 52362a6..e50000d 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -59,7 +59,6 @@
 #include "src/core/lib/iomgr/lockfree_event.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"
 
@@ -177,7 +176,9 @@
  * Polling island Declarations
  */
 
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define PI_REFCOUNT_DEBUG
+
+#ifdef PI_REFCOUNT_DEBUG
 
 #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
 #define PI_UNREF(exec_ctx, p, r) \
@@ -192,8 +193,6 @@
 
 /* This is also used as grpc_workqueue (by directly casing it) */
 typedef struct polling_island {
-  grpc_closure_scheduler workqueue_scheduler;
-
   gpr_mu mu;
   /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
      the refcount.
@@ -214,15 +213,6 @@
 
   /* Number of threads currently polling on this island */
   gpr_atm poller_count;
-  /* Mutex guarding the read end of the workqueue (must be held to pop from
-   * workqueue_items) */
-  gpr_mu workqueue_read_mu;
-  /* Queue of closures to be executed */
-  gpr_mpscq workqueue_items;
-  /* Count of items in workqueue_items */
-  gpr_atm workqueue_item_count;
-  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
-  grpc_wakeup_fd workqueue_wakeup_fd;
 
   /* The fd of the underlying epoll set */
   int epoll_fd;
@@ -297,8 +287,6 @@
 
 /* Forward declaration */
 static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error);
 
 #ifdef GRPC_TSAN
 /* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -311,13 +299,10 @@
 gpr_atm g_epoll_sync;
 #endif /* defined(GRPC_TSAN) */
 
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
-    workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
 static void pi_add_ref(polling_island *pi);
 static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
 
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef PI_REFCOUNT_DEBUG
 static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                            const char *file, int line) {
   long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@@ -333,36 +318,6 @@
   gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
           (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
 }
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
-                                     const char *file, int line,
-                                     const char *reason) {
-  if (workqueue != NULL) {
-    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                            const char *file, int line, const char *reason) {
-  if (workqueue != NULL) {
-    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
-  }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    pi_add_ref((polling_island *)workqueue);
-  }
-  return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    pi_unref(exec_ctx, (polling_island *)workqueue);
-  }
-}
 #endif
 
 static void pi_add_ref(polling_island *pi) {
@@ -526,26 +481,16 @@
   *error = GRPC_ERROR_NONE;
 
   pi = gpr_malloc(sizeof(*pi));
-  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
   gpr_mu_init(&pi->mu);
   pi->fd_cnt = 0;
   pi->fd_capacity = 0;
   pi->fds = NULL;
   pi->epoll_fd = -1;
 
-  gpr_mu_init(&pi->workqueue_read_mu);
-  gpr_mpscq_init(&pi->workqueue_items);
-  gpr_atm_rel_store(&pi->workqueue_item_count, 0);
-
   gpr_atm_rel_store(&pi->ref_count, 0);
   gpr_atm_rel_store(&pi->poller_count, 0);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
 
-  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
-                    err_desc)) {
-    goto done;
-  }
-
   pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
 
   if (pi->epoll_fd < 0) {
@@ -553,8 +498,6 @@
     goto done;
   }
 
-  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
-
   if (initial_fd != NULL) {
     polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
   }
@@ -573,11 +516,7 @@
   if (pi->epoll_fd >= 0) {
     close(pi->epoll_fd);
   }
-  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
-  gpr_mu_destroy(&pi->workqueue_read_mu);
-  gpr_mpscq_destroy(&pi->workqueue_items);
   gpr_mu_destroy(&pi->mu);
-  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
   gpr_free(pi->fds);
   gpr_free(pi);
 }
@@ -722,45 +661,6 @@
   }
 }
 
-static void workqueue_maybe_wakeup(polling_island *pi) {
-  /* If this thread is the current poller, then it may be that it's about to
-     decrement the current poller count, so we need to look past this thread */
-  bool is_current_poller = (g_current_thread_polling_island == pi);
-  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
-  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
-  /* Only issue a wakeup if it's likely that some poller could come in and take
-     it right now. Note that since we do an anticipatory mpscq_pop every poll
-     loop, it's ok if we miss the wakeup here, as we'll get the work item when
-     the next poller enters anyway. */
-  if (current_pollers > min_current_pollers_for_wakeup) {
-    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
-                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
-  }
-}
-
-static void workqueue_move_items_to_parent(polling_island *q) {
-  polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
-  if (p == NULL) {
-    return;
-  }
-  gpr_mu_lock(&q->workqueue_read_mu);
-  int num_added = 0;
-  while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
-    gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
-    if (n != NULL) {
-      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
-      gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
-      gpr_mpscq_push(&p->workqueue_items, n);
-      num_added++;
-    }
-  }
-  gpr_mu_unlock(&q->workqueue_read_mu);
-  if (num_added > 0) {
-    workqueue_maybe_wakeup(p);
-  }
-  workqueue_move_items_to_parent(p);
-}
-
 static polling_island *polling_island_merge(polling_island *p,
                                             polling_island *q,
                                             grpc_error **error) {
@@ -785,8 +685,6 @@
     /* Add the 'merged_to' link from p --> q */
     gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
     PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
-
-    workqueue_move_items_to_parent(p);
   }
   /* else if p == q, nothing needs to be done */
 
@@ -797,32 +695,6 @@
   return q;
 }
 
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                              grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
-  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
-  /* take a ref to the workqueue: otherwise it can happen that whatever events
-   * this kicks off ends up destroying the workqueue before this function
-   * completes */
-  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
-  polling_island *pi = (polling_island *)workqueue;
-  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
-  closure->error_data.error = error;
-  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
-  if (last == 0) {
-    workqueue_maybe_wakeup(pi);
-  }
-  workqueue_move_items_to_parent(pi);
-  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
-  GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
-  polling_island *pi = (polling_island *)workqueue;
-  return workqueue == NULL ? grpc_schedule_on_exec_ctx
-                           : &pi->workqueue_scheduler;
-}
-
 static grpc_error *polling_island_global_init() {
   grpc_error *error = GRPC_ERROR_NONE;
 
@@ -1081,14 +953,6 @@
   grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
 }
 
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
-  gpr_mu_lock(&fd->po.mu);
-  grpc_workqueue *workqueue =
-      GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
-  gpr_mu_unlock(&fd->po.mu);
-  return workqueue;
-}
-
 /*******************************************************************************
  * Pollset Definitions
  */
@@ -1326,33 +1190,6 @@
   gpr_mu_destroy(&pollset->po.mu);
 }
 
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
-                                    polling_island *pi) {
-  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
-    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
-    gpr_mu_unlock(&pi->workqueue_read_mu);
-    if (n != NULL) {
-      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
-        workqueue_maybe_wakeup(pi);
-      }
-      grpc_closure *c = (grpc_closure *)n;
-      grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
-      c->scheduled = false;
-#endif
-      c->cb(exec_ctx, c->cb_arg, error);
-      GRPC_ERROR_UNREF(error);
-      return true;
-    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
-      /* n == NULL might mean there's work but it's not available to be popped
-       * yet - try to ensure another workqueue wakes up to check shortly if so
-       */
-      workqueue_maybe_wakeup(pi);
-    }
-  }
-  return false;
-}
-
 #define GRPC_EPOLL_MAX_EVENTS 100
 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
 static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
@@ -1408,72 +1245,61 @@
   PI_ADD_REF(pi, "ps_work");
   gpr_mu_unlock(&pollset->po.mu);
 
-  /* If we get some workqueue work to do, it might end up completing an item on
-     the completion queue, so there's no need to poll... so we skip that and
-     redo the complete loop to verify */
-  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
-    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
-    g_current_thread_polling_island = pi;
+  gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+  g_current_thread_polling_island = pi;
 
-    GRPC_SCHEDULING_START_BLOCKING_REGION;
-    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
-                        sig_mask);
-    GRPC_SCHEDULING_END_BLOCKING_REGION;
-    if (ep_rv < 0) {
-      if (errno != EINTR) {
-        gpr_asprintf(&err_msg,
-                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
-                     epoll_fd, errno, strerror(errno));
-        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
-      } else {
-        /* We were interrupted. Save an interation by doing a zero timeout
-           epoll_wait to see if there are any other events of interest */
-        GRPC_POLLING_TRACE(
-            "pollset_work: pollset: %p, worker: %p received kick",
-            (void *)pollset, (void *)worker);
-        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
-      }
+  GRPC_SCHEDULING_START_BLOCKING_REGION;
+  ep_rv =
+      epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
+  GRPC_SCHEDULING_END_BLOCKING_REGION;
+  if (ep_rv < 0) {
+    if (errno != EINTR) {
+      gpr_asprintf(&err_msg,
+                   "epoll_wait() epoll fd: %d failed with error: %d (%s)",
+                   epoll_fd, errno, strerror(errno));
+      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+    } else {
+      /* We were interrupted. Save an iteration by doing a zero timeout
+         epoll_wait to see if there are any other events of interest */
+      GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
+                         (void *)pollset, (void *)worker);
+      ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
     }
+  }
 
 #ifdef GRPC_TSAN
-    /* See the definition of g_poll_sync for more details */
-    gpr_atm_acq_load(&g_epoll_sync);
+  /* See the definition of g_epoll_sync for more details */
+  gpr_atm_acq_load(&g_epoll_sync);
 #endif /* defined(GRPC_TSAN) */
 
-    for (int i = 0; i < ep_rv; ++i) {
-      void *data_ptr = ep_ev[i].data.ptr;
-      if (data_ptr == &pi->workqueue_wakeup_fd) {
-        append_error(error,
-                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
-                     err_desc);
-        maybe_do_workqueue_work(exec_ctx, pi);
-      } else if (data_ptr == &polling_island_wakeup_fd) {
-        GRPC_POLLING_TRACE(
-            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
-            "%d) got merged",
-            (void *)pollset, (void *)worker, epoll_fd);
-        /* This means that our polling island is merged with a different
-           island. We do not have to do anything here since the subsequent call
-           to the function pollset_work_and_unlock() will pick up the correct
-           epoll_fd */
-      } else {
-        grpc_fd *fd = data_ptr;
-        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
-        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
-        int write_ev = ep_ev[i].events & EPOLLOUT;
-        if (read_ev || cancel) {
-          fd_become_readable(exec_ctx, fd, pollset);
-        }
-        if (write_ev || cancel) {
-          fd_become_writable(exec_ctx, fd);
-        }
+  for (int i = 0; i < ep_rv; ++i) {
+    void *data_ptr = ep_ev[i].data.ptr;
+    if (data_ptr == &polling_island_wakeup_fd) {
+      GRPC_POLLING_TRACE(
+          "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
+          "%d) got merged",
+          (void *)pollset, (void *)worker, epoll_fd);
+      /* This means that our polling island is merged with a different
+         island. We do not have to do anything here since the subsequent call
+         to the function pollset_work_and_unlock() will pick up the correct
+         epoll_fd */
+    } else {
+      grpc_fd *fd = data_ptr;
+      int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+      int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+      int write_ev = ep_ev[i].events & EPOLLOUT;
+      if (read_ev || cancel) {
+        fd_become_readable(exec_ctx, fd, pollset);
+      }
+      if (write_ev || cancel) {
+        fd_become_writable(exec_ctx, fd);
       }
     }
-
-    g_current_thread_polling_island = NULL;
-    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
   }
 
+  g_current_thread_polling_island = NULL;
+  gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+
   GPR_ASSERT(pi != NULL);
 
   /* Before leaving, release the extra ref we added to the polling island. It
@@ -1864,7 +1690,6 @@
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
     .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-    .fd_get_workqueue = fd_get_workqueue,
 
     .pollset_init = pollset_init,
     .pollset_shutdown = pollset_shutdown,
@@ -1882,10 +1707,6 @@
     .pollset_set_add_fd = pollset_set_add_fd,
     .pollset_set_del_fd = pollset_set_del_fd,
 
-    .workqueue_ref = workqueue_ref,
-    .workqueue_unref = workqueue_unref,
-    .workqueue_scheduler = workqueue_scheduler,
-
     .shutdown_engine = shutdown_engine,
 };