Do not close epoll_fd while there are any pollers and add the ability to
wake up all pollers when an island is merged
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 69ab665..3a3c136 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -190,9 +190,18 @@
 };
 
 /*******************************************************************************
- * Polling-island Definitions
+ * Polling island Definitions
  */
 
+/* The wakeup fd that is used to wake up all threads in a Polling island. This
+   is useful in the polling island merge operation where we need to wakeup all
+   the threads currently polling the smaller polling island (so that they can
+   start polling the new/merged polling island)
+
+   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
+   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
+static grpc_wakeup_fd polling_island_wakeup_fd;
+
 /* Polling island freelist */
 static gpr_mu g_pi_freelist_mu;
 static polling_island *g_pi_freelist = NULL;
@@ -232,6 +241,25 @@
   }
 }
 
+/* Adds wakeup_fd's read fd to pi's epoll set (EPOLLIN | EPOLLET); a failure
+   is only logged. The caller is expected to hold pi->mu before calling this */
+static void polling_island_add_wakeup_fd_locked(polling_island *pi,
+                                                grpc_wakeup_fd *wakeup_fd) {
+  struct epoll_event ev;
+  int err;
+  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+  ev.data.ptr = wakeup_fd; /* pollers identify the wakeup fd by this pointer */
+  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
+                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
+  if (err < 0) {
+    gpr_log(GPR_ERROR,
+            "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
+            ". Error: %s",
+            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
+            strerror(errno));
+  }
+}
+
 /* The caller is expected to hold pi->mu lock before calling this function */
 static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                  bool remove_fd_refs) {
@@ -283,8 +311,6 @@
 static polling_island *polling_island_create(grpc_fd *initial_fd,
                                              int initial_ref_cnt) {
   polling_island *pi = NULL;
-  struct epoll_event ev;
-  int err;
 
   /* Try to get one from the polling island freelist */
   gpr_mu_lock(&g_pi_freelist_mu);
@@ -311,17 +337,7 @@
   }
   GPR_ASSERT(pi->epoll_fd >= 0);
 
-  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
-  ev.data.ptr = NULL;
-  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
-                  GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
-  if (err < 0) {
-    gpr_log(GPR_ERROR,
-            "Failed to add grpc_global_wake_up_fd (%d) to the epoll set "
-            "(epoll_fd: %d) with error: %s",
-            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
-            strerror(errno));
-  }
+  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);
 
   pi->ref_cnt = initial_ref_cnt;
   pi->merged_to = NULL;
@@ -496,13 +512,15 @@
     GPR_SWAP(polling_island *, p, q);
   }
 
-  /* "Merge" p with q i.e move all the fds from p (the polling_island with fewer
-     fds) to q.
-     Note: Not altering the ref counts on the affected fds here because they
-     would effectively remain unchanged */
+  /* "Merge" p with q i.e move all the fds from p (the one with fewer fds) to q
+     (Note that the refcounts on the fds being moved will not change here. This
+     is why the last parameter in the following two functions is 'false') */
   polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
   polling_island_remove_all_fds_locked(p, false);
 
+  /* Wakeup all the pollers (if any) on p so that they can pickup this change */
+  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
+
   /* The merged polling island inherits all the ref counts of the island merging
      with it */
   q->ref_cnt += p->ref_cnt;
@@ -516,6 +534,8 @@
 static void polling_island_global_init() {
   gpr_mu_init(&g_pi_freelist_mu);
   g_pi_freelist = NULL;
+  grpc_wakeup_fd_init(&polling_island_wakeup_fd);
+  grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
 }
 
 static void polling_island_global_shutdown() {
@@ -529,8 +549,9 @@
     gpr_free(g_pi_freelist);
     g_pi_freelist = next;
   }
-
   gpr_mu_destroy(&g_pi_freelist_mu);
+
+  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
 }
 
 /*******************************************************************************
@@ -973,6 +994,7 @@
   struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
   int epoll_fd = -1;
   int ep_rv;
+  polling_island *pi = NULL;
   GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
 
   /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@@ -983,13 +1005,19 @@
      - pollset->polling_island->mu */
   gpr_mu_lock(&pollset->pi_mu);
 
-  if (pollset->polling_island == NULL) {
-    pollset->polling_island = polling_island_create(NULL, 1);
+  pi = pollset->polling_island;
+  if (pi == NULL) {
+    pi = polling_island_create(NULL, 1);
   }
 
-  pollset->polling_island =
-      polling_island_update_and_lock(pollset->polling_island, 1, 0);
-  epoll_fd = pollset->polling_island->epoll_fd;
+  /* In addition to locking the polling island, add a ref so that the island
+     does not get destroyed (which means the epoll_fd won't be closed) while
+     we are doing an epoll_wait() on the epoll_fd */
+  pi = polling_island_update_and_lock(pi, 1, 1);
+  epoll_fd = pi->epoll_fd;
+
+  /* Update the pollset->polling_island */
+  pollset->polling_island = pi;
 
 #ifdef GRPC_EPOLL_DEBUG
   if (pollset->polling_island->fd_cnt == 0) {
@@ -1013,25 +1041,29 @@
                         sig_mask);
     if (ep_rv < 0) {
       if (errno != EINTR) {
-        /* TODO (sreek) - Do not log an error in case of bad file descriptor
-         * (A bad file descriptor here would just mean that the epoll set was
-         * merged with another epoll set and that the current epoll_fd is
-         * closed) */
         gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
       } else {
+        /* We were interrupted. Save an iteration by doing a zero timeout
+           epoll_wait to see if there are any other events of interest */
         ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
       }
     }
 
     int i;
     for (i = 0; i < ep_rv; ++i) {
-      grpc_fd *fd = ep_ev[i].data.ptr;
-      int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
-      int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
-      int write_ev = ep_ev[i].events & EPOLLOUT;
-      if (fd == NULL) {
+      void *data_ptr = ep_ev[i].data.ptr;
+      if (data_ptr == &grpc_global_wakeup_fd) {
         grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+      } else if (data_ptr == &polling_island_wakeup_fd) {
+        /* This means that our polling island is merged with a different
+           island. We do not have to do anything here since the subsequent call
+           to the function pollset_work_and_unlock() will pick up the correct
+           epoll_fd */
       } else {
+        grpc_fd *fd = data_ptr;
+        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+        int write_ev = ep_ev[i].events & EPOLLOUT;
         if (read_ev || cancel) {
           fd_become_readable(exec_ctx, fd, pollset);
         }
@@ -1041,6 +1073,21 @@
       }
     }
   } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+
+  GPR_ASSERT(pi != NULL);
+
+  /* Before leaving, release the extra ref we added to the polling island */
+  /* It is important to note that at this point 'pi' may not be the same as
+   * pollset->polling_island. This is because pollset->polling_island pointer
+   * gets updated whenever the underlying polling island is merged with another
+   * island and while we are doing epoll_wait() above, the polling island may
+   * have been merged */
+
+  /* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
+   * we do not have to do this here */
+  gpr_mu_lock(&pi->mu);
+  polling_island_unref_and_unlock(pi, 1);
+
   GPR_TIMER_END("pollset_work_and_unlock", 0);
 }