Reduce contention on the fd watcher lock

Change fd watcher tracking from O(active_pollers) to O(1) by replacing the dynamic pollset array with an intrusive doubly-linked list of grpc_fd_watcher nodes. This reduces the time spent holding the fd->watcher_mu lock (grpc_fd_end_poll now unlinks in constant time instead of scanning the array), which ultimately lets us scale much better.
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b67c6cd..737ee01 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -68,7 +68,6 @@
 static gpr_mu fd_freelist_mu;
 
 static void freelist_fd(grpc_fd *fd) {
-  gpr_free(fd->watchers);
   gpr_mu_lock(&fd_freelist_mu);
   fd->freelist_next = fd_freelist;
   fd_freelist = fd;
@@ -93,9 +92,7 @@
   gpr_atm_rel_store(&r->writest.state, NOT_READY);
   gpr_atm_rel_store(&r->shutdown, 0);
   r->fd = fd;
-  r->watchers = NULL;
-  r->watcher_count = 0;
-  r->watcher_capacity = 0;
+  r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
   r->freelist_next = NULL;
   return r;
 }
@@ -118,9 +115,7 @@
   }
 }
 
-void grpc_fd_global_init(void) {
-  gpr_mu_init(&fd_freelist_mu);
-}
+void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
 
 void grpc_fd_global_shutdown(void) {
   while (fd_freelist != NULL) {
@@ -145,11 +140,11 @@
 }
 
 static void wake_watchers(grpc_fd *fd) {
-  size_t i, n;
+  grpc_fd_watcher *watcher;
   gpr_mu_lock(&fd->watcher_mu);
-  n = fd->watcher_count;
-  for (i = 0; i < n; i++) {
-    grpc_pollset_force_kick(fd->watchers[i]);
+  for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
+       watcher = watcher->next) {
+    grpc_pollset_force_kick(watcher->pollset);
   }
   gpr_mu_unlock(&fd->watcher_mu);
 }
@@ -293,36 +288,27 @@
 }
 
 gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
-                              gpr_uint32 read_mask, gpr_uint32 write_mask) {
+                              gpr_uint32 read_mask, gpr_uint32 write_mask,
+                              grpc_fd_watcher *watcher) {
   /* keep track of pollers that have requested our events, in case they change
    */
   gpr_mu_lock(&fd->watcher_mu);
-  if (fd->watcher_capacity == fd->watcher_count) {
-    fd->watcher_capacity =
-        GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
-    fd->watchers = gpr_realloc(fd->watchers,
-                               fd->watcher_capacity * sizeof(grpc_pollset *));
-  }
-  fd->watchers[fd->watcher_count++] = pollset;
+  watcher->next = &fd->watcher_root;
+  watcher->prev = watcher->next->prev;
+  watcher->next->prev = watcher->prev->next = watcher;
+  watcher->pollset = pollset;
+  watcher->fd = fd;
   gpr_mu_unlock(&fd->watcher_mu);
 
   return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
          (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
 }
 
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
-  size_t r, w, n;
-
-  gpr_mu_lock(&fd->watcher_mu);
-  n = fd->watcher_count;
-  for (r = 0, w = 0; r < n; r++) {
-    if (fd->watchers[r] == pollset) {
-      fd->watcher_count--;
-      continue;
-    }
-    fd->watchers[w++] = fd->watchers[r];
-  }
-  gpr_mu_unlock(&fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
+  gpr_mu_lock(&watcher->fd->watcher_mu);
+  watcher->next->prev = watcher->prev;
+  watcher->prev->next = watcher->next;
+  gpr_mu_unlock(&watcher->fd->watcher_mu);
 }
 
 void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {