Merge branch 'master' into cq_limited_pollers
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7ac2334..a56f81d 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -195,7 +195,10 @@
 
   struct SyncServerSettings {
     SyncServerSettings()
-        : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
+        : num_cqs(gpr_cpu_num_cores()),
+          min_pollers(1),
+          max_pollers(2),
+          cq_timeout_msec(10000) {}
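+    /* Note: num_cqs now defaults to one completion queue per CPU core
+       (gpr_cpu_num_cores()) instead of a single queue */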
 
     // Number of server completion queues to create to listen to incoming RPCs.
     int num_cqs;
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index e603a75..64ea7b1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -40,6 +40,7 @@
 
 #include <assert.h>
 #include <errno.h>
+#include <limits.h>
 #include <poll.h>
 #include <pthread.h>
 #include <signal.h>
@@ -62,6 +63,7 @@
 #include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/env.h"
 
 /* TODO: sreek - Move this to init.c and initialize this like other tracers. */
 static int grpc_polling_trace = 0; /* Disabled by default */
@@ -73,6 +75,10 @@
 /* Uncomment the following to enable extra checks on poll_object operations */
 /* #define PO_DEBUG */
 
+/* The maximum number of polling threads per polling island. By default there
+   is no limit */
+static int g_max_pollers_per_pi = INT_MAX;
+
 static int grpc_wakeup_signal = -1;
 static bool is_grpc_wakeup_signal_initialized = false;
 
@@ -195,6 +201,11 @@
 
 #endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
 
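+/* A node in the doubly-linked list of workers waiting for their turn to poll
+   a polling island (see worker_list_head in polling_island) */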
+typedef struct worker_node {
+  struct worker_node *next;
+  struct worker_node *prev;
+} worker_node;
+
 /* This is also used as grpc_workqueue (by directly casting it) */
 typedef struct polling_island {
   grpc_closure_scheduler workqueue_scheduler;
@@ -229,6 +240,10 @@
   /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
   grpc_wakeup_fd workqueue_wakeup_fd;
 
+  /* The list of workers waiting to do polling on this polling island */
+  gpr_mu worker_list_mu;
+  worker_node worker_list_head;
+
   /* The fd of the underlying epoll set */
   int epoll_fd;
 
@@ -241,14 +256,24 @@
 /*******************************************************************************
  * Pollset Declarations
  */
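+/* Given a pointer to a grpc_pollset_worker's pi_list_link field, recover the
+   enclosing grpc_pollset_worker (container_of-style pointer arithmetic) */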
+#define WORKER_FROM_WORKER_LIST_NODE(p)          \
+  (struct grpc_pollset_worker *)(((char *)(p)) - \
+                                 offsetof(grpc_pollset_worker, pi_list_link))
 struct grpc_pollset_worker {
   /* Thread id of this worker */
   pthread_t pt_id;
 
   /* Used to prevent a worker from getting kicked multiple times */
   gpr_atm is_kicked;
+
   struct grpc_pollset_worker *next;
   struct grpc_pollset_worker *prev;
+
+  /* Indicates if it is this worker's turn to do epoll */
+  gpr_atm is_polling_turn;
+
+  /* Node in the polling island's worker list. */
+  worker_node pi_list_link;
 };
 
 struct grpc_pollset {
@@ -392,7 +417,47 @@
   }
 }
 
-/* The caller is expected to hold pi->mu lock before calling this function */
+static void worker_node_init(worker_node *node) {
+  node->next = node->prev = node;
+}
+
+/* Not thread safe. Call while holding the list-level lock (worker_list_mu) */
+static void push_back_worker_node(worker_node *head, worker_node *node) {
+  node->next = head;
+  node->prev = head->prev;
+  head->prev->next = node;
+  head->prev = node;
+}
+
+/* Not thread safe. Call while holding the list-level lock (worker_list_mu) */
+static void remove_worker_node(worker_node *node) {
+  node->next->prev = node->prev;
+  node->prev->next = node->next;
+  /* If a node's next and prev point to itself, the node is considered
+     detached from the list */
+  node->next = node->prev = node;
+}
+
+/* Not thread safe. Call while holding the list-level lock (worker_list_mu) */
+static worker_node *pop_front_worker_node(worker_node *head) {
+  worker_node *node = head->next;
+  if (node != head) {
+    remove_worker_node(node);
+  } else {
+    node = NULL;
+  }
+
+  return node;
+}
+
+/* Returns true if the node's next and prev are pointing to itself (which
+   indicates that the node is not in the list) */
+static bool is_worker_node_detached(worker_node *node) {
+  return (node->next == node->prev && node->next == node);
+}
+
+/* The caller is expected to hold pi->mu lock before calling this function */
 static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                           size_t fd_count, bool add_fd_refs,
                                           grpc_error **error) {
@@ -546,6 +611,9 @@
   gpr_atm_rel_store(&pi->poller_count, 0);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
 
+  gpr_mu_init(&pi->worker_list_mu);
+  worker_node_init(&pi->worker_list_head);
+
   if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                     err_desc)) {
     goto done;
@@ -584,6 +652,9 @@
   gpr_mpscq_destroy(&pi->workqueue_items);
   gpr_mu_destroy(&pi->mu);
   grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
+  gpr_mu_destroy(&pi->worker_list_mu);
+  GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));
+
   gpr_free(pi->fds);
   gpr_free(pi);
 }
@@ -1102,6 +1173,7 @@
 GPR_TLS_DECL(g_current_thread_worker);
 static __thread bool g_initialized_sigmask;
 static __thread sigset_t g_orig_sigmask;
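+/* Contains only 'grpc_wakeup_signal'. Set up lazily in pollset_work() and
+   waited upon by sigwaitinfo()/sigtimedwait() in acquire_polling_lease() */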
+static __thread sigset_t g_wakeup_sig_set;
 
 static void sig_handler(int sig_num) {
 #ifdef GRPC_EPOLL_DEBUG
@@ -1109,6 +1181,14 @@
 #endif
 }
 
+static void pollset_worker_init(grpc_pollset_worker *worker) {
+  worker->pt_id = pthread_self();
+  worker->next = worker->prev = NULL;
+  gpr_atm_no_barrier_store(&worker->is_kicked, (gpr_atm)0);
+  gpr_atm_no_barrier_store(&worker->is_polling_turn, (gpr_atm)0);
+  worker_node_init(&worker->pi_list_link);
+}
+
 static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
 
 /* Global state management */
@@ -1125,11 +1205,12 @@
   gpr_tls_destroy(&g_current_thread_worker);
 }
 
-static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
+static grpc_error *worker_kick(grpc_pollset_worker *worker,
+                               gpr_atm *is_kicked) {
   grpc_error *err = GRPC_ERROR_NONE;
 
   /* Kick the worker only if it was not already kicked */
-  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
+  if (gpr_atm_no_barrier_cas(is_kicked, (gpr_atm)0, (gpr_atm)1)) {
     GRPC_POLLING_TRACE(
         "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
         (void *)worker, (long int)worker->pt_id);
@@ -1141,6 +1222,14 @@
   return err;
 }
 
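+/* Kick a worker out of polling: sets 'is_kicked' and signals its thread */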
+static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
+  return worker_kick(worker, &worker->is_kicked);
+}
+
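+/* Grant a waiting worker its turn to poll: sets 'is_polling_turn' and signals
+   its thread (waking it from sigtimedwait in acquire_polling_lease) */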
+static grpc_error *poller_kick(grpc_pollset_worker *worker) {
+  return worker_kick(worker, &worker->is_polling_turn);
+}
+
 /* Return 1 if the pollset has active threads in pollset_work (pollset must
  * be locked) */
 static int pollset_has_workers(grpc_pollset *p) {
@@ -1246,6 +1335,22 @@
   pollset->shutdown_done = NULL;
 }
 
+/* Convert millis to timespec (clock-type is assumed to be GPR_TIMESPAN) */
+static struct timespec millis_to_timespec(int millis) {
+  struct timespec linux_ts;
+  gpr_timespec gpr_ts;
+
+  if (millis == -1) {
+    gpr_ts = gpr_inf_future(GPR_TIMESPAN);
+  } else {
+    gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN);
+  }
+
+  linux_ts.tv_sec = (time_t)gpr_ts.tv_sec;
+  linux_ts.tv_nsec = gpr_ts.tv_nsec;
+  return linux_ts;
+}
+
 /* Convert a timespec to milliseconds:
    - Very small or negative poll times are clamped to zero to do a non-blocking
      poll (which becomes spin polling)
@@ -1364,35 +1469,200 @@
   return false;
 }
 
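+/* Try to acquire the polling "lease" for polling island 'pi': at most
+   g_max_pollers_per_pi threads may poll the island concurrently. If the limit
+   is reached, the worker is queued on the island's worker list and waits (via
+   sigtimedwait) until it is kicked or the deadline expires. Returns true iff
+   the lease was acquired */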
+/* NOTE: This function may modify 'now' */
+static bool acquire_polling_lease(grpc_pollset_worker *worker,
+                                  polling_island *pi, gpr_timespec deadline,
+                                  gpr_timespec *now) {
+  bool is_lease_acquired = false;
+
+  gpr_mu_lock(&pi->worker_list_mu);  // LOCK
+  long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
+
+  if (num_pollers >= g_max_pollers_per_pi) {
+    push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link);
+    gpr_mu_unlock(&pi->worker_list_mu);  // UNLOCK
+
+    bool is_timeout = false;
+    int ret;
+    int timeout_ms = poll_deadline_to_millis_timeout(deadline, *now);
+    if (timeout_ms == -1) {
+      ret = sigwaitinfo(&g_wakeup_sig_set, NULL);
+    } else {
+      struct timespec sigwait_timeout = millis_to_timespec(timeout_ms);
+      GRPC_SCHEDULING_START_BLOCKING_REGION;
+      ret = sigtimedwait(&g_wakeup_sig_set, NULL, &sigwait_timeout);
+      GRPC_SCHEDULING_END_BLOCKING_REGION;
+    }
+
+    if (ret == -1) {
+      if (errno == EAGAIN) {
+        is_timeout = true;
+      } else {
+        /* NOTE: This should not happen. If we see these log messages, it
+           means we are most likely doing something incorrect in the setup
+           needed for sigwaitinfo/sigtimedwait */
+        gpr_log(GPR_ERROR,
+                "sigwaitinfo/sigtimedwait failed with errno: %d "
+                "(timeout_ms: %d)",
+                errno, timeout_ms);
+      }
+    }
+
+    /* Did the worker come out of sigtimedwait because a thread that just
+       exited epoll kicked it (in the release_polling_lease function)? */
+    bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn);
+
+    /* Did the worker come out of sigtimedwait because a thread alerted it
+       that some completion event was (likely) available in the completion
+       queue? */
+    bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked);
+
+    if (is_kicked || is_timeout) {
+      *now = deadline; /* Essentially make the epoll timeout = 0 */
+    } else if (is_polling_turn) {
+      *now = gpr_now(GPR_CLOCK_MONOTONIC); /* Reduce the epoll timeout */
+    }
+
+    gpr_mu_lock(&pi->worker_list_mu);  // LOCK
+    /* The node might have already been removed from the list by the poller
+       that kicked this worker. However, it is safe to call
+       'remove_worker_node' on an already detached node */
+    remove_worker_node(&worker->pi_list_link);
+    /* It is important to read num_pollers again under the lock so that we
+       have the latest value, which cannot change while we perform the
+       "(num_pollers < g_max_pollers_per_pi)" check a few lines below */
+    num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
+  }
+
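+  /* This check and the increment below happen atomically under
+     worker_list_mu, so poller_count can never exceed g_max_pollers_per_pi */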
+  if (num_pollers < g_max_pollers_per_pi) {
+    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+    is_lease_acquired = true;
+  }
+
+  gpr_mu_unlock(&pi->worker_list_mu);  // UNLOCK
+  return is_lease_acquired;
+}
+
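+/* Give up the polling lease and kick the worker (if any) at the front of the
+   island's wait list so that it gets the next turn to poll */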
+static void release_polling_lease(polling_island *pi, grpc_error **error) {
+  gpr_mu_lock(&pi->worker_list_mu);
+
+  gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+  worker_node *node = pop_front_worker_node(&pi->worker_list_head);
+  if (node != NULL) {
+    grpc_pollset_worker *next_worker = WORKER_FROM_WORKER_LIST_NODE(node);
+    append_error(error, poller_kick(next_worker), "poller kick error");
+  }
+
+  gpr_mu_unlock(&pi->worker_list_mu);
+}
+
 #define GRPC_EPOLL_MAX_EVENTS 100
+static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
+                                   grpc_pollset *pollset, polling_island *pi,
+                                   grpc_pollset_worker *worker,
+                                   gpr_timespec now, gpr_timespec deadline,
+                                   sigset_t *sig_mask, grpc_error **error) {
+  /* At most g_max_pollers_per_pi threads may poll a polling island in
+     parallel. If we cannot get a lease, we cannot proceed with
+     epoll_pwait() */
+  if (!acquire_polling_lease(worker, pi, deadline, &now)) {
+    return;
+  }
+
+  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
+  int ep_rv;
+  char *err_msg;
+  const char *err_desc = "pollset_work_and_unlock";
+
+  /* timeout_ms is the time between 'now' and 'deadline' */
+  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
+
+  GRPC_SCHEDULING_START_BLOCKING_REGION;
+  ep_rv =
+      epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
+  GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+  /* Give back the lease right away so that some other thread can enter */
+  release_polling_lease(pi, error);
+
+  if (ep_rv < 0) {
+    if (errno != EINTR) {
+      gpr_asprintf(&err_msg,
+                   "epoll_wait() epoll fd: %d failed with error: %d (%s)",
+                   epoll_fd, errno, strerror(errno));
+      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+    } else {
+      /* We were interrupted. Save an iteration by doing a zero timeout
+         epoll_wait to see if there are any other events of interest */
+      GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
+                         (void *)pollset, (void *)worker);
+      ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+    }
+  }
+
+#ifdef GRPC_TSAN
+  /* See the definition of g_poll_sync for more details */
+  gpr_atm_acq_load(&g_epoll_sync);
+#endif /* defined(GRPC_TSAN) */
+
+  for (int i = 0; i < ep_rv; ++i) {
+    void *data_ptr = ep_ev[i].data.ptr;
+    if (data_ptr == &global_wakeup_fd) {
+      grpc_timer_consume_kick();
+      append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+                   err_desc);
+    } else if (data_ptr == &pi->workqueue_wakeup_fd) {
+      append_error(error,
+                   grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
+                   err_desc);
+      maybe_do_workqueue_work(exec_ctx, pi);
+    } else if (data_ptr == &polling_island_wakeup_fd) {
+      GRPC_POLLING_TRACE(
+          "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
+          "%d) got merged",
+          (void *)pollset, (void *)worker, epoll_fd);
+      /* This means that our polling island is merged with a different
+         island. We do not have to do anything here since the subsequent call
+         to the function pollset_work_and_unlock() will pick up the correct
+         epoll_fd */
+    } else {
+      grpc_fd *fd = data_ptr;
+      int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+      int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+      int write_ev = ep_ev[i].events & EPOLLOUT;
+      if (read_ev || cancel) {
+        fd_become_readable(exec_ctx, fd, pollset);
+      }
+      if (write_ev || cancel) {
+        fd_become_writable(exec_ctx, fd);
+      }
+    }
+  }
+}
+
 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
 static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                     grpc_pollset *pollset,
-                                    grpc_pollset_worker *worker, int timeout_ms,
+                                    grpc_pollset_worker *worker,
+                                    gpr_timespec now, gpr_timespec deadline,
                                     sigset_t *sig_mask, grpc_error **error) {
-  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
   int epoll_fd = -1;
-  int ep_rv;
   polling_island *pi = NULL;
-  char *err_msg;
-  const char *err_desc = "pollset_work_and_unlock";
   GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
 
   /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
      latest polling island pointed by pollset->po.pi
 
-     Since epoll_fd is immutable, we can read it without obtaining the polling
-     island lock. There is however a possibility that the polling island (from
-     which we got the epoll_fd) got merged with another island while we are
-     in this function. This is still okay because in such a case, we will wakeup
-     right-away from epoll_wait() and pick up the latest polling_island the next
-     this function (i.e pollset_work_and_unlock()) is called */
+     Since epoll_fd is immutable, it is safe to read it without a lock on the
+     polling island. There is, however, a possibility that the polling island
+     from which we got the epoll_fd got merged with another island in the
+     meantime. This is okay because in such a case, we will wake up right away
+     from epoll_pwait() (any merge poisons the old polling island's epoll set
+     via 'polling_island_wakeup_fd') and then pick up the latest
+     polling_island the next time this function (i.e.
+     pollset_work_and_unlock()) is called */
 
   if (pollset->po.pi == NULL) {
     pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
     if (pollset->po.pi == NULL) {
       GPR_TIMER_END("pollset_work_and_unlock", 0);
-      return; /* Fatal error. We cannot continue */
+      return; /* Fatal error. Cannot continue */
     }
 
     PI_ADD_REF(pollset->po.pi, "ps");
@@ -1423,70 +1693,10 @@
      the completion queue, so there's no need to poll... so we skip that and
      redo the complete loop to verify */
   if (!maybe_do_workqueue_work(exec_ctx, pi)) {
-    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
     g_current_thread_polling_island = pi;
-
-    GRPC_SCHEDULING_START_BLOCKING_REGION;
-    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
-                        sig_mask);
-    GRPC_SCHEDULING_END_BLOCKING_REGION;
-    if (ep_rv < 0) {
-      if (errno != EINTR) {
-        gpr_asprintf(&err_msg,
-                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
-                     epoll_fd, errno, strerror(errno));
-        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
-      } else {
-        /* We were interrupted. Save an interation by doing a zero timeout
-           epoll_wait to see if there are any other events of interest */
-        GRPC_POLLING_TRACE(
-            "pollset_work: pollset: %p, worker: %p received kick",
-            (void *)pollset, (void *)worker);
-        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
-      }
-    }
-
-#ifdef GRPC_TSAN
-    /* See the definition of g_poll_sync for more details */
-    gpr_atm_acq_load(&g_epoll_sync);
-#endif /* defined(GRPC_TSAN) */
-
-    for (int i = 0; i < ep_rv; ++i) {
-      void *data_ptr = ep_ev[i].data.ptr;
-      if (data_ptr == &global_wakeup_fd) {
-        grpc_timer_consume_kick();
-        append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
-                     err_desc);
-      } else if (data_ptr == &pi->workqueue_wakeup_fd) {
-        append_error(error,
-                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
-                     err_desc);
-        maybe_do_workqueue_work(exec_ctx, pi);
-      } else if (data_ptr == &polling_island_wakeup_fd) {
-        GRPC_POLLING_TRACE(
-            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
-            "%d) got merged",
-            (void *)pollset, (void *)worker, epoll_fd);
-        /* This means that our polling island is merged with a different
-           island. We do not have to do anything here since the subsequent call
-           to the function pollset_work_and_unlock() will pick up the correct
-           epoll_fd */
-      } else {
-        grpc_fd *fd = data_ptr;
-        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
-        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
-        int write_ev = ep_ev[i].events & EPOLLOUT;
-        if (read_ev || cancel) {
-          fd_become_readable(exec_ctx, fd, pollset);
-        }
-        if (write_ev || cancel) {
-          fd_become_writable(exec_ctx, fd);
-        }
-      }
-    }
-
+    pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now,
+                           deadline, sig_mask, error);
     g_current_thread_polling_island = NULL;
-    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
   }
 
   GPR_ASSERT(pi != NULL);
@@ -1510,14 +1720,9 @@
                                 gpr_timespec now, gpr_timespec deadline) {
   GPR_TIMER_BEGIN("pollset_work", 0);
   grpc_error *error = GRPC_ERROR_NONE;
-  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
-
-  sigset_t new_mask;
 
   grpc_pollset_worker worker;
-  worker.next = worker.prev = NULL;
-  worker.pt_id = pthread_self();
-  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
+  pollset_worker_init(&worker);
 
   if (worker_hdl) *worker_hdl = &worker;
 
@@ -1551,9 +1756,9 @@
        misses acting on a kick */
 
     if (!g_initialized_sigmask) {
-      sigemptyset(&new_mask);
-      sigaddset(&new_mask, grpc_wakeup_signal);
-      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
+      sigemptyset(&g_wakeup_sig_set);
+      sigaddset(&g_wakeup_sig_set, grpc_wakeup_signal);
+      pthread_sigmask(SIG_BLOCK, &g_wakeup_sig_set, &g_orig_sigmask);
       sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
       g_initialized_sigmask = true;
-      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
+      /* g_wakeup_sig_set: The thread mask which blocks 'grpc_wakeup_signal'.
@@ -1568,7 +1773,7 @@
 
     push_front_worker(pollset, &worker); /* Add worker to pollset */
 
-    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
+    pollset_work_and_unlock(exec_ctx, pollset, &worker, now, deadline,
                             &g_orig_sigmask, &error);
     grpc_exec_ctx_flush(exec_ctx);
 
@@ -1921,6 +2126,24 @@
   return true;
 }
 
+/* This is mainly for testing purposes. Checks to see if environment variable
+ * GRPC_MAX_POLLERS_PER_PI is set and if so, assigns that value to
+ * g_max_pollers_per_pi (any negative value is considered INT_MAX) */
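+/* For example, running with GRPC_MAX_POLLERS_PER_PI=2 in the environment
+   limits each polling island to at most two threads concurrently inside
+   epoll_pwait() */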
+static void set_max_pollers_per_island() {
+  char *s = gpr_getenv("GRPC_MAX_POLLERS_PER_PI");
+  if (s) {
+    g_max_pollers_per_pi = (int)strtol(s, NULL, 10);
+    if (g_max_pollers_per_pi < 0) {
+      g_max_pollers_per_pi = INT_MAX;
+    }
+    gpr_free(s); /* gpr_getenv() returns an allocated string; free it */
+  } else {
+    g_max_pollers_per_pi = INT_MAX;
+  }
+
+  gpr_log(GPR_INFO, "Max number of pollers per polling island: %d",
+          g_max_pollers_per_pi);
+}
+
 const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
   /* If use of signals is disabled, we cannot use the epoll engine */
   if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
@@ -1939,6 +2162,8 @@
     grpc_use_signal(SIGRTMIN + 6);
   }
 
+  set_max_pollers_per_island();
+
   fd_global_init();
 
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {