Add a test with multiple threads popping from the mpscq
diff --git a/test/core/support/mpscq_test.c b/test/core/support/mpscq_test.c
index 0b4bc25..491eb91 100644
--- a/test/core/support/mpscq_test.c
+++ b/test/core/support/mpscq_test.c
@@ -37,6 +37,7 @@
 
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/sync.h>
 #include <grpc/support/thd.h>
 #include <grpc/support/useful.h>
 #include "test/core/util/test_config.h"
@@ -72,12 +73,14 @@
 typedef struct {
   size_t ctr;
   gpr_mpscq *q;
+  gpr_event *start; /* test_thread blocks on this event before pushing */
 } thd_args;
 
 #define THREAD_ITERATIONS 100000
 
 static void test_thread(void *args) {
   thd_args *a = args;
+  gpr_event_wait(a->start, gpr_inf_future(GPR_CLOCK_REALTIME));
   for (size_t i = 1; i <= THREAD_ITERATIONS; i++) {
     gpr_mpscq_push(a->q, &new_node(i, &a->ctr)->node);
   }
@@ -85,6 +88,8 @@
 
 static void test_mt(void) {
   gpr_log(GPR_DEBUG, "test_mt");
+  gpr_event start;
+  gpr_event_init(&start);
   gpr_thd_id thds[100];
   thd_args ta[GPR_ARRAY_SIZE(thds)];
   gpr_mpscq q;
@@ -94,10 +99,12 @@
     gpr_thd_options_set_joinable(&options);
     ta[i].ctr = 0;
     ta[i].q = &q;
+    ta[i].start = &start;
     GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options));
   }
   size_t num_done = 0;
   size_t spins = 0;
+  gpr_event_set(&start, (void *)1);
   while (num_done != GPR_ARRAY_SIZE(thds)) {
     gpr_mpscq_node *n;
     while ((n = gpr_mpscq_pop(&q)) == NULL) {
@@ -116,9 +123,84 @@
   gpr_mpscq_destroy(&q);
 }
 
+typedef struct { /* state shared by every pull_thread consumer */
+  thd_args *ta; /* producer args; set by the test, not read by pull_thread */
+  size_t num_thds; /* producer count; consumers exit once num_done equals it */
+  gpr_mu mu; /* held by a consumer for each pop and the bookkeeping after it */
+  size_t num_done; /* producers whose THREAD_ITERATIONS-th node was popped */
+  size_t spins; /* number of pops that returned NULL */
+  gpr_mpscq *q; /* queue the consumers pop from */
+  gpr_event *start; /* consumers wait on this event before popping */
+} pull_args;
+
+/* Consumer thread body: once `start` fires, pops nodes one at a time while
+ * holding the shared mutex, until every producer's final node has been seen. */
+static void pull_thread(void *arg) {
+  pull_args *shared = arg;
+  gpr_event_wait(shared->start, gpr_inf_future(GPR_CLOCK_REALTIME));
+  gpr_mu_lock(&shared->mu);
+  while (shared->num_done != shared->num_thds) {
+    gpr_mpscq_node *popped = gpr_mpscq_pop(shared->q);
+    for (; popped == NULL; popped = gpr_mpscq_pop(shared->q)) {
+      shared->spins++; /* queue looked empty; retry with the lock still held */
+    }
+    test_node *item = (test_node *)popped;
+    /* Each producer's values must come out in the order they were pushed. */
+    GPR_ASSERT(*item->ctr == item->i - 1);
+    *item->ctr = item->i;
+    if (item->i == THREAD_ITERATIONS) shared->num_done++;
+    gpr_free(item);
+    gpr_mu_unlock(&shared->mu); /* give the other consumers a turn */
+    gpr_mu_lock(&shared->mu);
+  }
+  gpr_mu_unlock(&shared->mu);
+}
+
+/* Stress test: 100 test_thread producers push into one queue while 100
+ * pull_thread consumers drain it, taking turns under pa.mu. */
+static void test_mt_multipop(void) {
+  gpr_log(GPR_DEBUG, "test_mt_multipop");
+  gpr_event start;
+  gpr_event_init(&start);
+  gpr_thd_id thds[100];
+  gpr_thd_id pull_thds[100];
+  thd_args ta[GPR_ARRAY_SIZE(thds)];
+  gpr_mpscq q;
+  gpr_mpscq_init(&q);
+  gpr_thd_options options = gpr_thd_options_default(); /* for every thread */
+  gpr_thd_options_set_joinable(&options);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
+    ta[i].ctr = 0;
+    ta[i].q = &q;
+    ta[i].start = &start;
+    GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options));
+  }
+  pull_args pa;
+  pa.ta = ta;
+  pa.num_thds = GPR_ARRAY_SIZE(thds);
+  pa.spins = pa.num_done = 0;
+  pa.q = &q;
+  pa.start = &start;
+  gpr_mu_init(&pa.mu);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
+    GPR_ASSERT(gpr_thd_new(&pull_thds[i], pull_thread, &pa, &options));
+  }
+  gpr_event_set(&start, (void *)1); /* release every waiting thread at once */
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
+    gpr_thd_join(pull_thds[i]);
+  }
+  gpr_log(GPR_DEBUG, "spins: %" PRIuPTR, pa.spins);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
+    gpr_thd_join(thds[i]);
+  }
+  gpr_mu_destroy(&pa.mu); /* all consumers are joined, so nobody holds it */
+  gpr_mpscq_destroy(&q);
+}
+
 int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   test_serial();
   test_mt();
+  test_mt_multipop(); /* many-consumer variant; pops are serialized by a mutex */
   return 0;
 }