Merge github.com:grpc/grpc into grpc_millis
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 26455ec..f8f8b18 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -49,7 +49,60 @@
 #include "src/core/lib/support/string.h"
 
 static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
+
+/* NOTE ON SYNCHRONIZATION:
+ * - Fields in this struct are only modified by the designated poller. Hence
+ *   there is no need for any locks to protect the struct.
+ * - The num_events and cursor fields have to be of atomic type to provide
+ *   memory visibility guarantees only; i.e., with multiple pollers, the
+ *   designated polling thread keeps changing, so the thread that wrote these
+ *   values may be different from the thread reading them
+ */
+typedef struct epoll_set {
+  int epfd;
+
+  /* The epoll events returned by the last call to epoll_wait() */
+  struct epoll_event events[MAX_EPOLL_EVENTS];
+
+  /* The number of epoll events returned by the last call to epoll_wait() */
+  gpr_atm num_events;
+
+  /* Index of the first event in 'events' that has yet to be processed. This
+   * field is only valid if num_events > 0 */
+  gpr_atm cursor;
+} epoll_set;
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+  g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+  if (g_epoll_set.epfd < 0) {
+    gpr_log(GPR_ERROR, "epoll unavailable");
+    return false;
+  }
+
+  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
+  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
+  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
+  return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+  if (g_epoll_set.epfd >= 0) {
+    close(g_epoll_set.epfd);
+    g_epoll_set.epfd = -1;
+  }
+}
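/* Illustration (not part of this patch): why num_events and cursor are
   atomics. The designated poller changes between iterations, so the thread
   that publishes these values may differ from the thread that later reads
   them. A minimal sketch of the release/acquire pairing this file relies on,
   using gpr_atm_rel_store()/gpr_atm_acq_load() from gRPC's atomics layer:

     // Thread A (designated poller for iteration N), after epoll_wait():
     gpr_atm_rel_store(&g_epoll_set.num_events, r);  // publish with release
     gpr_atm_rel_store(&g_epoll_set.cursor, 0);

     // Thread B (designated poller for iteration N+1):
     long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);  // acquire
     long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
     // The acquire loads synchronize with the release stores, so Thread B
     // also sees the plain writes to g_epoll_set.events made by Thread A.
*/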
 
 /*******************************************************************************
  * Fd Declarations
@@ -123,7 +176,7 @@
   bool kicked_without_poller;
 
   /* Set to true if the pollset is observed to have no workers available to
-   * poll */
+     poll */
   bool seen_inactive;
   bool shutting_down;             /* Is the pollset shutting down ? */
 grpc_closure *shutdown_closure; /* Called after shutdown is complete */
@@ -229,7 +282,7 @@
 
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                            .data.ptr = new_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
     gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
   }
 
@@ -238,30 +291,43 @@
 
 static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
 
-/* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
+/* If 'releasing_fd' is true, we are going to detach the internal fd from the
+ * grpc_fd structure (i.e., we should not call the shutdown() syscall on that
+ * fd) */
+static void fd_shutdown_internal(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+                                 grpc_error *why, bool releasing_fd) {
   if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                              GRPC_ERROR_REF(why))) {
-    shutdown(fd->fd, SHUT_RDWR);
+    if (!releasing_fd) {
+      shutdown(fd->fd, SHUT_RDWR);
+    }
     grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
   }
   GRPC_ERROR_UNREF(why);
 }
 
+/* Might be called multiple times */
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
+  fd_shutdown_internal(exec_ctx, fd, why, false);
+}
+
 static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                       grpc_closure *on_done, int *release_fd,
-                      const char *reason) {
+                      bool already_closed, const char *reason) {
   grpc_error *error = GRPC_ERROR_NONE;
+  bool is_release_fd = (release_fd != NULL);
 
   if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
-    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
+    fd_shutdown_internal(exec_ctx, fd,
+                         GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
+                         is_release_fd);
   }
 
   /* If release_fd is not NULL, we should be relinquishing control of the file
      descriptor fd->fd (but we still own the grpc_fd structure). */
-  if (release_fd != NULL) {
+  if (is_release_fd) {
     *release_fd = fd->fd;
-  } else {
+  } else if (!already_closed) {
     close(fd->fd);
   }
 
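/* Illustration (not part of this patch): how a caller uses the new
   release_fd/already_closed contract to take ownership of the kernel fd
   instead of letting grpc close it. 'exec_ctx', 'fd' and 'on_done' are
   assumed to already exist in the caller:

     int raw_fd = -1;
     fd_orphan(exec_ctx, fd, on_done, &raw_fd, false /* already_closed */,
               "fd takeover");
     // raw_fd is now owned by the caller; because release_fd was non-NULL,
     // neither shutdown() nor close() was called on it above.
*/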
@@ -314,7 +380,10 @@
 
 GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
 static gpr_atm g_active_poller;
+
 static pollset_neighbourhood *g_neighbourhoods;
 static size_t g_num_neighbourhoods;
 
@@ -368,7 +437,8 @@
   if (err != GRPC_ERROR_NONE) return err;
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                            .data.ptr = &global_wakeup_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+                &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
   g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -394,7 +464,14 @@
   gpr_mu_init(&pollset->mu);
   *mu = &pollset->mu;
   pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
+  pollset->reassigning_neighbourhood = false;
+  pollset->root_worker = NULL;
+  pollset->kicked_without_poller = false;
   pollset->seen_inactive = true;
+  pollset->shutting_down = false;
+  pollset->shutdown_closure = NULL;
+  pollset->begin_refs = 0;
+  pollset->next = pollset->prev = NULL;
 }
 
 static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
@@ -426,6 +503,7 @@
 }
 
 static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
+  GPR_TIMER_BEGIN("pollset_kick_all", 0);
   grpc_error *error = GRPC_ERROR_NONE;
   if (pollset->root_worker != NULL) {
     grpc_pollset_worker *worker = pollset->root_worker;
@@ -451,7 +529,7 @@
   }
   // TODO: sreek.  Check if we need to set 'kicked_without_poller' to true here
   // in the else case
-
+  GPR_TIMER_END("pollset_kick_all", 0);
   return error;
 }
 
@@ -459,6 +537,7 @@
                                           grpc_pollset *pollset) {
   if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
       pollset->begin_refs == 0) {
+    GPR_TIMER_MARK("pollset_finish_shutdown", 0);
     GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
     pollset->shutdown_closure = NULL;
   }
@@ -466,41 +545,94 @@
 
 static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                              grpc_closure *closure) {
+  GPR_TIMER_BEGIN("pollset_shutdown", 0);
   GPR_ASSERT(pollset->shutdown_closure == NULL);
   GPR_ASSERT(!pollset->shutting_down);
   pollset->shutdown_closure = closure;
   pollset->shutting_down = true;
   GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
   pollset_maybe_finish_shutdown(exec_ctx, pollset);
+  GPR_TIMER_END("pollset_shutdown", 0);
 }
 
-#define MAX_EPOLL_EVENTS 100
-
 static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
                                            grpc_millis millis) {
   if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
   grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
-  if (delta > INT_MAX)
+  if (delta > INT_MAX) {
     return INT_MAX;
-  else if (delta < 0)
+  } else if (delta < 0) {
     return 0;
-  else
+  } else {
     return (int)delta;
+  }
 }
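/* Illustration (not part of this patch): the clamping above, assuming
   grpc_exec_ctx_now(exec_ctx) currently returns 1000:

     poll_deadline_to_millis_timeout(ctx, GRPC_MILLIS_INF_FUTURE);  // -1
     poll_deadline_to_millis_timeout(ctx, 1250);                    // 250
     poll_deadline_to_millis_timeout(ctx, 900);   // 0 (deadline passed)

   epoll_wait() interprets -1 as "block indefinitely" and 0 as "return
   immediately". */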
 
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+/* Process the epoll events found by the do_epoll_wait() function.
+   - g_epoll_set.cursor points to the index of the first event to be processed
+   - This function then processes up to
+     MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION events and updates
+     g_epoll_set.cursor
+
+   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
+   called by the g_active_poller thread. So there is no need for
+   synchronization when accessing fields in g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+                                        grpc_pollset *pollset) {
+  static const char *err_desc = "process_events";
+  grpc_error *error = GRPC_ERROR_NONE;
+
+  GPR_TIMER_BEGIN("process_epoll_events", 0);
+  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
+  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
+  for (int idx = 0;
+       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
+       idx++) {
+    long c = cursor++;
+    struct epoll_event *ev = &g_epoll_set.events[c];
+    void *data_ptr = ev->data.ptr;
+
+    if (data_ptr == &global_wakeup_fd) {
+      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+                   err_desc);
+    } else {
+      grpc_fd *fd = (grpc_fd *)(data_ptr);
+      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+      bool write_ev = (ev->events & EPOLLOUT) != 0;
+
+      if (read_ev || cancel) {
+        fd_become_readable(exec_ctx, fd, pollset);
+      }
+
+      if (write_ev || cancel) {
+        fd_become_writable(exec_ctx, fd);
+      }
+    }
+  }
+  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
+  GPR_TIMER_END("process_epoll_events", 0);
+  return error;
+}
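/* Illustration (not part of this patch): with
   MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION == 1, a single epoll_wait() that
   returns, say, 10 events is drained one event per pollset_work() iteration:
   cursor advances 0 -> 1 -> ... -> 10 across successive designated pollers,
   and only once cursor == num_events does the next do_epoll_wait() run. */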
+
+/* Do epoll_wait and store the events in the g_epoll_set.events field. This
+   does not "process" any of the events yet; that is done in
+   process_epoll_events(). See process_epoll_events() for more details.
+
+   NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
+   (i.e., the designated poller thread) will be calling this function. So
+   there is no need for any synchronization when accessing fields in
+   g_epoll_set */
+static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                  grpc_millis deadline) {
-  struct epoll_event events[MAX_EPOLL_EVENTS];
-  static const char *err_desc = "pollset_poll";
+  GPR_TIMER_BEGIN("do_epoll_wait", 0);
 
+  int r;
   int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
-
   if (timeout != 0) {
     GRPC_SCHEDULING_START_BLOCKING_REGION;
   }
-  int r;
   do {
-    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
+    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
+                   timeout);
   } while (r < 0 && errno == EINTR);
   if (timeout != 0) {
     GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
@@ -508,33 +640,22 @@
 
   if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
 
-  grpc_error *error = GRPC_ERROR_NONE;
-  for (int i = 0; i < r; i++) {
-    void *data_ptr = events[i].data.ptr;
-    if (data_ptr == &global_wakeup_fd) {
-      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
-                   err_desc);
-    } else {
-      grpc_fd *fd = (grpc_fd *)(data_ptr);
-      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
-      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
-      bool write_ev = (events[i].events & EPOLLOUT) != 0;
-      if (read_ev || cancel) {
-        fd_become_readable(exec_ctx, fd, pollset);
-      }
-      if (write_ev || cancel) {
-        fd_become_writable(exec_ctx, fd);
-      }
-    }
+  if (GRPC_TRACER_ON(grpc_polling_trace)) {
+    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
   }
 
-  return error;
+  gpr_atm_rel_store(&g_epoll_set.num_events, r);
+  gpr_atm_rel_store(&g_epoll_set.cursor, 0);
+
+  GPR_TIMER_END("do_epoll_wait", 0);
+  return GRPC_ERROR_NONE;
 }
 
 static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                          grpc_pollset_worker *worker,
                          grpc_pollset_worker **worker_hdl,
                          grpc_millis deadline) {
+  GPR_TIMER_BEGIN("begin_worker", 0);
   if (worker_hdl != NULL) *worker_hdl = worker;
   worker->initialized_cv = false;
   SET_KICK_STATE(worker, UNKICKED);
@@ -640,14 +761,17 @@
 
   if (pollset->kicked_without_poller) {
     pollset->kicked_without_poller = false;
+    GPR_TIMER_END("begin_worker", 0);
     return false;
   }
 
+  GPR_TIMER_END("begin_worker", 0);
   return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
 }
 
 static bool check_neighbourhood_for_available_poller(
     pollset_neighbourhood *neighbourhood) {
+  GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0);
   bool found_worker = false;
   do {
     grpc_pollset *inspect = neighbourhood->active_root;
@@ -669,6 +793,7 @@
               }
               SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
               if (inspect_worker->initialized_cv) {
+                GPR_TIMER_MARK("signal worker", 0);
                 gpr_cv_signal(&inspect_worker->cv);
               }
             } else {
@@ -704,12 +829,14 @@
     }
     gpr_mu_unlock(&inspect->mu);
   } while (!found_worker);
+  GPR_TIMER_END("check_neighbourhood_for_available_poller", 0);
   return found_worker;
 }
 
 static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                        grpc_pollset_worker *worker,
                        grpc_pollset_worker **worker_hdl) {
+  GPR_TIMER_BEGIN("end_worker", 0);
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
     gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
   }
@@ -779,41 +906,70 @@
     pollset_maybe_finish_shutdown(exec_ctx, pollset);
   }
   GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
+  GPR_TIMER_END("end_worker", 0);
 }
 
 /* pollset->mu lock must be held by the caller before calling this.
    The function pollset_work() may temporarily release the lock (pollset->mu)
    during the course of its execution but it will always re-acquire the lock
    and ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 grpc_pollset_worker **worker_hdl,
                                 grpc_millis deadline) {
   grpc_pollset_worker worker;
   grpc_error *error = GRPC_ERROR_NONE;
   static const char *err_desc = "pollset_work";
-  if (pollset->kicked_without_poller) {
-    pollset->kicked_without_poller = false;
+  GPR_TIMER_BEGIN("pollset_work", 0);
+  if (ps->kicked_without_poller) {
+    ps->kicked_without_poller = false;
+    GPR_TIMER_END("pollset_work", 0);
     return GRPC_ERROR_NONE;
   }
-  if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+
+  if (begin_worker(exec_ctx, ps, &worker, worker_hdl, deadline)) {
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
     gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
-    GPR_ASSERT(!pollset->shutting_down);
-    GPR_ASSERT(!pollset->seen_inactive);
-    gpr_mu_unlock(&pollset->mu);
-    append_error(&error, pollset_epoll(exec_ctx, pollset, deadline), err_desc);
-    gpr_mu_lock(&pollset->mu);
+    GPR_ASSERT(!ps->shutting_down);
+    GPR_ASSERT(!ps->seen_inactive);
+
+    gpr_mu_unlock(&ps->mu); /* unlock */
+    /* This is the designated polling thread at this point, and should ideally
+       do polling. However, if there are unprocessed events left from a
+       previous call to do_epoll_wait(), skip calling epoll_wait() in this
+       iteration and process the pending epoll events instead.
+
+       The reason for decoupling do_epoll_wait and process_epoll_events is to
+       better distribute the work (i.e., handling epoll events) across
+       multiple threads.
+
+       process_epoll_events() returns very quickly: it just queues the work on
+       exec_ctx but does not execute it (the actual execution, or more
+       accurately grpc_exec_ctx_flush(), happens in end_worker() AFTER
+       selecting a designated poller). So we do not spend long periods without
+       a designated poller */
+    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
+        gpr_atm_acq_load(&g_epoll_set.num_events)) {
+      append_error(&error, do_epoll_wait(exec_ctx, ps, deadline), err_desc);
+    }
+    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+    gpr_mu_lock(&ps->mu); /* lock */
+
     gpr_tls_set(&g_current_thread_worker, 0);
   } else {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
   }
-  end_worker(exec_ctx, pollset, &worker, worker_hdl);
+  end_worker(exec_ctx, ps, &worker, worker_hdl);
+
   gpr_tls_set(&g_current_thread_pollset, 0);
+  GPR_TIMER_END("pollset_work", 0);
   return error;
 }
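/* Illustration (not part of this patch): the designated poller's two-phase
   structure in pollset_work() above, in outline:

     if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
         gpr_atm_acq_load(&g_epoll_set.num_events)) {
       do_epoll_wait(...);        // refill g_epoll_set.events
     }
     process_epoll_events(...);   // consume a small batch, queue closures
     // end_worker(): pick the next designated poller, then flush exec_ctx

   Handing off after a small batch keeps some thread polling while this one
   runs the queued closures. */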
 
 static grpc_error *pollset_kick(grpc_pollset *pollset,
                                 grpc_pollset_worker *specific_worker) {
+  GPR_TIMER_BEGIN("pollset_kick", 0);
+  grpc_error *ret_err = GRPC_ERROR_NONE;
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
     gpr_strvec log;
     gpr_strvec_init(&log);
@@ -848,7 +1004,7 @@
         if (GRPC_TRACER_ON(grpc_polling_trace)) {
           gpr_log(GPR_ERROR, " .. kicked_without_poller");
         }
-        return GRPC_ERROR_NONE;
+        goto done;
       }
       grpc_pollset_worker *next_worker = root_worker->next;
       if (root_worker->kick_state == KICKED) {
@@ -856,13 +1012,13 @@
           gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
         }
         SET_KICK_STATE(root_worker, KICKED);
-        return GRPC_ERROR_NONE;
+        goto done;
       } else if (next_worker->kick_state == KICKED) {
         if (GRPC_TRACER_ON(grpc_polling_trace)) {
           gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
         }
         SET_KICK_STATE(next_worker, KICKED);
-        return GRPC_ERROR_NONE;
+        goto done;
       } else if (root_worker ==
                      next_worker &&  // only try and wake up a poller if
                                      // there is no next worker
@@ -872,7 +1028,8 @@
           gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
         }
         SET_KICK_STATE(root_worker, KICKED);
-        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+        goto done;
       } else if (next_worker->kick_state == UNKICKED) {
         if (GRPC_TRACER_ON(grpc_polling_trace)) {
           gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
@@ -880,7 +1037,7 @@
         GPR_ASSERT(next_worker->initialized_cv);
         SET_KICK_STATE(next_worker, KICKED);
         gpr_cv_signal(&next_worker->cv);
-        return GRPC_ERROR_NONE;
+        goto done;
       } else if (next_worker->kick_state == DESIGNATED_POLLER) {
         if (root_worker->kick_state != DESIGNATED_POLLER) {
           if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -893,59 +1050,64 @@
           if (root_worker->initialized_cv) {
             gpr_cv_signal(&root_worker->cv);
           }
-          return GRPC_ERROR_NONE;
+          goto done;
         } else {
           if (GRPC_TRACER_ON(grpc_polling_trace)) {
             gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
                     root_worker);
           }
           SET_KICK_STATE(next_worker, KICKED);
-          return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+          goto done;
         }
       } else {
         GPR_ASSERT(next_worker->kick_state == KICKED);
         SET_KICK_STATE(next_worker, KICKED);
-        return GRPC_ERROR_NONE;
+        goto done;
       }
     } else {
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_ERROR, " .. kicked while waking up");
       }
-      return GRPC_ERROR_NONE;
+      goto done;
     }
   } else if (specific_worker->kick_state == KICKED) {
     if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_ERROR, " .. specific worker already kicked");
     }
-    return GRPC_ERROR_NONE;
+    goto done;
   } else if (gpr_tls_get(&g_current_thread_worker) ==
              (intptr_t)specific_worker) {
     if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
     }
     SET_KICK_STATE(specific_worker, KICKED);
-    return GRPC_ERROR_NONE;
+    goto done;
   } else if (specific_worker ==
              (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
     if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_ERROR, " .. kick active poller");
     }
     SET_KICK_STATE(specific_worker, KICKED);
-    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+    goto done;
   } else if (specific_worker->initialized_cv) {
     if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_ERROR, " .. kick waiting worker");
     }
     SET_KICK_STATE(specific_worker, KICKED);
     gpr_cv_signal(&specific_worker->cv);
-    return GRPC_ERROR_NONE;
+    goto done;
   } else {
     if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_ERROR, " .. kick non-waiting worker");
     }
     SET_KICK_STATE(specific_worker, KICKED);
-    return GRPC_ERROR_NONE;
+    goto done;
   }
+done:
+  GPR_TIMER_END("pollset_kick", 0);
+  return ret_err;
 }
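/* Illustration (not part of this patch): the early returns in pollset_kick()
   were rewritten as 'goto done' so every exit path passes through
   GPR_TIMER_END() and the timer begin/end pair stays balanced. The shape of
   the pattern, with hypothetical fast_path()/slow_path():

     static grpc_error *fn(void) {
       GPR_TIMER_BEGIN("fn", 0);
       grpc_error *ret_err = GRPC_ERROR_NONE;
       if (fast_path()) goto done;   // instead of 'return GRPC_ERROR_NONE;'
       ret_err = slow_path();
     done:
       GPR_TIMER_END("fn", 0);
       return ret_err;
     }
*/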
 
 static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -989,7 +1151,7 @@
 static void shutdown_engine(void) {
   fd_global_shutdown();
   pollset_global_shutdown();
-  close(g_epfd);
+  epoll_set_shutdown();
 }
 
 static const grpc_event_engine_vtable vtable = {
@@ -1024,28 +1186,29 @@
 };
 
 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create an epoll fd (epoll_set_init() takes care of that) to make sure
+ * epoll support is available */
 const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
+  if (!explicit_request) {
+    return NULL;
+  }
+
   if (!grpc_has_wakeup_fd()) {
     return NULL;
   }
 
-  g_epfd = epoll_create1(EPOLL_CLOEXEC);
-  if (g_epfd < 0) {
-    gpr_log(GPR_ERROR, "epoll unavailable");
+  if (!epoll_set_init()) {
     return NULL;
   }
 
   fd_global_init();
 
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
-    close(g_epfd);
     fd_global_shutdown();
+    epoll_set_shutdown();
     return NULL;
   }
 
-  gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
   return &vtable;
 }
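/* Usage note (illustration, not part of this patch): with the
   'explicit_request' guard above, epoll1 is no longer picked by default; it
   is only used when requested by name, e.g. via the polling-strategy
   environment variable:

     GRPC_POLL_STRATEGY=epoll1 ./my_grpc_binary
*/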