More scalable unary polling

- admit only one poller for read and one for write on each fd at a time
  (poll is level-triggered, so otherwise every poller watching the fd would
  wake on each event, causing a thundering herd)
- when another poller is needed, wake only one of the waiting watchers rather
  than all of them, again avoiding a thundering herd
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 9c8133d..afd6ddc 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -98,6 +98,7 @@
   r->fd = fd;
   r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
   r->freelist_next = NULL;
+  r->read_watcher = r->write_watcher = NULL;
   return r;
 }
 
@@ -147,14 +148,24 @@
   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
 }
 
-static void wake_watchers(grpc_fd *fd) {
-  grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+  if (fd->watcher_root.next != &fd->watcher_root) {
+    grpc_pollset_force_kick(fd->watcher_root.next->pollset);
+  }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
   gpr_mu_lock(&fd->watcher_mu);
+  maybe_wake_one_watcher_locked(fd);
+  gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers(grpc_fd *fd) {
+  grpc_fd_watcher *watcher;
   for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
        watcher = watcher->next) {
     grpc_pollset_force_kick(watcher->pollset);
   }
-  gpr_mu_unlock(&fd->watcher_mu);
 }
 
 void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
@@ -162,7 +173,7 @@
   fd->on_done_user_data = user_data;
   shutdown(fd->fd, SHUT_RDWR);
   ref_by(fd, 1); /* remove active status, but keep referenced */
-  wake_watchers(fd);
+  wake_all_watchers(fd);
   unref_by(fd, 2); /* drop the reference */
 }
 
@@ -204,7 +215,7 @@
            set_ready call.  NOTE: we don't have an ABA problem here,
            since we should never have concurrent calls to the same
            notify_on function. */
-        wake_watchers(fd);
+        maybe_wake_one_watcher(fd);
         return;
       }
     /* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +301,61 @@
 gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                               gpr_uint32 read_mask, gpr_uint32 write_mask,
                               grpc_fd_watcher *watcher) {
+  gpr_uint32 mask = 0;
   /* keep track of pollers that have requested our events, in case they change
    */
   grpc_fd_ref(fd);
 
   gpr_mu_lock(&fd->watcher_mu);
-  watcher->next = &fd->watcher_root;
-  watcher->prev = watcher->next->prev;
-  watcher->next->prev = watcher->prev->next = watcher;
+  /* if there is nobody polling for read, but we need to, then start doing so */
+  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+    fd->read_watcher = watcher;
+    mask |= read_mask;
+  }
+  /* if there is nobody polling for write, but we need to, then start doing so */
+  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+    fd->write_watcher = watcher;
+    mask |= write_mask;
+  }
+  /* if not polling, remember this watcher in case we need someone to later */
+  if (mask == 0) {
+    watcher->next = &fd->watcher_root;
+    watcher->prev = watcher->next->prev;
+    watcher->next->prev = watcher->prev->next = watcher;
+  }
   watcher->pollset = pollset;
   watcher->fd = fd;
   gpr_mu_unlock(&fd->watcher_mu);
 
-  return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
-         (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+  return mask;
 }
 
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
-  gpr_mu_lock(&watcher->fd->watcher_mu);
-  watcher->next->prev = watcher->prev;
-  watcher->prev->next = watcher->next;
-  gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+  int was_polling = 0;
+  int kick = 0;
+  grpc_fd *fd = watcher->fd;
 
-  grpc_fd_unref(watcher->fd);
+  gpr_mu_lock(&fd->watcher_mu);
+  if (watcher == fd->read_watcher) {
+    was_polling = 1;
+    kick |= !got_read;
+    fd->read_watcher = NULL;
+  }
+  if (watcher == fd->write_watcher) {
+    was_polling = 1;
+    kick |= !got_write;
+    fd->write_watcher = NULL;
+  }
+  if (!was_polling) {
+    watcher->next->prev = watcher->prev;
+    watcher->prev->next = watcher->next;
+  }
+  if (kick) {
+    maybe_wake_one_watcher_locked(fd);
+  }
+  gpr_mu_unlock(&fd->watcher_mu);
+
+  grpc_fd_unref(fd);
 }
 
 void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {