Merge github.com:grpc/grpc into serve_fries
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7ac2334..5a85776 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -195,7 +195,10 @@
 
   struct SyncServerSettings {
     SyncServerSettings()
-        : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
+        : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
+          min_pollers(1),
+          max_pollers(2),
+          cq_timeout_msec(10000) {}
 
     // Number of server completion queues to create to listen to incoming RPCs.
     int num_cqs;
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 5b93232..6a16e43 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -46,11 +46,12 @@
   GPR_ASSERT(q->tail == &q->stub);
 }
 
-void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
+bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
   gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
   gpr_mpscq_node *prev =
       (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
   gpr_atm_rel_store(&prev->next, (gpr_atm)n);
+  return prev == &q->stub;
 }
 
 gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
@@ -81,3 +82,25 @@
   // indicates a retry is in order: we're still adding
   return NULL;
 }
+
+void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
+  gpr_mpscq_init(&q->queue);
+  q->read_lock = GPR_SPINLOCK_INITIALIZER;
+}
+
+void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
+  gpr_mpscq_destroy(&q->queue);
+}
+
+bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
+  return gpr_mpscq_push(&q->queue, n);
+}
+
+gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
+  if (gpr_spinlock_trylock(&q->read_lock)) {
+    gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue);
+    gpr_spinlock_unlock(&q->read_lock);
+    return n;
+  }
+  return NULL;
+}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 977a117..cb6456f 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -35,7 +35,9 @@
 #define GRPC_CORE_LIB_SUPPORT_MPSCQ_H
 
 #include <grpc/support/atm.h>
+#include <stdbool.h>
 #include <stddef.h>
+#include "src/core/lib/support/spinlock.h"
 
 // Multiple-producer single-consumer lock free queue, based upon the
 // implementation from Dmitry Vyukov here:
@@ -57,9 +59,32 @@
 void gpr_mpscq_init(gpr_mpscq *q);
 void gpr_mpscq_destroy(gpr_mpscq *q);
 // Push a node
-void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
+// Thread safe - can be called from multiple threads concurrently
+// Returns true if this was possibly the first node (may spuriously return
+// true, but will never spuriously return false)
+bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
 // Pop a node (returns NULL if no node is ready - which doesn't indicate that
 // the queue is empty!!)
+// Thread compatible - can only be called from one thread at a time
 gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
 
+// An mpscq with a spinlock: it's safe to pop from multiple threads, but only
+// one thread at a time will succeed (contending callers get NULL)
+typedef struct gpr_locked_mpscq {
+  gpr_mpscq queue;
+  gpr_spinlock read_lock;
+} gpr_locked_mpscq;
+
+void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
+void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
+// Push a node
+// Thread safe - can be called from multiple threads concurrently
+// Returns true if this was possibly the first node (may spuriously return
+// true, but will never spuriously return false)
+bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
+// Pop a node (returns NULL if no node is ready - which doesn't indicate that
+// the queue is empty!!)
+// Thread safe - can be called from multiple threads concurrently
+gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);
+
 #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 934ca04..4ab5381 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -47,7 +47,8 @@
 #include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/stack_lockfree.h"
+#include "src/core/lib/support/mpscq.h"
+#include "src/core/lib/support/spinlock.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/api_trace.h"
 #include "src/core/lib/surface/call.h"
@@ -76,6 +77,7 @@
 int grpc_server_channel_trace = 0;
 
 typedef struct requested_call {
+  gpr_mpscq_node request_link; /* must be first */
   requested_call_type type;
   size_t cq_idx;
   void *tag;
@@ -175,7 +177,7 @@
   grpc_server *server;
   call_data *pending_head;
   call_data *pending_tail;
-  gpr_stack_lockfree **requests_per_cq;
+  gpr_locked_mpscq *requests_per_cq;
 };
 
 struct registered_method {
@@ -220,11 +222,6 @@
   registered_method *registered_methods;
   /** one request matcher for unregistered methods */
   request_matcher unregistered_request_matcher;
-  /** free list of available requested_calls_per_cq indices */
-  gpr_stack_lockfree **request_freelist_per_cq;
-  /** requested call backing data */
-  requested_call **requested_calls_per_cq;
-  int max_requested_calls_per_cq;
 
   gpr_atm shutdown_flag;
   uint8_t shutdown_published;
@@ -324,21 +321,20 @@
  * request_matcher
  */
 
-static void request_matcher_init(request_matcher *rm, size_t entries,
-                                 grpc_server *server) {
+static void request_matcher_init(request_matcher *rm, grpc_server *server) {
   memset(rm, 0, sizeof(*rm));
   rm->server = server;
   rm->requests_per_cq =
       gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
   for (size_t i = 0; i < server->cq_count; i++) {
-    rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
+    gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
   }
 }
 
 static void request_matcher_destroy(request_matcher *rm) {
   for (size_t i = 0; i < rm->server->cq_count; i++) {
-    GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
-    gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
+    GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
+    gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
   }
   gpr_free(rm->requests_per_cq);
 }
@@ -368,13 +364,17 @@
                                           grpc_server *server,
                                           request_matcher *rm,
                                           grpc_error *error) {
-  int request_id;
+  requested_call *rc;
   for (size_t i = 0; i < server->cq_count; i++) {
-    while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-           -1) {
-      fail_call(exec_ctx, server, i,
-                &server->requested_calls_per_cq[i][request_id],
-                GRPC_ERROR_REF(error));
+    /* Here we know:
+       1. no requests are being added (since the server is shut down)
+       2. no other threads are pulling (since the shut down process is single
+          threaded)
+       So, we can ignore the queue lock and just pop, with the guarantee that a
+       NULL returned here truly means that the queue is empty */
+    while ((rc = (requested_call *)gpr_mpscq_pop(
+                &rm->requests_per_cq[i].queue)) != NULL) {
+      fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
     }
   }
   GRPC_ERROR_UNREF(error);
@@ -409,13 +409,7 @@
   }
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
-    if (server->started) {
-      gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
-      gpr_free(server->requested_calls_per_cq[i]);
-    }
   }
