Epoll1 Work Distribution: Parallelize the processing of epoll events across multiple threads
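
The core idea: instead of the thread that calls epoll_wait() also processing
every returned event itself, the designated poller now stores the events in a
global buffer, and each pollset_work() iteration drains only a small batch of
them before handing off the poller role. The following is a minimal,
single-threaded sketch of that cursor-based batching pattern; it is not gRPC
code, and the names (event_set, wait_for_events, process_some_events) are
illustrative only:

    /* Sketch of the wait/process decoupling introduced by this patch. */
    #include <errno.h>
    #include <stdio.h>
    #include <sys/epoll.h>
    #include <unistd.h>

    #define MAX_EVENTS 100
    #define MAX_EVENTS_PER_ITERATION 5

    typedef struct {
      int epfd;
      struct epoll_event events[MAX_EVENTS];
      int num_events; /* events stored by the last epoll_wait() */
      int cursor;     /* index of the first unprocessed event */
    } event_set;

    /* Refill the buffer; called only when all fetched events are consumed. */
    static int wait_for_events(event_set *es, int timeout_ms) {
      int r;
      do {
        r = epoll_wait(es->epfd, es->events, MAX_EVENTS, timeout_ms);
      } while (r < 0 && errno == EINTR);
      if (r < 0) return -1;
      es->num_events = r;
      es->cursor = 0;
      return r;
    }

    /* Drain at most MAX_EVENTS_PER_ITERATION events, advancing the cursor. */
    static void process_some_events(event_set *es) {
      for (int i = 0;
           i < MAX_EVENTS_PER_ITERATION && es->cursor != es->num_events; i++) {
        struct epoll_event *ev = &es->events[es->cursor++];
        /* A real implementation dispatches readable/writable callbacks here. */
        printf("fd %d: events=0x%x\n", ev->data.fd, (unsigned)ev->events);
      }
    }

    int main(void) {
      event_set es = {.epfd = epoll_create1(EPOLL_CLOEXEC)};
      if (es.epfd < 0) {
        perror("epoll_create1");
        return 1;
      }

      /* Watch stdin as a trivial example fd. */
      struct epoll_event ev = {.events = EPOLLIN, .data.fd = STDIN_FILENO};
      if (epoll_ctl(es.epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) != 0) {
        perror("epoll_ctl");
        return 1;
      }

      /* One worker iteration: poll only if the buffer is fully drained. */
      if (es.cursor == es.num_events && wait_for_events(&es, 1000) < 0) {
        perror("epoll_wait");
        return 1;
      }
      process_some_events(&es);

      close(es.epfd);
      return 0;
    }
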
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 90e0ce3..6fc3f46 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -48,7 +48,54 @@
 #include "src/core/lib/support/string.h"
 
 static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 5
+
+/* Note: Since fields in this struct are only modified by the designated poller,
+   we do not need any locks to protect the struct */
+typedef struct epoll_set {
+  int epfd;
+
+  /* The epoll_events after the last call to epoll_wait() */
+  struct epoll_event events[MAX_EPOLL_EVENTS];
+
+  /* The number of epoll_events after the last call to epoll_wait() */
+  int num_events;
+
+  /* Index of the first event in epoll_events that has to be processed. This
+   * field is only valid if num_events > 0 */
+  int cursor;
+} epoll_set;
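+
+/* Invariant: 0 <= cursor <= num_events <= MAX_EPOLL_EVENTS. The entries in
+   events[cursor..num_events) are yet to be processed */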
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+  g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+  if (g_epoll_set.epfd < 0) {
+    gpr_log(GPR_ERROR, "epoll unavailable");
+    return false;
+  }
+
+  gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epoll_set.epfd);
+  g_epoll_set.num_events = 0;
+  g_epoll_set.cursor = 0;
+  return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+  if (g_epoll_set.epfd >= 0) {
+    close(g_epoll_set.epfd);
+    g_epoll_set.epfd = -1;
+  }
+}
 
 /*******************************************************************************
  * Fd Declarations
@@ -122,7 +169,7 @@
   bool kicked_without_poller;
 
   /* Set to true if the pollset is observed to have no workers available to
-   * poll */
+     poll */
   bool seen_inactive;
   bool shutting_down;             /* Is the pollset shutting down ? */
 grpc_closure *shutdown_closure; /* Called after shutdown is complete */
@@ -228,7 +275,7 @@
 
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                            .data.ptr = new_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
     gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
   }
 
@@ -326,7 +373,10 @@
 
 GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
 static gpr_atm g_active_poller;
+
 static pollset_neighbourhood *g_neighbourhoods;
 static size_t g_num_neighbourhoods;
 
@@ -380,7 +430,8 @@
   if (err != GRPC_ERROR_NONE) return err;
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                            .data.ptr = &global_wakeup_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+                &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
   g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -486,8 +537,6 @@
   pollset_maybe_finish_shutdown(exec_ctx, pollset);
 }
 
-#define MAX_EPOLL_EVENTS 100
-
 static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                            gpr_timespec now) {
   gpr_timespec timeout;
@@ -506,40 +555,39 @@
   return millis >= 1 ? millis : 1;
 }
 
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
-                                 gpr_timespec now, gpr_timespec deadline) {
-  struct epoll_event events[MAX_EPOLL_EVENTS];
-  static const char *err_desc = "pollset_poll";
+/* Process the epoll events found by the do_epoll_wait() function.
+   - g_epoll_set.cursor points to the index of the first event to be processed
+   - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
+     events and updates g_epoll_set.cursor accordingly
 
-  int timeout = poll_deadline_to_millis_timeout(deadline, now);
-
-  if (timeout != 0) {
-    GRPC_SCHEDULING_START_BLOCKING_REGION;
-  }
-  int r;
-  do {
-    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
-  } while (r < 0 && errno == EINTR);
-  if (timeout != 0) {
-    GRPC_SCHEDULING_END_BLOCKING_REGION;
-  }
-
-  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
-
+   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
+   called by the g_active_poller thread. So there is no need for
+   synchronization when accessing fields in g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+                                        grpc_pollset *pollset) {
+  static const char *err_desc = "process_events";
   grpc_error *error = GRPC_ERROR_NONE;
-  for (int i = 0; i < r; i++) {
-    void *data_ptr = events[i].data.ptr;
+
+  for (int idx = 0; (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) &&
+                    g_epoll_set.cursor != g_epoll_set.num_events;
+       idx++) {
+    int c = g_epoll_set.cursor++;
+    struct epoll_event *ev = &g_epoll_set.events[c];
+    void *data_ptr = ev->data.ptr;
+
     if (data_ptr == &global_wakeup_fd) {
       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                    err_desc);
     } else {
       grpc_fd *fd = (grpc_fd *)(data_ptr);
-      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
-      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
-      bool write_ev = (events[i].events & EPOLLOUT) != 0;
+      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+      bool write_ev = (ev->events & EPOLLOUT) != 0;
+
       if (read_ev || cancel) {
         fd_become_readable(exec_ctx, fd, pollset);
       }
+
       if (write_ev || cancel) {
         fd_become_writable(exec_ctx, fd);
       }
@@ -549,6 +597,40 @@
   return error;
 }
 
+/* Do epoll_wait and store the events in the g_epoll_set.events field. This
+   does not "process" any of the events yet; that is done in
+   process_epoll_events(). See that function for more details.
+
+   NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
+   (i.e. the designated poller thread) will be calling this function. So there
+   is no need for any synchronization when accessing fields in g_epoll_set */
+static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
+                                 gpr_timespec now, gpr_timespec deadline) {
+  int r;
+  int timeout = poll_deadline_to_millis_timeout(deadline, now);
+  if (timeout != 0) {
+    GRPC_SCHEDULING_START_BLOCKING_REGION;
+  }
+  do {
+    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
+                   timeout);
+  } while (r < 0 && errno == EINTR);
+  if (timeout != 0) {
+    GRPC_SCHEDULING_END_BLOCKING_REGION;
+  }
+
+  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+  if (GRPC_TRACER_ON(grpc_polling_trace)) {
+    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
+  }
+
+  g_epoll_set.num_events = r;
+  g_epoll_set.cursor = 0;
+
+  return GRPC_ERROR_NONE;
+}
+
 static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                          grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                          gpr_timespec deadline) {
@@ -801,30 +883,53 @@
    The function pollset_work() may temporarily release the lock (pollset->mu)
    during the course of its execution but it will always re-acquire the lock and
    ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 grpc_pollset_worker **worker_hdl,
                                 gpr_timespec now, gpr_timespec deadline) {
   grpc_pollset_worker worker;
   grpc_error *error = GRPC_ERROR_NONE;
   static const char *err_desc = "pollset_work";
-  if (pollset->kicked_without_poller) {
-    pollset->kicked_without_poller = false;
+  if (ps->kicked_without_poller) {
+    ps->kicked_without_poller = false;
     return GRPC_ERROR_NONE;
   }
-  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+
+  if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
     gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
-    GPR_ASSERT(!pollset->shutting_down);
-    GPR_ASSERT(!pollset->seen_inactive);
-    gpr_mu_unlock(&pollset->mu);
-    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
-                 err_desc);
-    gpr_mu_lock(&pollset->mu);
+    GPR_ASSERT(!ps->shutting_down);
+    GPR_ASSERT(!ps->seen_inactive);
+
+    gpr_mu_unlock(&ps->mu); /* unlock */
+
+    /* This is the designated polling thread at this point, and it should
+       ideally do the polling. However, if there are unprocessed events left
+       from a previous call to do_epoll_wait(), skip calling epoll_wait() in
+       this iteration and process the pending epoll events instead.
+
+       The reason for decoupling do_epoll_wait and process_epoll_events is to
+       better distribute the work (i.e. handling epoll events) across multiple
+       threads.
+
+       process_epoll_events() returns very quickly: it just queues the work on
+       exec_ctx but does not execute it (the actual execution, or more
+       accurately grpc_exec_ctx_flush(), happens in end_worker() AFTER
+       selecting a designated poller). So we are never without a designated
+       poller for long periods */
+    if (g_epoll_set.cursor == g_epoll_set.num_events) {
+      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
+                   err_desc);
+    }
+    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+    gpr_mu_lock(&ps->mu); /* lock */
+
     gpr_tls_set(&g_current_thread_worker, 0);
   } else {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
   }
-  end_worker(exec_ctx, pollset, &worker, worker_hdl);
+  end_worker(exec_ctx, ps, &worker, worker_hdl);
+
   gpr_tls_set(&g_current_thread_pollset, 0);
   return error;
 }
@@ -1006,7 +1111,7 @@
 static void shutdown_engine(void) {
   fd_global_shutdown();
   pollset_global_shutdown();
-  close(g_epfd);
+  epoll_set_shutdown();
 }
 
 static const grpc_event_engine_vtable vtable = {
@@ -1041,7 +1146,8 @@
 };
 
 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create an epoll fd (epoll_set_init() takes care of this) to make sure
+ * epoll support is available */
 const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
   /* TODO(sreek): Temporarily disable this poller unless explicitly requested
    * via GRPC_POLL_STRATEGY */
@@ -1053,22 +1159,18 @@
     return NULL;
   }
 
-  g_epfd = epoll_create1(EPOLL_CLOEXEC);
-  if (g_epfd < 0) {
-    gpr_log(GPR_ERROR, "epoll unavailable");
+  if (!epoll_set_init()) {
     return NULL;
   }
 
   fd_global_init();
 
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
-    close(g_epfd);
     fd_global_shutdown();
+    epoll_set_shutdown();
     return NULL;
   }
 
-  gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
   return &vtable;
 }
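
A note on the constants: with MAX_EPOLL_EVENTS = 100 and
MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION = 5, a single epoll_wait() call that
returns a full buffer of 100 events is drained over up to 20 pollset_work()
iterations. Each iteration queues at most 5 events' worth of closures on
exec_ctx, selects a new designated poller in end_worker(), and only then
flushes those closures, so event handling is spread across many threads
instead of being serialized on the one thread that happened to call
epoll_wait().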