Allow specific pollers to be woken

Currently, if two threads call grpc_completion_queue_pluck on the same
completion queue for different tags, there is a 50% chance that we
deliver the completion wakeup to the wrong poller, forcing the correct
poller to wait until its polling times out before it can return an event
up to the application.

This change tweaks our polling interfaces so that we can indeed wake a
specific poller.

Nothing has been performance tuned yet. It's definitely sub-optimal in a
number of places: wakeup file descriptors should be recycled, and we
should have a path that avoids calling poll() followed by epoll(). We
can probably live without those optimizations for the moment, though.

This code will currently fail at least on Windows (I'll do that port
when I'm in the office and have access to a Windows machine).
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 3f60b0b..732813f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,6 +45,13 @@
 #include <grpc/support/atm.h>
 #include <grpc/support/log.h>
 
+#define MAX_PLUCKERS 4
+
+typedef struct {
+  grpc_pollset_worker *worker;
+  void *tag;
+} plucker;
+
 /* Completion queue structure */
 struct grpc_completion_queue {
   /** completed events */
@@ -60,6 +67,8 @@
   int shutdown;
   int shutdown_called;
   int is_server_cq;
+  int num_pluckers;
+  plucker pluckers[MAX_PLUCKERS];
 };
 
 grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -117,6 +126,8 @@
                     void (*done)(void *done_arg, grpc_cq_completion *storage),
                     void *done_arg, grpc_cq_completion *storage) {
   int shutdown;
+  int i;
+  grpc_pollset_worker *pluck_worker;
 
   storage->tag = tag;
   storage->done = done;
@@ -130,7 +141,14 @@
     cc->completed_tail->next =
         ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
     cc->completed_tail = storage;
-    grpc_pollset_kick(&cc->pollset);
+    pluck_worker = NULL;
+    for (i = 0; i < cc->num_pluckers; i++) {
+      if (cc->pluckers[i].tag == tag) {
+        pluck_worker = cc->pluckers[i].worker;
+        break;
+      }
+    }
+    grpc_pollset_kick(&cc->pollset, pluck_worker);
     gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
   } else {
     cc->completed_tail->next =
@@ -147,6 +165,7 @@
 grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline) {
   grpc_event ret;
+  grpc_pollset_worker worker;
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 
@@ -172,7 +191,7 @@
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, deadline)) {
+    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
@@ -184,11 +203,34 @@
   return ret;
 }
 
+static void add_plucker(grpc_completion_queue *cc, void *tag,
+                        grpc_pollset_worker *worker) {
+  GPR_ASSERT(cc->num_pluckers != MAX_PLUCKERS);
+  cc->pluckers[cc->num_pluckers].tag = tag;
+  cc->pluckers[cc->num_pluckers].worker = worker;
+  cc->num_pluckers++;
+}
+
+static void del_plucker(grpc_completion_queue *cc, void *tag,
+                        grpc_pollset_worker *worker) {
+  int i;
+  for (i = 0; i < cc->num_pluckers; i++) {
+    if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
+      cc->num_pluckers--;
+      GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
+      return;
+    }
+  }
+  gpr_log(GPR_ERROR, "should never reach here");
+  abort();
+}
+
 grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                        gpr_timespec deadline) {
   grpc_event ret;
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
+  grpc_pollset_worker worker;
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 
@@ -219,12 +261,15 @@
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, deadline)) {
+    add_plucker(cc, tag, &worker);
+    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+      del_plucker(cc, tag, &worker);
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    del_plucker(cc, tag, &worker);
   }
 done:
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@@ -261,15 +306,6 @@
   return &cc->pollset;
 }
 
-void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
-  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
-  grpc_pollset_kick(&cc->pollset);
-  grpc_pollset_work(&cc->pollset,
-                    gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
-                                 gpr_time_from_millis(100, GPR_TIMESPAN)));
-  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
 void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
 
 int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }