Fix a few bugs in ev_epoll_linux.c
  1. pollset_add_fd: Add fd to epoll set if fd->polling_island == NULL
  2. close(fd) in fd_orphan instead of polling_island_remove_fd_locked()
  since fd->polling_island may be NULL
  3. If pollset work() is interrupted, do a zero timeout epoll_wait().
  pollset_work may be called without a polling island
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 0fb1ccf..3aa2610 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -72,9 +72,14 @@
   gpr_atm refst;
 
   gpr_mu mu;
+
+  /* Indicates that the fd is shutdown and that any pending read/write closures
+     should fail */
   bool shutdown;
-  int closed;
-  bool released;
+
+  /* The fd is either closed or we relinquished control of it. In either cases,
+     this indicates that the 'fd' on this structure is no longer valid */
+  bool orphaned;
 
   grpc_closure *read_closure;
   grpc_closure *write_closure;
@@ -251,16 +256,13 @@
 
 /* The caller is expected to hold pi->mu lock before calling this function */
 static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
-                                            bool close_fd, bool remove_fd_ref) {
+                                            bool is_fd_closed) {
   int err;
   size_t i;
 
-  /* Calling close() on the fd will automatically remove it from the epoll set.
-     If not calling close(), the fd must be explicitly removed from the epoll
-     set */
-  if (close_fd) {
-    close(fd->fd);
-  } else {
+  /* If fd is already closed, then it would have been automatically been removed
+     from the epoll set */
+  if (!is_fd_closed) {
     err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
     if (err < 0 && errno != ENOENT) {
       gpr_log(GPR_ERROR, "epoll_ctl delete for fd: %d failed with error; %s",
@@ -271,9 +273,7 @@
   for (i = 0; i < pi->fd_cnt; i++) {
     if (pi->fds[i] == fd) {
       pi->fds[i] = pi->fds[--pi->fd_cnt];
-      if (remove_fd_ref) {
-        GRPC_FD_UNREF(fd, "polling_island");
-      }
+      GRPC_FD_UNREF(fd, "polling_island");
       break;
     }
   }
@@ -644,8 +644,7 @@
   new_fd->polling_island = NULL;
   new_fd->freelist_next = NULL;
   new_fd->on_done_closure = NULL;
-  new_fd->closed = 0;
-  new_fd->released = false;
+  new_fd->orphaned = false;
 
   gpr_mu_unlock(&new_fd->mu);
 
@@ -666,7 +665,7 @@
 static int fd_wrapped_fd(grpc_fd *fd) {
   int ret_fd = -1;
   gpr_mu_lock(&fd->mu);
-  if (!fd->released && !fd->closed) {
+  if (!fd->orphaned) {
     ret_fd = fd->fd;
   }
   gpr_mu_unlock(&fd->mu);
@@ -678,34 +677,35 @@
                       grpc_closure *on_done, int *release_fd,
                       const char *reason) {
   /* TODO(sreek) In ev_poll_posix.c,the lock is acquired a little later. Why? */
+  bool is_fd_closed = false;
   gpr_mu_lock(&fd->mu);
   fd->on_done_closure = on_done;
 
   /* If release_fd is not NULL, we should be relinquishing control of the file
      descriptor fd->fd (but we still own the grpc_fd structure). */
-  fd->released = release_fd != NULL;
-  if (!fd->released) {
-    shutdown(fd->fd, SHUT_RDWR);
-  } else {
+  if (release_fd != NULL) {
     *release_fd = fd->fd;
+  } else {
+    close(fd->fd);
+    is_fd_closed = true;
   }
 
-  REF_BY(fd, 1, reason); /* Remove active status, but keep referenced */
-  fd->closed = 1;
+  fd->orphaned = true;
+
+  /* Remove the active status but keep referenced. We want this grpc_fd struct
+     to be alive (and not added to freelist) until the end of this function */
+  REF_BY(fd, 1, reason);
 
   /* Remove the fd from the polling island:
      - Update the fd->polling_island to point to the latest polling island
-     - Remove the fd from the polling island. Also, call close() on the file
-       descriptor fd->fd ONLY if we haven't relinquised control (i.e
-       fd->released is 'false')
-     - Decrement the ref count on the polling island and det fd->polling_island
-       to NULL */
+     - Remove the fd from the polling island.
+     - Remove a ref to the polling island and set fd->polling_island to NULL */
   gpr_mu_lock(&fd->pi_mu);
   if (fd->polling_island != NULL) {
     fd->polling_island =
         polling_island_update_and_lock(fd->polling_island, 1, 0);
-    polling_island_remove_fd_locked(fd->polling_island, fd, !fd->released,
-                                    true);
+    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);
+
     polling_island_unref_and_unlock(fd->polling_island, 1);
     fd->polling_island = NULL;
   }
@@ -839,17 +839,20 @@
   grpc_pollset_worker *worker = specific_worker;
   if (worker != NULL) {
     if (worker == GRPC_POLLSET_KICK_BROADCAST) {
-      GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
+      gpr_log(GPR_DEBUG, "pollset_kick: broadcast!");
       if (pollset_has_workers(p)) {
+        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
         for (worker = p->root_worker.next; worker != &p->root_worker;
              worker = worker->next) {
           pthread_kill(worker->pt_id, SIGUSR1);
         }
       } else {
+        gpr_log(GPR_DEBUG, "pollset_kick: (broadcast) Kicked without pollers");
         p->kicked_without_pollers = true;
       }
       GPR_TIMER_END("pollset_kick.broadcast", 0);
     } else {
+      gpr_log(GPR_DEBUG, "pollset_kick: kicked kicked_specifically");
       GPR_TIMER_MARK("kicked_specifically", 0);
       worker->kicked_specifically = true;
       pthread_kill(worker->pt_id, SIGUSR1);
@@ -860,9 +863,11 @@
     if (worker != NULL) {
       GPR_TIMER_MARK("finally_kick", 0);
       push_back_worker(p, worker);
+      gpr_log(GPR_DEBUG, "pollset_kick: anonymous kick");
       pthread_kill(worker->pt_id, SIGUSR1);
     } else {
       GPR_TIMER_MARK("kicked_no_pollers", 0);
+      gpr_log(GPR_DEBUG, "pollset_kick: kicked without pollers");
       p->kicked_without_pollers = true;
     }
   }
@@ -935,6 +940,7 @@
   struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
   int epoll_fd = -1;
   int ep_rv;
+  gpr_log(GPR_DEBUG, "pollset_work_and_unlock: Entering..");
   GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
 
   /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@@ -949,6 +955,16 @@
     pollset->polling_island =
         polling_island_update_and_lock(pollset->polling_island, 1, 0);
     epoll_fd = pollset->polling_island->epoll_fd;
+    if (pollset->polling_island->fd_cnt == 0) {
+      gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds",
+              epoll_fd);
+    }
+    for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) {
+      gpr_log(GPR_DEBUG,
+              "pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d",
+              epoll_fd, pollset->polling_island->fd_cnt, i,
+              pollset->polling_island->fds[i]->fd);
+    }
     gpr_mu_unlock(&pollset->polling_island->mu);
   }
 
@@ -958,36 +974,47 @@
   /* If epoll_fd == -1, this is a blank pollset and does not have any fds yet */
   if (epoll_fd != -1) {
     do {
+      gpr_timespec before_epoll = gpr_now(GPR_CLOCK_PRECISE);
+      gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_wait()....");
       ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                           sig_mask);
+      gpr_timespec after_epoll = gpr_now(GPR_CLOCK_PRECISE);
+      int dur = gpr_time_to_millis(gpr_time_sub(after_epoll, before_epoll));
+      gpr_log(GPR_DEBUG,
+              "pollset_work_and_unlock: DONE epoll_wait() : %d ms, ep_rv: %d",
+              dur, ep_rv);
 
       if (ep_rv < 0) {
         if (errno != EINTR) {
           /* TODO (sreek) - Check for bad file descriptor error */
           gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
+        } else {
+          gpr_log(GPR_DEBUG, "pollset_work_and_unlock: 0-timeout epoll_wait()");
+          ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+          gpr_log(GPR_DEBUG, "pollset_work_and_unlock: ep_rv: %d", ep_rv);
         }
-      } else {
-        int i;
-        for (i = 0; i < ep_rv; ++i) {
-          grpc_fd *fd = ep_ev[i].data.ptr;
-          int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
-          int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
-          int write_ev = ep_ev[i].events & EPOLLOUT;
-          if (fd == NULL) {
-            grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
-          } else {
-            if (read_ev || cancel) {
-              fd_become_readable(exec_ctx, fd);
-            }
-            if (write_ev || cancel) {
-              fd_become_writable(exec_ctx, fd);
-            }
+      }
+
+      int i;
+      for (i = 0; i < ep_rv; ++i) {
+        grpc_fd *fd = ep_ev[i].data.ptr;
+        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+        int write_ev = ep_ev[i].events & EPOLLOUT;
+        if (fd == NULL) {
+          grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+        } else {
+          if (read_ev || cancel) {
+            fd_become_readable(exec_ctx, fd);
+          }
+          if (write_ev || cancel) {
+            fd_become_writable(exec_ctx, fd);
           }
         }
       }
     } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
   }
-
+  gpr_log(GPR_DEBUG, "pollset_work_and_unlock: Leaving..");
   GPR_TIMER_END("pollset_work_and_unlock", 0);
 }
 
@@ -1060,7 +1087,7 @@
                          grpc_pollset_worker **worker_hdl, gpr_timespec now,
                          gpr_timespec deadline) {
   GPR_TIMER_BEGIN("pollset_work", 0);
-
+  gpr_log(GPR_DEBUG, "pollset_work: enter");
   int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
 
   sigset_t new_mask;
@@ -1079,6 +1106,7 @@
        work that needs attention like an event on the completion queue or an
        alarm */
     GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
+    gpr_log(GPR_INFO, "pollset_work: kicked without pollers..");
     pollset->kicked_without_pollers = 0;
   } else if (!pollset->shutting_down) {
     sigemptyset(&new_mask);
@@ -1113,12 +1141,14 @@
     gpr_mu_lock(&pollset->mu);
   }
 
+  gpr_log(GPR_DEBUG, "pollset_work(): leaving");
   *worker_hdl = NULL;
   GPR_TIMER_END("pollset_work", 0);
 }
 
 static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                            grpc_fd *fd) {
+  gpr_log(GPR_DEBUG, "pollset_add_fd: pollset: %p, fd: %d", pollset, fd->fd);
   /* TODO sreek - Check if we need to get a pollset->mu lock here */
   gpr_mu_lock(&pollset->pi_mu);
   gpr_mu_lock(&fd->pi_mu);
@@ -1146,6 +1176,7 @@
     }
   } else if (fd->polling_island == NULL) {
     pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
+    polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
     gpr_mu_unlock(&pi_new->mu);
   } else if (pollset->polling_island == NULL) {
     pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);