Work stealing between affinitized cqs
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 4d179d0..54b76d8 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -69,11 +69,6 @@
 typedef struct channel_data channel_data;
 typedef struct registered_method registered_method;
 
-typedef struct {
-  call_data *next;
-  call_data *prev;
-} call_link;
-
 typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
 
 typedef struct requested_call {
@@ -170,10 +165,9 @@
 
 struct request_matcher {
   grpc_server *server;
-  size_t cq_idx;
   call_data *pending_head;
   call_data *pending_tail;
-  gpr_stack_lockfree *requests;
+  gpr_stack_lockfree **requests_per_cq;
 };
 
 struct registered_method {
@@ -182,7 +176,7 @@
   grpc_server_register_method_payload_handling payload_handling;
   uint32_t flags;
-  /* one request matcher per method per cq */
+  /* one request matcher per method, holding one request stack per cq */
-  request_matcher *request_matchers;
+  request_matcher request_matcher;
   registered_method *next;
 };
 
@@ -211,7 +205,7 @@
 
   registered_method *registered_methods;
-  /** one request matcher for unregistered methods per cq */
+  /** request matcher for unregistered methods (one request stack per cq) */
-  request_matcher *unregistered_request_matchers;
+  request_matcher unregistered_request_matcher;
   /** free list of available requested_calls indices */
   gpr_stack_lockfree *request_freelist;
   /** requested call backing data */
@@ -313,16 +307,22 @@
  */
 
 static void request_matcher_init(request_matcher *rm, size_t entries,
-                                 size_t cq_idx, grpc_server *server) {
+                                 grpc_server *server) {
   memset(rm, 0, sizeof(*rm));
   rm->server = server;
-  rm->cq_idx = cq_idx;
-  rm->requests = gpr_stack_lockfree_create(entries);
+  rm->requests_per_cq =
+      gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
+  for (size_t i = 0; i < server->cq_count; i++) {
+    rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
+  }
 }
 
 static void request_matcher_destroy(request_matcher *rm) {
-  GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1);
-  gpr_stack_lockfree_destroy(rm->requests);
+  for (size_t i = 0; i < rm->server->cq_count; i++) {
+    GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
+    gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
+  }
+  gpr_free(rm->requests_per_cq);
 }
 
 static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) {
@@ -348,9 +348,11 @@
                                           grpc_server *server,
                                           request_matcher *rm) {
   int request_id;
-  while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
-    fail_call(exec_ctx, server, rm->cq_idx,
-              &server->requested_calls[request_id]);
+  for (size_t i = 0; i < server->cq_count; i++) {
+    while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
+           -1) {
+      fail_call(exec_ctx, server, i, &server->requested_calls[request_id]);
+    }
   }
 }
 
@@ -371,23 +373,19 @@
   while ((rm = server->registered_methods) != NULL) {
     server->registered_methods = rm->next;
     if (server->started) {
-      for (i = 0; i < server->cq_count; i++) {
-        request_matcher_destroy(&rm->request_matchers[i]);
-      }
-      gpr_free(rm->request_matchers);
+      request_matcher_destroy(&rm->request_matcher);
     }
     gpr_free(rm->method);
     gpr_free(rm->host);
     gpr_free(rm);
   }
+  if (server->started) {
+    request_matcher_destroy(&server->unregistered_request_matcher);
+  }
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
-    if (server->started) {
-      request_matcher_destroy(&server->unregistered_request_matchers[i]);
-    }
   }
   gpr_stack_lockfree_destroy(server->request_freelist);
-  gpr_free(server->unregistered_request_matchers);
   gpr_free(server->cqs);
   gpr_free(server->pollsets);
   gpr_free(server->shutdown_tags);
@@ -506,7 +504,9 @@
 }
 
 static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
-  call_data *calld = arg;
+  grpc_call_element *call_elem = arg;
+  call_data *calld = call_elem->call_data;
+  channel_data *chand = call_elem->channel_data;
   request_matcher *rm = calld->request_matcher;
   grpc_server *server = rm->server;
 
@@ -521,27 +521,34 @@
     return;
   }
 