-  gpr_free(server->request_freelist_per_cq);
-  gpr_free(server->requested_calls_per_cq);
   gpr_free(server->cqs);
   gpr_free(server->pollsets);
   gpr_free(server->shutdown_tags);
@@ -473,21 +467,7 @@
 
 static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
                                grpc_cq_completion *c) {
-  requested_call *rc = req;
-  grpc_server *server = rc->server;
-
-  if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
-      rc < server->requested_calls_per_cq[rc->cq_idx] +
-               server->max_requested_calls_per_cq) {
-    GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
-    gpr_stack_lockfree_push(
-        server->request_freelist_per_cq[rc->cq_idx],
-        (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
-  } else {
-    gpr_free(req);
-  }
-
-  server_unref(exec_ctx, server);
+  gpr_free(req);
 }
 
 static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -516,10 +496,6 @@
       GPR_UNREACHABLE_CODE(return );
   }
 
-  grpc_call_element *elem =
-      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
-  channel_data *chand = elem->channel_data;
-  server_ref(chand->server);
   grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
                  done_request_event, rc, &rc->completion);
 }
@@ -547,15 +523,15 @@
 
   for (size_t i = 0; i < server->cq_count; i++) {
     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
-    if (request_id == -1) {
+    requested_call *rc =
+        (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+    if (rc == NULL) {
       continue;
     } else {
       gpr_mu_lock(&calld->mu_state);
       calld->state = ACTIVATED;
       gpr_mu_unlock(&calld->mu_state);
-      publish_call(exec_ctx, server, calld, cq_idx,
-                   &server->requested_calls_per_cq[cq_idx][request_id]);
+      publish_call(exec_ctx, server, calld, cq_idx, rc);
       return; /* early out */
     }
   }
@@ -1029,8 +1005,6 @@
   server->root_channel_data.next = server->root_channel_data.prev =
       &server->root_channel_data;
 
-  /* TODO(ctiller): expose a channel_arg for this */
-  server->max_requested_calls_per_cq = 32768;
   server->channel_args = grpc_channel_args_copy(args);
 
   return server;
@@ -1103,29 +1077,15 @@
   server->started = true;
   server->pollset_count = 0;
   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
-  server->request_freelist_per_cq =
-      gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
-  server->requested_calls_per_cq =
-      gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
   for (i = 0; i < server->cq_count; i++) {
     if (grpc_cq_can_listen(server->cqs[i])) {
       server->pollsets[server->pollset_count++] =
           grpc_cq_pollset(server->cqs[i]);
     }
-    server->request_freelist_per_cq[i] =
-        gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
-    for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
-      gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
-    }
-    server->requested_calls_per_cq[i] =
-        gpr_malloc((size_t)server->max_requested_calls_per_cq *
-                   sizeof(*server->requested_calls_per_cq[i]));
   }
-  request_matcher_init(&server->unregistered_request_matcher,
-                       (size_t)server->max_requested_calls_per_cq, server);
+  request_matcher_init(&server->unregistered_request_matcher, server);
   for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
-    request_matcher_init(&rm->request_matcher,
-                         (size_t)server->max_requested_calls_per_cq, server);
+    request_matcher_init(&rm->request_matcher, server);
   }
 
   server_ref(server);
@@ -1379,21 +1339,11 @@
                                           requested_call *rc) {
   call_data *calld = NULL;
   request_matcher *rm = NULL;
-  int request_id;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     fail_call(exec_ctx, server, cq_idx, rc,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
     return GRPC_CALL_OK;
   }
-  request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
-  if (request_id == -1) {
-    /* out of request ids: just fail this one */
-    fail_call(exec_ctx, server, cq_idx, rc,
-              grpc_error_set_int(
-                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
-                  GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
-    return GRPC_CALL_OK;
-  }
   switch (rc->type) {
     case BATCH_CALL:
       rm = &server->unregistered_request_matcher;
@@ -1402,15 +1352,13 @@
       rm = &rc->data.registered.registered_method->request_matcher;
       break;
   }
-  server->requested_calls_per_cq[cq_idx][request_id] = *rc;
-  gpr_free(rc);
-  if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
+  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
     /* this was the first queued request: we need to lock and start
        matching calls */
     gpr_mu_lock(&server->mu_call);
     while ((calld = rm->pending_head) != NULL) {
-      request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
-      if (request_id == -1) break;
+      rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+      if (rc == NULL) break;
       rm->pending_head = calld->pending_next;
       gpr_mu_unlock(&server->mu_call);
       gpr_mu_lock(&calld->mu_state);
@@ -1426,8 +1374,7 @@
         GPR_ASSERT(calld->state == PENDING);
         calld->state = ACTIVATED;
         gpr_mu_unlock(&calld->mu_state);
-        publish_call(exec_ctx, server, calld, cq_idx,
-                     &server->requested_calls_per_cq[cq_idx][request_id]);
+        publish_call(exec_ctx, server, calld, cq_idx, rc);
       }
       gpr_mu_lock(&server->mu_call);
     }
@@ -1534,7 +1481,6 @@
   rc->initial_metadata->count = 0;
   GPR_ASSERT(error != GRPC_ERROR_NONE);
 
-  server_ref(server);
   grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
                  done_request_event, rc, &rc->completion);
 }