Further server cq affinity work
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 698b2be..26b0f00 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -52,7 +52,7 @@
 #include "src/core/lib/surface/api_trace.h"
 #include "src/core/lib/surface/server.h"
 
-typedef struct grpc_server_secure_state {
+typedef struct server_secure_state {
   grpc_server *server;
   grpc_tcp_server *tcp;
   grpc_server_security_connector *sc;
@@ -62,13 +62,16 @@
   gpr_refcount refcount;
   grpc_closure destroy_closure;
   grpc_closure *destroy_callback;
-} grpc_server_secure_state;
+} server_secure_state;
 
-static void state_ref(grpc_server_secure_state *state) {
-  gpr_ref(&state->refcount);
-}
+typedef struct server_secure_connect {
+  server_secure_state *state;
+  grpc_pollset *accepting_pollset;
+} server_secure_connect;
 
-static void state_unref(grpc_server_secure_state *state) {
+static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); }
+
+static void state_unref(server_secure_state *state) {
   if (gpr_unref(&state->refcount)) {
     /* ensure all threads have unlocked */
     gpr_mu_lock(&state->mu);
@@ -80,67 +83,66 @@
   }
 }
 
-static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep,
-                            grpc_transport *transport,
-                            grpc_auth_context *auth_context) {
-  grpc_server_secure_state *state = statep;
-  grpc_channel_args *args_copy;
-  grpc_arg args_to_add[2];
-  args_to_add[0] = grpc_server_credentials_to_arg(state->creds);
-  args_to_add[1] = grpc_auth_context_to_arg(auth_context);
-  args_copy = grpc_channel_args_copy_and_add(
-      grpc_server_get_channel_args(state->server), args_to_add,
-      GPR_ARRAY_SIZE(args_to_add));
-  grpc_server_setup_transport(exec_ctx, state->server, transport, args_copy);
-  grpc_channel_args_destroy(args_copy);
-}
-
 static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
                                      grpc_security_status status,
                                      grpc_endpoint *secure_endpoint,
                                      grpc_auth_context *auth_context) {
-  grpc_server_secure_state *state = statep;
+  server_secure_connect *state = statep;
   grpc_transport *transport;
   if (status == GRPC_SECURITY_OK) {
     if (secure_endpoint) {
-      gpr_mu_lock(&state->mu);
-      if (!state->is_shutdown) {
+      gpr_mu_lock(&state->state->mu);
+      if (!state->state->is_shutdown) {
         transport = grpc_create_chttp2_transport(
-            exec_ctx, grpc_server_get_channel_args(state->server),
+            exec_ctx, grpc_server_get_channel_args(state->state->server),
             secure_endpoint, 0);
-        setup_transport(exec_ctx, state, transport, auth_context);
+        grpc_channel_args *args_copy;
+        grpc_arg args_to_add[2];
+        args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds);
+        args_to_add[1] = grpc_auth_context_to_arg(auth_context);
+        args_copy = grpc_channel_args_copy_and_add(
+            grpc_server_get_channel_args(state->state->server), args_to_add,
+            GPR_ARRAY_SIZE(args_to_add));
+        grpc_server_setup_transport(exec_ctx, state->state->server, transport,
+                                    state->accepting_pollset, args_copy);
+        grpc_channel_args_destroy(args_copy);
         grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
       } else {
         /* We need to consume this here, because the server may already have
          * gone away. */
         grpc_endpoint_destroy(exec_ctx, secure_endpoint);
       }
-      gpr_mu_unlock(&state->mu);
+      gpr_mu_unlock(&state->state->mu);
     }
   } else {
     gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
   }
-  state_unref(state);
+  state_unref(state->state);
+  gpr_free(state);
 }
 
 static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
+                      grpc_pollset *accepting_pollset,
                       grpc_tcp_server_acceptor *acceptor) {
-  grpc_server_secure_state *state = statep;
-  state_ref(state);
-  grpc_server_security_connector_do_handshake(
-      exec_ctx, state->sc, acceptor, tcp, on_secure_handshake_done, state);
+  server_secure_connect *state = gpr_malloc(sizeof(*state));
+  state->state = statep;
+  state_ref(state->state);
+  state->accepting_pollset = accepting_pollset;
+  grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc,
+                                              acceptor, tcp,
+                                              on_secure_handshake_done, state);
 }
 
 /* Server callback: start listening on our ports */
 static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
                   grpc_pollset **pollsets, size_t pollset_count) {
-  grpc_server_secure_state *state = statep;
+  server_secure_state *state = statep;
   grpc_tcp_server_start(exec_ctx, state->tcp, pollsets, pollset_count,
                         on_accept, state);
 }
 
 static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) {
-  grpc_server_secure_state *state = statep;
+  server_secure_state *state = statep;
   if (state->destroy_callback != NULL) {
     state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
                                 success);
@@ -153,7 +155,7 @@
    callbacks) */
 static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
                     grpc_closure *callback) {
-  grpc_server_secure_state *state = statep;
+  server_secure_state *state = statep;
   grpc_tcp_server *tcp;
   gpr_mu_lock(&state->mu);
   state->is_shutdown = 1;
@@ -167,7 +169,7 @@
                                       grpc_server_credentials *creds) {
   grpc_resolved_addresses *resolved = NULL;
   grpc_tcp_server *tcp = NULL;
-  grpc_server_secure_state *state = NULL;
+  server_secure_state *state = NULL;
   size_t i;
   unsigned count = 0;
   int port_num = -1;
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 1528ca4..3d0dd13 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -81,6 +81,7 @@
                     void *done_arg, grpc_cq_completion *storage);
 
 grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
 
 void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
 bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index f1a031b..d1fb3fc 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -81,7 +81,6 @@
   void *tag;
   grpc_server *server;
   grpc_completion_queue *cq_bound_to_call;
-  grpc_completion_queue *cq_for_notification;
   grpc_call **call;
   grpc_cq_completion completion;
   grpc_metadata_array *initial_metadata;
@@ -171,6 +170,7 @@
 
 struct request_matcher {
   grpc_server *server;
+  size_t cq_idx;
   call_data *pending_head;
   call_data *pending_tail;
   gpr_stack_lockfree *requests;
@@ -237,7 +237,7 @@
 
 static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
 static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
-                      requested_call *rc);
+                      size_t cq_idx, requested_call *rc);
 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
    hold mu_call */
 static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
@@ -312,9 +312,10 @@
  */
 
 static void request_matcher_init(request_matcher *rm, size_t entries,
-                                 grpc_server *server) {
+                                 size_t cq_idx, grpc_server *server) {
   memset(rm, 0, sizeof(*rm));
   rm->server = server;
+  rm->cq_idx = cq_idx;
   rm->requests = gpr_stack_lockfree_create(entries);
 }
 
@@ -347,7 +348,8 @@
                                           request_matcher *rm) {
   int request_id;
   while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
-    fail_call(exec_ctx, server, &server->requested_calls[request_id]);
+    fail_call(exec_ctx, server, rm->cq_idx,
+              &server->requested_calls[request_id]);
   }
 }
 
@@ -458,11 +460,11 @@
 }
 
 static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
-                         call_data *calld, requested_call *rc) {
+                         call_data *calld, size_t cq_idx, requested_call *rc) {
   grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
   grpc_call *call = calld->call;
   *rc->call = call;
-  calld->cq_new = rc->cq_for_notification;
+  calld->cq_new = server->cqs[cq_idx];
   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
   switch (rc->type) {
     case BATCH_CALL:
@@ -530,7 +532,8 @@
     gpr_mu_lock(&calld->mu_state);
     calld->state = ACTIVATED;
     gpr_mu_unlock(&calld->mu_state);
-    publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+    publish_call(exec_ctx, server, calld, rm->cq_idx,
+                 &server->requested_calls[request_id]);
   }
 }
 
@@ -972,8 +975,6 @@
   for (i = 0; i < (size_t)server->max_requested_calls; i++) {
     gpr_stack_lockfree_push(server->request_freelist, (int)i);
   }
-  request_matcher_init(&server->unregistered_request_matcher,
-                       server->max_requested_calls, server);
   server->requested_calls = gpr_malloc(server->max_requested_calls *
                                        sizeof(*server->requested_calls));
 
@@ -1017,8 +1018,6 @@
   }
   m = gpr_malloc(sizeof(registered_method));
   memset(m, 0, sizeof(*m));
-  request_matcher_init(&m->request_matcher, server->max_requested_calls,
-                       server);
   m->method = gpr_strdup(method);
   m->host = gpr_strdup(host);
   m->next = server->registered_methods;
@@ -1036,8 +1035,21 @@
   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
 
   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+  server->unregistered_request_matchers = gpr_malloc(
+      sizeof(*server->unregistered_request_matchers) * server->cq_count);
   for (i = 0; i < server->cq_count; i++) {
     server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+    request_matcher_init(&server->unregistered_request_matchers[i],
+                         server->max_requested_calls, i, server);
+    for (registered_method *rm = server->registered_methods; rm;
+         rm = rm->next) {
+      if (i == 0) {
+        rm->request_matchers =
+            gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count);
+      }
+      request_matcher_init(&rm->request_matchers[i],
+                           server->max_requested_calls, i, server);
+    }
   }
 
   for (l = server->listeners; l; l = l->next) {
@@ -1074,6 +1086,17 @@
   server_ref(s);
   chand->channel = channel;
 
+  size_t cq_idx;
+  grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
+  for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
+    if (s->cqs[cq_idx] == accepting_cq) break;
+  }
+  if (cq_idx == s->cq_count) {
+    /* completion queue not found: pick a random one to publish new calls to */
+    cq_idx = (size_t)rand() % s->cq_count;
+  }
+  chand->cq_idx = cq_idx;
+
   num_registered_methods = 0;
   for (rm = s->registered_methods; rm; rm = rm->next) {
     num_registered_methods++;
@@ -1244,27 +1267,27 @@
 }
 
 static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
-                                          grpc_server *server,
+                                          grpc_server *server, size_t cq_idx,
                                           requested_call *rc) {
   call_data *calld = NULL;
   request_matcher *rm = NULL;
   int request_id;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
-    fail_call(exec_ctx, server, rc);
+    fail_call(exec_ctx, server, cq_idx, rc);
     return GRPC_CALL_OK;
   }
   request_id = gpr_stack_lockfree_pop(server->request_freelist);
   if (request_id == -1) {
     /* out of request ids: just fail this one */
-    fail_call(exec_ctx, server, rc);
+    fail_call(exec_ctx, server, cq_idx, rc);
     return GRPC_CALL_OK;
   }
   switch (rc->type) {
     case BATCH_CALL:
-      rm = &server->unregistered_request_matcher;
+      rm = &server->unregistered_request_matchers[cq_idx];
       break;
     case REGISTERED_CALL:
-      rm = &rc->data.registered.registered_method->request_matcher;
+      rm = &rc->data.registered.registered_method->request_matchers[cq_idx];
       break;
   }
   server->requested_calls[request_id] = *rc;
@@ -1290,7 +1313,7 @@
         GPR_ASSERT(calld->state == PENDING);
         calld->state = ACTIVATED;
         gpr_mu_unlock(&calld->mu_state);
-        publish_call(exec_ctx, server, calld,
+        publish_call(exec_ctx, server, calld, cq_idx,
                      &server->requested_calls[request_id]);
       }
       gpr_mu_lock(&server->mu_call);
@@ -1314,7 +1337,13 @@
       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
       7, (server, call, details, initial_metadata, cq_bound_to_call,
           cq_for_notification, tag));
-  if (!grpc_cq_is_server_cq(cq_for_notification)) {
+  size_t cq_idx;
+  for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+    if (server->cqs[cq_idx] == cq_for_notification) {
+      break;
+    }
+  }
+  if (cq_idx == server->cq_count) {
     gpr_free(rc);
     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
     goto done;
@@ -1325,11 +1354,10 @@
   rc->server = server;
   rc->tag = tag;
   rc->cq_bound_to_call = cq_bound_to_call;
-  rc->cq_for_notification = cq_for_notification;
   rc->call = call;
   rc->data.batch.details = details;
   rc->initial_metadata = initial_metadata;
-  error = queue_call_request(&exec_ctx, server, rc);
+  error = queue_call_request(&exec_ctx, server, cq_idx, rc);
 done:
   grpc_exec_ctx_finish(&exec_ctx);
   return error;
@@ -1351,7 +1379,14 @@
       "tag=%p)",
       9, (server, rmp, call, deadline, initial_metadata, optional_payload,
           cq_bound_to_call, cq_for_notification, tag));
-  if (!grpc_cq_is_server_cq(cq_for_notification)) {
+
+  size_t cq_idx;
+  for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+    if (server->cqs[cq_idx] == cq_for_notification) {
+      break;
+    }
+  }
+  if (cq_idx == server->cq_count) {
     gpr_free(rc);
     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
     goto done;
@@ -1367,26 +1402,25 @@
   rc->server = server;
   rc->tag = tag;
   rc->cq_bound_to_call = cq_bound_to_call;
-  rc->cq_for_notification = cq_for_notification;
   rc->call = call;
   rc->data.registered.registered_method = rm;
   rc->data.registered.deadline = deadline;
   rc->initial_metadata = initial_metadata;
   rc->data.registered.optional_payload = optional_payload;
-  error = queue_call_request(&exec_ctx, server, rc);
+  error = queue_call_request(&exec_ctx, server, cq_idx, rc);
 done:
   grpc_exec_ctx_finish(&exec_ctx);
   return error;
 }
 
 static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
-                      requested_call *rc) {
+                      size_t cq_idx, requested_call *rc) {
   *rc->call = NULL;
   rc->initial_metadata->count = 0;
 
   server_ref(server);
-  grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
-                 done_request_event, rc, &rc->completion);
+  grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event,
+                 rc, &rc->completion);
 }
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {