Merge branch 'master' into cq_mpsc_based
diff --git a/build.yaml b/build.yaml
index 95e9fc1..e270a0b 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3308,7 +3308,7 @@
   - gpr_test_util
   - gpr
   args:
-  - --benchmark_min_time=0
+  - --benchmark_min_time=4
   defaults: benchmark
   platforms:
   - mac
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index d9f5e07..468a8ed 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -585,7 +585,7 @@
 /** Specifies the type of APIs to use to pop events from the completion queue */
 typedef enum {
   /** Events are popped out by calling grpc_completion_queue_next() API ONLY */
-  GRPC_CQ_NEXT = 1,
+  GRPC_CQ_NEXT,
 
   /** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/
   GRPC_CQ_PLUCK
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index eae3f10..cfe23ec 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -30,7 +30,6 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  *
  */
-
 #include "src/core/lib/surface/completion_queue.h"
 
 #include <stdio.h>
@@ -45,6 +44,7 @@
 #include "src/core/lib/iomgr/pollset.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/spinlock.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/api_trace.h"
 #include "src/core/lib/surface/call.h"
@@ -200,32 +200,66 @@
      .destroy = non_polling_poller_destroy},
 };
 
-/* Completion queue structure */
-struct grpc_completion_queue {
-  /** owned by pollset */
+typedef struct cq_vtable {
+  grpc_cq_completion_type cq_completion_type;
+  size_t (*size)();
+  void (*begin_op)(grpc_completion_queue *cc, void *tag);
+  void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
+                 grpc_error *error,
+                 void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
+                              grpc_cq_completion *storage),
+                 void *done_arg, grpc_cq_completion *storage);
+  grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
+                     void *reserved);
+  grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
+                      gpr_timespec deadline, void *reserved);
+} cq_vtable;
+
+/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
+ * (a lockfree multiproducer single consumer queue). It uses a queue_lock
+ * to support multiple consumers.
+ * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
+typedef struct grpc_cq_event_queue {
+  /* Spinlock to serialize consumers, i.e. pop() operations */
+  gpr_spinlock queue_lock;
+
+  gpr_mpscq queue;
+
+  /* A lazy counter of number of items in the queue. This is NOT atomically
+     incremented/decremented along with push/pop operations and hence is only
+     eventually consistent */
+  gpr_atm num_queue_items;
+} grpc_cq_event_queue;
+
+/* TODO: sreek Refactor this based on the completion_type. Put completion-type
+ * specific data in a different structure (and co-allocate memory for it along
+ * with completion queue + pollset )*/
+typedef struct cq_data {
   gpr_mu *mu;
 
-  grpc_cq_completion_type completion_type;
-
-  const cq_poller_vtable *poller_vtable;
-
-  /** completed events */
+  /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
   grpc_cq_completion completed_head;
   grpc_cq_completion *completed_tail;
+
+  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
+  grpc_cq_event_queue queue;
+
   /** Number of pending events (+1 if we're not shutdown) */
   gpr_refcount pending_events;
+
   /** Once owning_refs drops to zero, we will destroy the cq */
   gpr_refcount owning_refs;
-  /** counter of how many things have ever been queued on this completion queue
+
+  /** Counter of how many things have ever been queued on this completion queue
       useful for avoiding locks to check the queue */
   gpr_atm things_queued_ever;
+
   /** 0 initially, 1 once we've begun shutting down */
-  int shutdown;
+  gpr_atm shutdown;
   int shutdown_called;
+
   int is_server_cq;
-  /** Can the server cq accept incoming channels */
-  /* TODO: sreek - This will no longer be needed. Use polling_type set */
-  int is_non_listening_server_cq;
+
   int num_pluckers;
   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
   grpc_closure pollset_shutdown_done;
@@ -235,8 +269,61 @@
   size_t outstanding_tag_count;
   size_t outstanding_tag_capacity;
 #endif
+} cq_data;
 
-  grpc_completion_queue *next_free;
+/* Completion queue structure */
+struct grpc_completion_queue {
+  cq_data data;
+  const cq_vtable *vtable;
+  const cq_poller_vtable *poller_vtable;
+};
+
+/* Forward declarations */
+static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
+                               grpc_completion_queue *cc);
+
+static size_t cq_size(grpc_completion_queue *cc);
+
+static void cq_begin_op(grpc_completion_queue *cc, void *tag);
+
+static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
+                               grpc_completion_queue *cc, void *tag,
+                               grpc_error *error,
+                               void (*done)(grpc_exec_ctx *exec_ctx,
+                                            void *done_arg,
+                                            grpc_cq_completion *storage),
+                               void *done_arg, grpc_cq_completion *storage);
+
+static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
+                                grpc_completion_queue *cc, void *tag,
+                                grpc_error *error,
+                                void (*done)(grpc_exec_ctx *exec_ctx,
+                                             void *done_arg,
+                                             grpc_cq_completion *storage),
+                                void *done_arg, grpc_cq_completion *storage);
+
+static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+                          void *reserved);
+
+static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+                           gpr_timespec deadline, void *reserved);
+
+/* Completion queue vtables based on the completion-type */
+static const cq_vtable g_cq_vtable[] = {
+    /* GRPC_CQ_NEXT */
+    {.cq_completion_type = GRPC_CQ_NEXT,
+     .size = cq_size,
+     .begin_op = cq_begin_op,
+     .end_op = cq_end_op_for_next,
+     .next = cq_next,
+     .pluck = NULL},
+    /* GRPC_CQ_PLUCK */
+    {.cq_completion_type = GRPC_CQ_PLUCK,
+     .size = cq_size,
+     .begin_op = cq_begin_op,
+     .end_op = cq_end_op_for_pluck,
+     .next = NULL,
+     .pluck = cq_pluck},
 };
 
 #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
@@ -256,6 +343,47 @@
 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
                                      grpc_error *error);
 
+static void cq_event_queue_init(grpc_cq_event_queue *q) {
+  gpr_mpscq_init(&q->queue);
+  q->queue_lock = GPR_SPINLOCK_INITIALIZER;
+  gpr_atm_no_barrier_store(&q->num_queue_items, 0);
+}
+
+static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
+  gpr_mpscq_destroy(&q->queue);
+}
+
+static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+  gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
+  gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+}
+
+static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
+  grpc_cq_completion *c = NULL;
+  if (gpr_spinlock_trylock(&q->queue_lock)) {
+    c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
+    gpr_spinlock_unlock(&q->queue_lock);
+  }
+
+  if (c) {
+    gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
+  }
+
+  return c;
+}
+
+/* Note: The counter is not incremented/decremented atomically with push/pop.
+ * The count is only eventually consistent */
+static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
+  return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
+}
+
+static size_t cq_size(grpc_completion_queue *cc) {
+  /* Size of the completion queue and the size of the pollset whose memory is
+     allocated right after that of completion queue */
+  return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
+}
+
 grpc_completion_queue *grpc_completion_queue_create_internal(
     grpc_cq_completion_type completion_type,
     grpc_cq_polling_type polling_type) {
@@ -268,35 +396,39 @@
       "polling_type=%d)",
       2, (completion_type, polling_type));
 
+  const cq_vtable *vtable = &g_cq_vtable[completion_type];
   const cq_poller_vtable *poller_vtable =
       &g_poller_vtable_by_poller_type[polling_type];
 
   cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
-  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu);
-#ifndef NDEBUG
-  cc->outstanding_tags = NULL;
-  cc->outstanding_tag_capacity = 0;
-#endif
+  cq_data *cqd = &cc->data;
 
-  cc->completion_type = completion_type;
+  cc->vtable = vtable;
   cc->poller_vtable = poller_vtable;
 
-  /* Initial ref is dropped by grpc_completion_queue_shutdown */
-  gpr_ref_init(&cc->pending_events, 1);
-  /* One for destroy(), one for pollset_shutdown */
-  gpr_ref_init(&cc->owning_refs, 2);
-  cc->completed_tail = &cc->completed_head;
-  cc->completed_head.next = (uintptr_t)cc->completed_tail;
-  cc->shutdown = 0;
-  cc->shutdown_called = 0;
-  cc->is_server_cq = 0;
-  cc->is_non_listening_server_cq = 0;
-  cc->num_pluckers = 0;
-  gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
+  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
+
 #ifndef NDEBUG
-  cc->outstanding_tag_count = 0;
+  cqd->outstanding_tags = NULL;
+  cqd->outstanding_tag_capacity = 0;
 #endif
-  grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
+
+  /* Initial ref is dropped by grpc_completion_queue_shutdown */
+  gpr_ref_init(&cqd->pending_events, 1);
+  /* One for destroy(), one for pollset_shutdown */
+  gpr_ref_init(&cqd->owning_refs, 2);
+  cqd->completed_tail = &cqd->completed_head;
+  cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
+  gpr_atm_no_barrier_store(&cqd->shutdown, 0);
+  cqd->shutdown_called = 0;
+  cqd->is_server_cq = 0;
+  cqd->num_pluckers = 0;
+  gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+#ifndef NDEBUG
+  cqd->outstanding_tag_count = 0;
+#endif
+  cq_event_queue_init(&cqd->queue);
+  grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
                     grpc_schedule_on_exec_ctx);
 
   GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