-  int request_id = gpr_stack_lockfree_pop(rm->requests);
-  if (request_id == -1) {
-    gpr_mu_lock(&server->mu_call);
-    gpr_mu_lock(&calld->mu_state);
-    calld->state = PENDING;
-    gpr_mu_unlock(&calld->mu_state);
-    if (rm->pending_head == NULL) {
-      rm->pending_tail = rm->pending_head = calld;
+  for (size_t i = 0; i < server->cq_count; i++) {
+    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
+    int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+    if (request_id == -1) {
+      continue;
     } else {
-      rm->pending_tail->pending_next = calld;
-      rm->pending_tail = calld;
+      gpr_mu_lock(&calld->mu_state);
+      calld->state = ACTIVATED;
+      gpr_mu_unlock(&calld->mu_state);
+      publish_call(exec_ctx, server, calld, cq_idx,
+                   &server->requested_calls[request_id]);
+      return; /* early out */
     }
-    calld->pending_next = NULL;
-    gpr_mu_unlock(&server->mu_call);
-  } else {
-    gpr_mu_lock(&calld->mu_state);
-    calld->state = ACTIVATED;
-    gpr_mu_unlock(&calld->mu_state);
-    publish_call(exec_ctx, server, calld, rm->cq_idx,
-                 &server->requested_calls[request_id]);
   }
+
+  /* no cq to take the request found: queue it on the slow list */
+  gpr_mu_lock(&server->mu_call);
+  gpr_mu_lock(&calld->mu_state);
+  calld->state = PENDING;
+  gpr_mu_unlock(&calld->mu_state);
+  if (rm->pending_head == NULL) {
+    rm->pending_tail = rm->pending_head = calld;
+  } else {
+    rm->pending_tail->pending_next = calld;
+    rm->pending_tail = calld;
+  }
+  calld->pending_next = NULL;
+  gpr_mu_unlock(&server->mu_call);
 }
 
 static void finish_start_new_rpc(
@@ -563,14 +570,14 @@
 
   switch (payload_handling) {
     case GRPC_SRM_PAYLOAD_NONE:
-      publish_new_rpc(exec_ctx, calld, true);
+      publish_new_rpc(exec_ctx, elem, true);
       break;
     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
       grpc_op op;
       memset(&op, 0, sizeof(op));
       op.op = GRPC_OP_RECV_MESSAGE;
       op.data.recv_message = &calld->payload;
-      grpc_closure_init(&calld->publish, publish_new_rpc, calld);
+      grpc_closure_init(&calld->publish, publish_new_rpc, elem);
       grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
                                         &calld->publish);
       break;
@@ -599,10 +606,9 @@
       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
           !calld->recv_idempotent_request)
         continue;
-      finish_start_new_rpc(
-          exec_ctx, server, elem,
-          &rm->server_registered_method->request_matchers[chand->cq_idx],
-          rm->server_registered_method->payload_handling);
+      finish_start_new_rpc(exec_ctx, server, elem,
+                           &rm->server_registered_method->request_matcher,
+                           rm->server_registered_method->payload_handling);
       return;
     }
     /* check for a wildcard method definition (no host set) */
@@ -616,15 +622,14 @@
       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
           !calld->recv_idempotent_request)
         continue;
-      finish_start_new_rpc(
-          exec_ctx, server, elem,
-          &rm->server_registered_method->request_matchers[chand->cq_idx],
-          rm->server_registered_method->payload_handling);
+      finish_start_new_rpc(exec_ctx, server, elem,
+                           &rm->server_registered_method->request_matcher,
+                           rm->server_registered_method->payload_handling);
       return;
     }
   }
   finish_start_new_rpc(exec_ctx, server, elem,
-                       &server->unregistered_request_matchers[chand->cq_idx],
+                       &server->unregistered_request_matcher,
                        GRPC_SRM_PAYLOAD_NONE);
 }
 
@@ -655,18 +660,14 @@
 static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
                                      grpc_server *server) {
   if (server->started) {
-    for (size_t i = 0; i < server->cq_count; i++) {
-      request_matcher_kill_requests(exec_ctx, server,
-                                    &server->unregistered_request_matchers[i]);
-      request_matcher_zombify_all_pending_calls(
-          exec_ctx, &server->unregistered_request_matchers[i]);
-      for (registered_method *rm = server->registered_methods; rm;
-           rm = rm->next) {
-        request_matcher_kill_requests(exec_ctx, server,
-                                      &rm->request_matchers[i]);
-        request_matcher_zombify_all_pending_calls(exec_ctx,
-                                                  &rm->request_matchers[i]);
-      }
+    request_matcher_kill_requests(exec_ctx, server,
+                                  &server->unregistered_request_matcher);
+    request_matcher_zombify_all_pending_calls(
+        exec_ctx, &server->unregistered_request_matcher);
+    for (registered_method *rm = server->registered_methods; rm;
+         rm = rm->next) {
+      request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
+      request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
     }
   }
 }
@@ -1046,21 +1047,14 @@
 
   server->started = true;
   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
-  server->unregistered_request_matchers = gpr_malloc(
-      sizeof(*server->unregistered_request_matchers) * server->cq_count);
   for (i = 0; i < server->cq_count; i++) {
     server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
-    request_matcher_init(&server->unregistered_request_matchers[i],
-                         server->max_requested_calls, i, server);
-    for (registered_method *rm = server->registered_methods; rm;
-         rm = rm->next) {
-      if (i == 0) {
-        rm->request_matchers =
-            gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count);
-      }
-      request_matcher_init(&rm->request_matchers[i],
-                           server->max_requested_calls, i, server);
-    }
+  }
+  request_matcher_init(&server->unregistered_request_matcher,
+                       server->max_requested_calls, server);
+  for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
+    request_matcher_init(&rm->request_matcher, server->max_requested_calls,
+                         server);
   }
 
   for (l = server->listeners; l; l = l->next) {
@@ -1295,20 +1289,20 @@
   }
   switch (rc->type) {
     case BATCH_CALL:
-      rm = &server->unregistered_request_matchers[cq_idx];
+      rm = &server->unregistered_request_matcher;
       break;
     case REGISTERED_CALL:
-      rm = &rc->data.registered.registered_method->request_matchers[cq_idx];
+      rm = &rc->data.registered.registered_method->request_matcher;
       break;
   }
   server->requested_calls[request_id] = *rc;
   gpr_free(rc);
-  if (gpr_stack_lockfree_push(rm->requests, request_id)) {
+  if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
     /* this was the first queued request: we need to lock and start
        matching calls */
     gpr_mu_lock(&server->mu_call);
     while ((calld = rm->pending_head) != NULL) {
-      request_id = gpr_stack_lockfree_pop(rm->requests);
+      request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
       if (request_id == -1) break;
       rm->pending_head = calld->pending_next;
       gpr_mu_unlock(&server->mu_call);