Write most of the methods in the new epoll implementation
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index f257ac8..0d30bb6 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -150,28 +150,84 @@
 static gpr_mu g_pi_freelist_mu;
 static polling_island *g_pi_freelist = NULL;
 
-/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ? */
-static void add_fd_to_polling_island_locked(polling_island *pi, grpc_fd *fd) {
+/* Adds each of the fd_count fds to pi's epoll set (EPOLLIN | EPOLLOUT |
+ * EPOLLET) and to pi->fds; an add failing with anything but EEXIST is skipped.
+ * Caller must hold pi->mu. TODO: sreek - lock the fd or take a ref on it ? */
+static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
+                                          size_t fd_count) {
   int err;
+  size_t i;
   struct epoll_event ev;
 
-  ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
-  ev.data.ptr = fd;
-  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+  for (i = 0; i < fd_count; i++) {
+    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
+    ev.data.ptr = fds[i];
+    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
 
-  if (err < 0 && errno != EEXIST) {
-    gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s", fd->fd,
-            strerror(errno));
-    return;
+    if (err < 0 && errno != EEXIST) {
+      gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
+              fds[i]->fd, strerror(errno));
+      /* TODO: sreek - Not sure if it is a good idea to continue here. We need a
+       * better way to bubble up this error instead of doing an abort() */
+      continue;
+    }
+    /* Grow the fds array only when it is already full */
+    if (pi->fd_cnt == pi->fd_capacity) {
+      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
+      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
+    }
+
+    pi->fds[pi->fd_cnt++] = fds[i];
   }
-
-  pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
-  pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
-  pi->fds[pi->fd_cnt++] = fd;
 }
 
-static polling_island *polling_island_create(int initial_ref_cnt,
-                                             grpc_fd *initial_fd) {
+/* Unregisters every fd tracked in pi->fds from pi's epoll set and empties the
+ * list (the array storage is kept). ENOENT from epoll_ctl is tolerated; other
+ * failures are logged and the remaining fds are still processed.
+ * The caller is expected to hold pi->mu. TODO: sreek - lock/unref the fds ? */
+static void polling_island_clear_fds_locked(polling_island *pi) {
+  int err;
+  size_t i;
+
+  for (i = 0; i < pi->fd_cnt; i++) {
+    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
+
+    if (err < 0 && errno != ENOENT) {
+      gpr_log(GPR_ERROR,
+              "epoll_ctl delete for fds[%zu]: %d failed with error: %s", i,
+              pi->fds[i]->fd, strerror(errno));
+      /* TODO: sreek - Not sure if it is a good idea to continue here. We need a
+       * better way to bubble up this error instead of doing an abort() */
+      continue;
+    }
+  }
+
+  pi->fd_cnt = 0;
+}
+
+/* Unregisters fd from pi's epoll set and drops it from pi->fds by moving the
+ * last tracked entry into its slot (order is not preserved). ENOENT from
+ * epoll_ctl is tolerated. The caller is expected to hold pi->mu.
+ * TODO: sreek - lock the fd ? unref it if adding to the island takes a ref ? */
+static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd) {
+  int err;
+  size_t i;
+  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
+  if (err < 0 && errno != ENOENT) {
+    gpr_log(GPR_ERROR, "epoll_ctl delete for fd: %d failed with error: %s",
+            fd->fd, strerror(errno));
+  }
+
+  for (i = 0; i < pi->fd_cnt; i++) {
+    if (pi->fds[i] == fd) {
+      pi->fds[i] = pi->fds[--pi->fd_cnt];
+      break;
+    }
+  }
+}
+
+static polling_island *polling_island_create(grpc_fd *initial_fd,
+                                             int initial_ref_cnt) {
   polling_island *pi = NULL;
   gpr_mu_lock(&g_pi_freelist_mu);
   if (g_pi_freelist != NULL) {
@@ -202,17 +258,151 @@
   pi->next_free = NULL;
 
   if (initial_fd != NULL) {
-    /* add_fd_to_polling_island_locked() expects the caller to hold a pi->mu
+    /* polling_island_add_fds_locked() expects the caller to hold a pi->mu
      * lock. However, since this is a new polling island (and no one has a
      * reference to it yet), it is okay to not acquire pi->mu here */
-    add_fd_to_polling_island_locked(pi, initial_fd);
+    polling_island_add_fds_locked(pi, &initial_fd, 1);
   }
 
   return pi;
 }
 
+static void polling_island_delete(polling_island *pi) {
+  GPR_ASSERT(pi->ref_cnt == 0);
+  GPR_ASSERT(pi->fd_cnt == 0);
+  /* Nothing is freed here; the island is only reset and put on the freelist */
+  pi->merged_to = NULL;
+  /* Push the island onto the head of the global freelist */
+  gpr_mu_lock(&g_pi_freelist_mu);
+  pi->next_free = g_pi_freelist;
+  g_pi_freelist = pi;
+  gpr_mu_unlock(&g_pi_freelist_mu);
+}
+
+void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
+  pi->ref_cnt -= unref_by;
+  int ref_cnt = pi->ref_cnt;
+  GPR_ASSERT(ref_cnt >= 0);
+  /* Releases pi->mu, so the caller must already hold it */
+  gpr_mu_unlock(&pi->mu);
+  /* Decide on the snapshot taken while pi->mu was still held */
+  if (ref_cnt == 0) {
+    polling_island_delete(pi);
+  }
+}
+
+polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
+                                               int add_ref_by) {
+  polling_island *next = NULL;
+  gpr_mu_lock(&pi->mu);
+  while (pi->merged_to != NULL) {
+    next = pi->merged_to;
+    polling_island_unref_and_unlock(pi, unref_by);
+    pi = next;
+    gpr_mu_lock(&pi->mu);
+  }
+  /* pi has no merged_to link; it is returned with pi->mu still held */
+  pi->ref_cnt += add_ref_by;
+  return pi;
+}
+
+void polling_island_pair_update_and_lock(polling_island **p,
+                                         polling_island **q) {
+  polling_island *pi_1 = *p;
+  polling_island *pi_2 = *q;
+  polling_island *temp = NULL;
+  bool pi_1_locked = false;
+  bool pi_2_locked = false;
+  int num_swaps = 0;
+  /* Lock both islands, lower address first, following merged_to as needed */
+  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
+    // pi_1 is NOT equal to pi_2
+    // pi_1 MAY be locked
+
+    if (pi_1 > pi_2) {
+      if (pi_1_locked) {
+        gpr_mu_unlock(&pi_1->mu);
+        pi_1_locked = false;
+      }
+
+      GPR_SWAP(polling_island *, pi_1, pi_2);
+      num_swaps++;
+    }
+
+    // pi_1 < pi_2
+    // pi_1 MAY BE locked
+    // pi_2 is NOT locked
+
+    if (!pi_1_locked) {
+      gpr_mu_lock(&pi_1->mu);
+      pi_1_locked = true;
+
+      if (pi_1->merged_to != NULL) {
+        temp = pi_1->merged_to;
+        polling_island_unref_and_unlock(pi_1, 1);
+        pi_1 = temp;
+        pi_1_locked = false;
+
+        continue;
+      }
+    }
+
+    // pi_1 is LOCKED
+    // pi_2 is UNLOCKED
+    // pi_1 != pi_2
+
+    gpr_mu_lock(&pi_2->mu);
+    pi_2_locked = true;
+
+    if (pi_2->merged_to != NULL) {
+      temp = pi_2->merged_to;
+      polling_island_unref_and_unlock(pi_2, 1);
+      pi_2 = temp;
+      pi_2_locked = false;
+    }
+  }
+
+  // Either pi_1 == pi_2 OR we got both locks!
+  if (pi_1 == pi_2) {
+    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
+    if (!pi_1_locked) {
+      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
+    }
+  } else {
+    GPR_ASSERT(pi_1_locked && pi_2_locked);
+    if (num_swaps % 2 > 0) {
+      GPR_SWAP(polling_island *, pi_1, pi_2);
+    }
+  }
+  /* Hand the resolved islands back; an odd swap count was undone above */
+  *p = pi_1;
+  *q = pi_2;
+}
+
+polling_island *polling_island_merge(polling_island *p, polling_island *q) {
+  /* Merges island p into island q and returns the surviving island with no
+   * locks held. Both arguments are first resolved (and locked) to the
+   * islands they currently map to */
+  polling_island_pair_update_and_lock(&p, &q);
+  /* Same island: its mu is held exactly once and there is nothing to move */
+  if (p == q) {
+    gpr_mu_unlock(&p->mu);
+    return p;
+  }
+
+  /* Move all the fds from polling_island p to polling_island q */
+  polling_island_add_fds_locked(q, p->fds, p->fd_cnt);
+  polling_island_clear_fds_locked(p);
+  /* Later lookups that start at p follow merged_to; q inherits p's refs */
+  p->merged_to = q;
+  q->ref_cnt += p->ref_cnt;
+
+  gpr_mu_unlock(&p->mu);
+  gpr_mu_unlock(&q->mu);
+  return q;
+}
+
 static void polling_island_global_init() {
-  polling_island_create(0, NULL); /* TODO(sreek): Delete this line */
   gpr_mu_init(&g_pi_freelist_mu);
   g_pi_freelist = NULL;
 }
@@ -245,7 +435,7 @@
 
   int epoll_fd;
 
-   /* Mutex protecting the 'polling_island' field */
+  /* Mutex protecting the 'polling_island' field */
   gpr_mu pi_mu;
 
   /* The polling island to which this fd belongs to. An fd belongs to exactly
@@ -319,7 +509,8 @@
  * fd_posix.c
  */
 
-/* We need to keep a freelist not because of any concerns of malloc performance
+/* We need to keep a freelist not because of any concerns of malloc
+ * performance
  * but instead so that implementations with multiple threads in (for example)
  * epoll_wait deal with the race between pollset removal and incoming poll
  * notifications.
@@ -434,6 +625,7 @@
 
 static grpc_fd *fd_create(int fd, const char *name) {
   grpc_fd *r = alloc_fd(fd);
+
   char *name2;
   gpr_asprintf(&name2, "%s fd=%d", name, fd);
   grpc_iomgr_register_object(&r->iomgr_object, name2);
@@ -453,6 +645,20 @@
   if (!fd->released) {
     close(fd->fd);
   } else {
+    /* TODO: sreek - Check for deadlocks */
+
+    gpr_mu_lock(&fd->pi_mu);
+    fd->polling_island =
+        polling_island_update_and_lock(fd->polling_island, 1, 0);
+
+    polling_island_remove_fd_locked(fd->polling_island, fd);
+    polling_island_unref_and_unlock(fd->polling_island, 1);
+
+    fd->polling_island = NULL;
+    gpr_mu_unlock(&fd->pi_mu);
+
+
+    /* TODO: sreek - This should be no longer needed */
     remove_fd_from_all_epoll_sets(fd->fd);
   }
   grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
@@ -752,7 +958,8 @@
   grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
 }
 
-/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock declaration
+/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock
+ * declaration
  */
 static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
     grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
@@ -979,50 +1186,36 @@
  * finally_add_fd() in ev_poll_and_epoll_posix.c */
 static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                            grpc_fd *fd) {
-
   /* TODO sreek - Check if we need to get a pollset->mu lock here */
+  gpr_mu_lock(&pollset->pi_mu);
+  gpr_mu_lock(&fd->pi_mu);
 
-  struct epoll_event ev;
-  int err;
+  polling_island *pi_new = NULL;
 
-  /* Hold a ref to the fd to keep it from being closed during the add. This may
-     result in a spurious wakeup being assigned to this pollset whilst adding,
-     but that should be benign. */
-  /* TODO: (sreek): Understand how a spurious wake up migh be assinged to this
-   * pollset..and how holding a reference will prevent the fd from being closed
-   * (and perhaps more importantly, see how can an fd be closed while being
-   * added to the epollset */
-  GRPC_FD_REF(fd, "add fd");
-
-  gpr_mu_lock(&fd->mu);
-  if (fd->shutdown) {
-    gpr_mu_unlock(&fd->mu);
-    GRPC_FD_UNREF(fd, "add fd");
-    return;
-  }
-  gpr_mu_unlock(&fd->mu);
-
-  ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
-  ev.data.ptr = fd;
-  err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
-  if (err < 0) {
-    /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
-    if (errno != EEXIST) {
-      gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
-              strerror(errno));
+  /* Make the fd and the pollset share one polling island.
+   * polling_island_update_and_lock() returns with the island's mu held, so
+   * the two branches that call it must release that mu before moving on */
+  if (fd->polling_island == pollset->polling_island) {
+    pi_new = fd->polling_island;
+    if (pi_new == NULL) {
+      pi_new = polling_island_create(fd, 2);
     }
+  } else if (fd->polling_island == NULL) {
+    pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
+    /* The fd is new to this island, so register it with the island's epoll */
+    polling_island_add_fds_locked(pi_new, &fd, 1);
+    gpr_mu_unlock(&pi_new->mu);
+  } else if (pollset->polling_island == NULL) {
+    pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
+    gpr_mu_unlock(&pi_new->mu);
+  } else {  // Non null and different
+    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
   }
 
-  /* The fd might have been orphaned while we were adding it to the epoll set.
-     Close the fd in such a case (which will also take care of removing it from
-     the epoll set */
-  gpr_mu_lock(&fd->mu);
-  if (fd_is_orphaned(fd) && !fd->closed) {
-    close_fd_locked(exec_ctx, fd);
-  }
-  gpr_mu_unlock(&fd->mu);
+  fd->polling_island = pollset->polling_island = pi_new;
 
-  GRPC_FD_UNREF(fd, "add fd");
+  gpr_mu_unlock(&fd->pi_mu);
+  gpr_mu_unlock(&pollset->pi_mu);
 }
 
 /* Creates an epoll fd and initializes the pollset */