@@ -305,18 +437,20 @@
 }
 
 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
-  return cc->completion_type;
+  return cc->vtable->cq_completion_type;
 }
 
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
 void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
                           const char *file, int line) {
+  cq_data *cqd = &cc->data;
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p   ref %d -> %d %s", cc,
-          (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
+          (int)cqd->owning_refs.count, (int)cqd->owning_refs.count + 1, reason);
 #else
 void grpc_cq_internal_ref(grpc_completion_queue *cc) {
+  cq_data *cqd = &cc->data;
 #endif
-  gpr_ref(&cc->owning_refs);
+  gpr_ref(&cqd->owning_refs);
 }
 
 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
@@ -328,59 +462,154 @@
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
 void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
                             const char *file, int line) {
+  cq_data *cqd = &cc->data;
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
-          (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
+          (int)cqd->owning_refs.count, (int)cqd->owning_refs.count - 1, reason);
 #else
 void grpc_cq_internal_unref(grpc_completion_queue *cc) {
+  cq_data *cqd = &cc->data;
 #endif
-  if (gpr_unref(&cc->owning_refs)) {
-    GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
+  if (gpr_unref(&cqd->owning_refs)) {
+    GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
     cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc));
+    cq_event_queue_destroy(&cqd->queue);
 #ifndef NDEBUG
-    gpr_free(cc->outstanding_tags);
+    gpr_free(cqd->outstanding_tags);
 #endif
     gpr_free(cc);
   }
 }
 
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
+static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
+  cq_data *cqd = &cc->data;
 #ifndef NDEBUG
-  gpr_mu_lock(cc->mu);
-  GPR_ASSERT(!cc->shutdown_called);
-  if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
-    cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
-    cc->outstanding_tags =
-        gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) *
-                                              cc->outstanding_tag_capacity);
+  gpr_mu_lock(cqd->mu);
+  GPR_ASSERT(!cqd->shutdown_called);
+  if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
+    cqd->outstanding_tag_capacity =
+        GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
+    cqd->outstanding_tags =
+        gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
+                                               cqd->outstanding_tag_capacity);
   }
-  cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
-  gpr_mu_unlock(cc->mu);
+  cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
+  gpr_mu_unlock(cqd->mu);
 #endif
-  gpr_ref(&cc->pending_events);
+  gpr_ref(&cqd->pending_events);
 }
 
-/* Signal the end of an operation - if this is the last waiting-to-be-queued
-   event, then enter shutdown mode */
-/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
-                    void *tag, grpc_error *error,
-                    void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
-                                 grpc_cq_completion *storage),
-                    void *done_arg, grpc_cq_completion *storage) {
-  int shutdown;
-  int i;
-  grpc_pollset_worker *pluck_worker;
+void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
+  cc->vtable->begin_op(cc, tag);
+}
+
 #ifndef NDEBUG
+static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
+  cq_data *cqd = &cc->data;
   int found = 0;
+  if (lock_cq) {
+    gpr_mu_lock(cqd->mu);
+  }
+
+  for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
+    if (cqd->outstanding_tags[i] == tag) {
+      cqd->outstanding_tag_count--;
+      GPR_SWAP(void *, cqd->outstanding_tags[i],
+               cqd->outstanding_tags[cqd->outstanding_tag_count]);
+      found = 1;
+      break;
+    }
+  }
+
+  if (lock_cq) {
+    gpr_mu_unlock(cqd->mu);
+  }
+
+  GPR_ASSERT(found);
+}
+#else
+static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
 #endif
 
-  GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+ * type of GRPC_CQ_NEXT) */
+static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
+                               grpc_completion_queue *cc, void *tag,
+                               grpc_error *error,
+                               void (*done)(grpc_exec_ctx *exec_ctx,
+                                            void *done_arg,
+                                            grpc_cq_completion *storage),
+                               void *done_arg, grpc_cq_completion *storage) {
+  GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
+
   if (grpc_api_trace ||
       (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
     const char *errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
-        "done_arg=%p, storage=%p)",
+        "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+        "done=%p, done_arg=%p, storage=%p)",
+        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
+      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+    }
+  }
+
+  cq_data *cqd = &cc->data;
+  int is_success = (error == GRPC_ERROR_NONE);
+
+  storage->tag = tag;
+  storage->done = done;
+  storage->done_arg = done_arg;
+  storage->next = (uintptr_t)(is_success);
+
+  cq_check_tag(cc, tag, true); /* Used in debug builds only */
+
+  /* Add the completion to the queue */
+  cq_event_queue_push(&cqd->queue, storage);
+  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+
+  int shutdown = gpr_unref(&cqd->pending_events);
+
+  gpr_mu_lock(cqd->mu);
+  if (!shutdown) {
+    grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
+    gpr_mu_unlock(cqd->mu);
+
+    if (kick_error != GRPC_ERROR_NONE) {
+      const char *msg = grpc_error_string(kick_error);
+      gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+
+      GRPC_ERROR_UNREF(kick_error);
+    }
+  } else {
+    cq_finish_shutdown(exec_ctx, cc);
+    gpr_mu_unlock(cqd->mu);
+  }
+
+  GPR_TIMER_END("cq_end_op_for_next", 0);
+
+  GRPC_ERROR_UNREF(error);
+}
+
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+ * type of GRPC_CQ_PLUCK) */
+static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
+                                grpc_completion_queue *cc, void *tag,
+                                grpc_error *error,
+                                void (*done)(grpc_exec_ctx *exec_ctx,
+                                             void *done_arg,
+                                             grpc_cq_completion *storage),
+                                void *done_arg, grpc_cq_completion *storage) {
+  cq_data *cqd = &cc->data;
+  int is_success = (error == GRPC_ERROR_NONE);
+
+  GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
+
+  if (grpc_api_trace ||
+      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
+    const char *errmsg = grpc_error_string(error);
+    GRPC_API_TRACE(
+        "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+        "done=%p, done_arg=%p, storage=%p)",
         7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
     if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -390,38 +619,32 @@
   storage->tag = tag;
   storage->done = done;
   storage->done_arg = done_arg;
-  storage->next = ((uintptr_t)&cc->completed_head) |
-                  ((uintptr_t)(error == GRPC_ERROR_NONE));
+  storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
 
-  gpr_mu_lock(cc->mu);
-#ifndef NDEBUG
-  for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
-    if (cc->outstanding_tags[i] == tag) {
-      cc->outstanding_tag_count--;
-      GPR_SWAP(void *, cc->outstanding_tags[i],
-               cc->outstanding_tags[cc->outstanding_tag_count]);
-      found = 1;
-      break;
-    }
-  }
-  GPR_ASSERT(found);
-#endif
-  shutdown = gpr_unref(&cc->pending_events);
-  gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
+  gpr_mu_lock(cqd->mu);
+  cq_check_tag(cc, tag, false); /* Used in debug builds only */
+
+  /* Add to the list of completions */
+  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+  cqd->completed_tail->next =
+      ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
+  cqd->completed_tail = storage;
+
+  int shutdown = gpr_unref(&cqd->pending_events);
   if (!shutdown) {
-    cc->completed_tail->next =
-        ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
-    cc->completed_tail = storage;
-    pluck_worker = NULL;
-    for (i = 0; i < cc->num_pluckers; i++) {
-      if (cc->pluckers[i].tag == tag) {
-        pluck_worker = *cc->pluckers[i].worker;
+    grpc_pollset_worker *pluck_worker = NULL;
+    for (int i = 0; i < cqd->num_pluckers; i++) {
+      if (cqd->pluckers[i].tag == tag) {
+        pluck_worker = *cqd->pluckers[i].worker;
         break;
       }
     }
+
     grpc_error *kick_error =
         cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
-    gpr_mu_unlock(cc->mu);
+
+    gpr_mu_unlock(cqd->mu);
+
     if (kick_error != GRPC_ERROR_NONE) {
       const char *msg = grpc_error_string(kick_error);
       gpr_log(GPR_ERROR, "Kick failed: %s", msg);
@@ -429,22 +652,23 @@
       GRPC_ERROR_UNREF(kick_error);
     }
   } else {
-    cc->completed_tail->next =
-        ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
-    cc->completed_tail = storage;
-    GPR_ASSERT(!cc->shutdown);
-    GPR_ASSERT(cc->shutdown_called);
-    cc->shutdown = 1;
-    cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
-                                &cc->pollset_shutdown_done);
-    gpr_mu_unlock(cc->mu);
+    cq_finish_shutdown(exec_ctx, cc);
+    gpr_mu_unlock(cqd->mu);
   }
 
-  GPR_TIMER_END("grpc_cq_end_op", 0);
+  GPR_TIMER_END("cq_end_op_for_pluck", 0);
 
   GRPC_ERROR_UNREF(error);
 }
 
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+                    void *tag, grpc_error *error,
+                    void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
+                                 grpc_cq_completion *storage),
+                    void *done_arg, grpc_cq_completion *storage) {
+  cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
+}
+
 typedef struct {
   gpr_atm last_seen_things_queued_ever;
   grpc_completion_queue *cq;
@@ -457,23 +681,24 @@
 static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
   cq_is_finished_arg *a = arg;
   grpc_completion_queue *cq = a->cq;
+  cq_data *cqd = &cq->data;
   GPR_ASSERT(a->stolen_completion == NULL);
+
   gpr_atm current_last_seen_things_queued_ever =
-      gpr_atm_no_barrier_load(&cq->things_queued_ever);
+      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+
   if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
-    gpr_mu_lock(cq->mu);
     a->last_seen_things_queued_ever =
-        gpr_atm_no_barrier_load(&cq->things_queued_ever);
-    if (cq->completed_tail != &cq->completed_head) {
-      a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
-      cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
-      if (a->stolen_completion == cq->completed_tail) {
-        cq->completed_tail = &cq->completed_head;
-      }
-      gpr_mu_unlock(cq->mu);
+        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+
+    /* Pop a cq_completion from the queue. Returns NULL if the queue is empty;
+     * might return NULL in some cases even if the queue is not empty, but that
+     * is ok and doesn't affect correctness (it might affect the tail latencies
+     * a bit) */
+    a->stolen_completion = cq_event_queue_pop(&cqd->queue);
+    if (a->stolen_completion != NULL) {
       return true;
     }
-    gpr_mu_unlock(cq->mu);
   }
   return !a->first_loop &&
          gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
@@ -483,16 +708,18 @@
 static void dump_pending_tags(grpc_completion_queue *cc) {
   if (!grpc_trace_pending_tags) return;
 
+  cq_data *cqd = &cc->data;
+
   gpr_strvec v;
   gpr_strvec_init(&v);
   gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
-  gpr_mu_lock(cc->mu);
-  for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
+  gpr_mu_lock(cqd->mu);
+  for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
     char *s;
-    gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
+    gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
     gpr_strvec_add(&v, s);
   }
-  gpr_mu_unlock(cc->mu);
+  gpr_mu_unlock(cqd->mu);
   char *out = gpr_strvec_flatten(&v, NULL);
   gpr_strvec_destroy(&v);
   gpr_log(GPR_DEBUG, "%s", out);
@@ -502,17 +729,11 @@
 static void dump_pending_tags(grpc_completion_queue *cc) {}
 #endif
 
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
-                                      gpr_timespec deadline, void *reserved) {
+static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+                          void *reserved) {
   grpc_event ret;
   gpr_timespec now;
-
-  if (cc->completion_type != GRPC_CQ_NEXT) {
-    gpr_log(GPR_ERROR,
-            "grpc_completion_queue_next() cannot be called on this completion "
-            "queue since its completion type is not GRPC_CQ_NEXT");
-    abort();
-  }
+  cq_data *cqd = &cc->data;
 
   GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
 
@@ -531,10 +752,10 @@
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 
   GRPC_CQ_INTERNAL_REF(cc, "next");
-  gpr_mu_lock(cc->mu);
+
   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
-          gpr_atm_no_barrier_load(&cc->things_queued_ever),
+          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
       .cq = cc,
       .deadline = deadline,
       .stolen_completion = NULL,
@@ -542,9 +763,11 @@
       .first_loop = true};
   grpc_exec_ctx exec_ctx =
       GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
+
   for (;;) {
+    gpr_timespec iteration_deadline = deadline;
+
     if (is_finished_arg.stolen_completion != NULL) {
-      gpr_mu_unlock(cc->mu);
       grpc_cq_completion *c = is_finished_arg.stolen_completion;
       is_finished_arg.stolen_completion = NULL;
       ret.type = GRPC_OP_COMPLETE;
@@ -553,28 +776,45 @@
       c->done(&exec_ctx, c->done_arg, c);
       break;
     }
-    if (cc->completed_tail != &cc->completed_head) {
-      grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
-      cc->completed_head.next = c->next & ~(uintptr_t)1;
-      if (c == cc->completed_tail) {
-        cc->completed_tail = &cc->completed_head;
-      }
-      gpr_mu_unlock(cc->mu);
+
+    grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue);
+
+    if (c != NULL) {
       ret.type = GRPC_OP_COMPLETE;
       ret.success = c->next & 1u;
       ret.tag = c->tag;
       c->done(&exec_ctx, c->done_arg, c);
       break;
+    } else {
+      /* If c == NULL it means either the queue is empty OR in a transient
+         inconsistent state. If it is the latter, we should do a 0-timeout poll
+         so that the thread comes back quickly from poll to make a second
+         attempt at popping. Not doing this can potentially deadlock this thread
+         forever (if the deadline is infinity) */
+      if (cq_event_queue_num_items(&cqd->queue) > 0) {
+        iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+      }
     }
-    if (cc->shutdown) {
-      gpr_mu_unlock(cc->mu);
+
+    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+      /* Before returning, check if the queue has any items left over (since
+         gpr_mpscq_pop() can sometimes return NULL even if the queue is not
+         empty). If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
+      if (cq_event_queue_num_items(&cqd->queue) > 0) {
+        /* Go to the beginning of the loop. No point doing a poll because
+           (cc->shutdown == true) is only possible when there is no pending work
+           (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
+           events are already queued on this cq */
+        continue;
+      }
+
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
+
     now = gpr_now(GPR_CLOCK_MONOTONIC);
     if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
-      gpr_mu_unlock(cc->mu);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       dump_pending_tags(cc);
@@ -582,32 +822,32 @@
     }
     /* Check alarms - these are a global resource so we just ping
        each time through on every pollset.
-       May update deadline to ensure timely wakeups.
-       TODO(ctiller): can this work be localized? */
-    gpr_timespec iteration_deadline = deadline;
+       May update deadline to ensure timely wakeups. */
     if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
       GPR_TIMER_MARK("alarm_triggered", 0);
-      gpr_mu_unlock(cc->mu);
       grpc_exec_ctx_flush(&exec_ctx);
-      gpr_mu_lock(cc->mu);
       continue;
-    } else {
-      grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
-                                                NULL, now, iteration_deadline);
-      if (err != GRPC_ERROR_NONE) {
-        gpr_mu_unlock(cc->mu);
-        const char *msg = grpc_error_string(err);
-        gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
+    }
 
-        GRPC_ERROR_UNREF(err);
-        memset(&ret, 0, sizeof(ret));
-        ret.type = GRPC_QUEUE_TIMEOUT;
-        dump_pending_tags(cc);
-        break;
-      }
+    /* The main polling work happens in grpc_pollset_work */
+    gpr_mu_lock(cqd->mu);
+    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+                                              NULL, now, iteration_deadline);
+    gpr_mu_unlock(cqd->mu);
+
+    if (err != GRPC_ERROR_NONE) {
+      const char *msg = grpc_error_string(err);
+      gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
+
+      GRPC_ERROR_UNREF(err);
+      memset(&ret, 0, sizeof(ret));
+      ret.type = GRPC_QUEUE_TIMEOUT;
+      dump_pending_tags(cc);
+      break;
     }
     is_finished_arg.first_loop = false;
   }
+
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "next");
   grpc_exec_ctx_finish(&exec_ctx);
@@ -618,24 +858,30 @@
   return ret;
 }
 
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
+                                      gpr_timespec deadline, void *reserved) {
+  return cc->vtable->next(cc, deadline, reserved);
+}
+
 static int add_plucker(grpc_completion_queue *cc, void *tag,
                        grpc_pollset_worker **worker) {
-  if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
+  cq_data *cqd = &cc->data;
+  if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
     return 0;
   }
-  cc->pluckers[cc->num_pluckers].tag = tag;
-  cc->pluckers[cc->num_pluckers].worker = worker;
-  cc->num_pluckers++;
+  cqd->pluckers[cqd->num_pluckers].tag = tag;
+  cqd->pluckers[cqd->num_pluckers].worker = worker;
+  cqd->num_pluckers++;
   return 1;
 }
 
 static void del_plucker(grpc_completion_queue *cc, void *tag,
                         grpc_pollset_worker **worker) {
-  int i;
-  for (i = 0; i < cc->num_pluckers; i++) {
-    if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
-      cc->num_pluckers--;
-      GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
+  cq_data *cqd = &cc->data;
+  for (int i = 0; i < cqd->num_pluckers; i++) {
+    if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
+      cqd->num_pluckers--;
+      GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
       return;
     }
   }
@@ -645,51 +891,47 @@
 static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
   cq_is_finished_arg *a = arg;
   grpc_completion_queue *cq = a->cq;
+  cq_data *cqd = &cq->data;
+
   GPR_ASSERT(a->stolen_completion == NULL);
   gpr_atm current_last_seen_things_queued_ever =
-      gpr_atm_no_barrier_load(&cq->things_queued_ever);
+      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
   if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
-    gpr_mu_lock(cq->mu);
+    gpr_mu_lock(cqd->mu);
     a->last_seen_things_queued_ever =
-        gpr_atm_no_barrier_load(&cq->things_queued_ever);
+        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
     grpc_cq_completion *c;
-    grpc_cq_completion *prev = &cq->completed_head;
+    grpc_cq_completion *prev = &cqd->completed_head;
     while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
-           &cq->completed_head) {
+           &cqd->completed_head) {
       if (c->tag == a->tag) {
         prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
-        if (c == cq->completed_tail) {
-          cq->completed_tail = prev;
+        if (c == cqd->completed_tail) {
+          cqd->completed_tail = prev;
         }
-        gpr_mu_unlock(cq->mu);
+        gpr_mu_unlock(cqd->mu);
         a->stolen_completion = c;
         return true;
       }
       prev = c;
     }
-    gpr_mu_unlock(cq->mu);
+    gpr_mu_unlock(cqd->mu);
   }
   return !a->first_loop &&
          gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
 }
 
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
-                                       gpr_timespec deadline, void *reserved) {
+static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+                     gpr_timespec deadline, void *reserved) {
   grpc_event ret;
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker *worker = NULL;
   gpr_timespec now;
+  cq_data *cqd = &cc->data;
 
   GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
 
-  if (cc->completion_type != GRPC_CQ_PLUCK) {
-    gpr_log(GPR_ERROR,
-            "grpc_completion_queue_pluck() cannot be called on this completion "
-            "queue since its completion type is not GRPC_CQ_PLUCK");
-    abort();
-  }
-
   if (grpc_cq_pluck_trace) {
     GRPC_API_TRACE(
         "grpc_completion_queue_pluck("
@@ -707,10 +949,10 @@
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 
   GRPC_CQ_INTERNAL_REF(cc, "pluck");
-  gpr_mu_lock(cc->mu);
+  gpr_mu_lock(cqd->mu);
   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
-          gpr_atm_no_barrier_load(&cc->things_queued_ever),
+          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
       .cq = cc,
       .deadline = deadline,
       .stolen_completion = NULL,
@@ -720,7 +962,7 @@
       GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
   for (;;) {
     if (is_finished_arg.stolen_completion != NULL) {
-      gpr_mu_unlock(cc->mu);
+      gpr_mu_unlock(cqd->mu);
       c = is_finished_arg.stolen_completion;
       is_finished_arg.stolen_completion = NULL;
       ret.type = GRPC_OP_COMPLETE;
@@ -729,15 +971,15 @@
       c->done(&exec_ctx, c->done_arg, c);
       break;
     }
-    prev = &cc->completed_head;
+    prev = &cqd->completed_head;
     while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
-           &cc->completed_head) {
+           &cqd->completed_head) {
       if (c->tag == tag) {
         prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
-        if (c == cc->completed_tail) {
-          cc->completed_tail = prev;
+        if (c == cqd->completed_tail) {
+          cqd->completed_tail = prev;
         }
-        gpr_mu_unlock(cc->mu);
+        gpr_mu_unlock(cqd->mu);
         ret.type = GRPC_OP_COMPLETE;
         ret.success = c->next & 1u;
         ret.tag = c->tag;
@@ -746,8 +988,8 @@
       }
       prev = c;
     }
-    if (cc->shutdown) {
-      gpr_mu_unlock(cc->mu);
+    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+      gpr_mu_unlock(cqd->mu);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
@@ -757,7 +999,7 @@
               "Too many outstanding grpc_completion_queue_pluck calls: maximum "
               "is %d",
               GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
-      gpr_mu_unlock(cc->mu);
+      gpr_mu_unlock(cqd->mu);
       memset(&ret, 0, sizeof(ret));
       /* TODO(ctiller): should we use a different result here */
       ret.type = GRPC_QUEUE_TIMEOUT;
@@ -767,7 +1009,7 @@
     now = gpr_now(GPR_CLOCK_MONOTONIC);
     if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
       del_plucker(cc, tag, &worker);
-      gpr_mu_unlock(cc->mu);
+      gpr_mu_unlock(cqd->mu);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       dump_pending_tags(cc);
@@ -780,15 +1022,15 @@
     gpr_timespec iteration_deadline = deadline;
     if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
       GPR_TIMER_MARK("alarm_triggered", 0);
-      gpr_mu_unlock(cc->mu);
+      gpr_mu_unlock(cqd->mu);
       grpc_exec_ctx_flush(&exec_ctx);
-      gpr_mu_lock(cc->mu);
+      gpr_mu_lock(cqd->mu);
     } else {
       grpc_error *err = cc->poller_vtable->work(
           &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
       if (err != GRPC_ERROR_NONE) {
         del_plucker(cc, tag, &worker);
-        gpr_mu_unlock(cc->mu);
+        gpr_mu_unlock(cqd->mu);
         const char *msg = grpc_error_string(err);
         gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
 
@@ -813,26 +1055,48 @@
   return ret;
 }
 
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+                                       gpr_timespec deadline, void *reserved) {
+  return cc->vtable->pluck(cc, tag, deadline, reserved);
+}
+
+/* Finishes the completion queue shutdown. This means that there are no more
+   completion events / tags expected from the completion queue
+   - Must be called under completion queue lock
+   - Must be called only once in completion queue's lifetime
+   - grpc_completion_queue_shutdown() MUST have been called before calling this
+     function */
+static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
+                               grpc_completion_queue *cc) {
+  cq_data *cqd = &cc->data;
+
+  GPR_ASSERT(cqd->shutdown_called);
+  GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
+  gpr_atm_no_barrier_store(&cqd->shutdown, 1);
+
+  cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
+                              &cqd->pollset_shutdown_done);
+}
+
 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
    to zero here, then enter shutdown mode and wake up any waiters */
 void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
   GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
-  gpr_mu_lock(cc->mu);
-  if (cc->shutdown_called) {
-    gpr_mu_unlock(cc->mu);
+  cq_data *cqd = &cc->data;
+
+  gpr_mu_lock(cqd->mu);
+  if (cqd->shutdown_called) {
+    gpr_mu_unlock(cqd->mu);
     GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
     return;
   }
-  cc->shutdown_called = 1;
-  if (gpr_unref(&cc->pending_events)) {
-    GPR_ASSERT(!cc->shutdown);
-    cc->shutdown = 1;
-    cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
-                                &cc->pollset_shutdown_done);
+  cqd->shutdown_called = 1;
+  if (gpr_unref(&cqd->pending_events)) {
+    cq_finish_shutdown(&exec_ctx, cc);
   }
-  gpr_mu_unlock(cc->mu);
+  gpr_mu_unlock(cqd->mu);
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 }
@@ -841,6 +1105,13 @@
   GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
   GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
   grpc_completion_queue_shutdown(cc);
+
+  /* TODO (sreek): Ideally this should not be here. Refactor it into the
+   * cq_vtable (perhaps by adding create/destroy methods to the cq vtable) */
+  if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
+    GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
+  }
+
   GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
   GPR_TIMER_END("grpc_completion_queue_destroy", 0);
 }
@@ -853,22 +1124,12 @@
   return CQ_FROM_POLLSET(ps);
 }
 
-void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
-  /* TODO: sreek - use cc->polling_type field here and add a validation check
-     (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose
-     polling_type is set to GRPC_CQ_NON_LISTENING */
-  cc->is_non_listening_server_cq = 1;
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
+  cc->data.is_server_cq = 1;
 }
 
-bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
-  /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */
-  return (cc->is_non_listening_server_cq == 1);
-}
-
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
-
 bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
-  return cc->is_server_cq;
+  return cc->data.is_server_cq;
 }
 
 bool grpc_cq_can_listen(grpc_completion_queue *cc) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index a932087..f7eb148 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -49,6 +49,8 @@
 #endif
 
 typedef struct grpc_cq_completion {
+  gpr_mpscq_node node;
+
   /** user supplied tag */
   void *tag;
   /** done callback - called when this queue element is no longer
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index 9d7f65d..3362510 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -79,10 +79,16 @@
   gpr_free(cq_completion);
 }
 
-/* Queues a completion tag. ZERO polling overhead */
+/* Queues a completion tag if deadline is non-zero.
+ * Does nothing if deadline is 0 (i.e., gpr_time_0(GPR_CLOCK_MONOTONIC)) */
 static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
                                 grpc_pollset_worker** worker, gpr_timespec now,
                                 gpr_timespec deadline) {
+  if (gpr_time_cmp(deadline, gpr_time_0(GPR_CLOCK_MONOTONIC)) == 0) {
+    gpr_log(GPR_ERROR, "no-op");
+    return GRPC_ERROR_NONE;
+  }
+
   gpr_mu_unlock(&ps->mu);
   grpc_cq_begin_op(g_cq, g_tag);
   grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL,
@@ -113,6 +119,14 @@
 
 static void teardown() {
   grpc_completion_queue_shutdown(g_cq);
+
+  /* Drain any events */
+  gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
+  while (grpc_completion_queue_next(g_cq, deadline, NULL).type !=
+         GRPC_QUEUE_SHUTDOWN) {
+    /* Do nothing */
+  }
+
   grpc_completion_queue_destroy(g_cq);
 }
 
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 2928b87..1ec6304 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2805,7 +2805,7 @@
   }, 
   {
     "args": [
-      "--benchmark_min_time=0"
+      "--benchmark_min_time=4"
     ], 
     "ci_platforms": [
       "linux",