Multi-completion-queue-server

Allow binding a different completion queue to each registered method.
This will allow multiplexing for the C++ server between sync & async
methods more easily.
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 80b248e..2258819 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -53,7 +53,7 @@
 
 typedef struct listener {
   void *arg;
-  void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
+  void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count);
   void (*destroy)(grpc_server *server, void *arg);
   struct listener *next;
 } listener;
@@ -101,6 +101,7 @@
   char *host;
   call_data *pending;
   requested_call_array requested;
+  grpc_completion_queue *cq;
   registered_method *next;
 };
 
@@ -127,7 +128,11 @@
   size_t channel_filter_count;
   const grpc_channel_filter **channel_filters;
   grpc_channel_args *channel_args;
-  grpc_completion_queue *cq;
+  grpc_completion_queue *unregistered_cq;
+  
+  grpc_completion_queue **cqs;
+  grpc_pollset **pollsets;
+  size_t cq_count;
 
   gpr_mu mu;
 
@@ -169,6 +174,7 @@
   grpc_mdstr *host;
 
   legacy_data *legacy;
+  grpc_completion_queue *cq_new;
 
   call_data **root[CALL_LIST_COUNT];
   call_link links[CALL_LIST_COUNT];
@@ -496,7 +502,7 @@
 static void destroy_call_elem(grpc_call_element *elem) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
-  int i;
+  size_t i;
 
   gpr_mu_lock(&chand->server->mu);
   for (i = 0; i < CALL_LIST_COUNT; i++) {
@@ -504,7 +510,9 @@
   }
   if (chand->server->shutdown && chand->server->have_shutdown_tag &&
       chand->server->lists[ALL_CALLS] == NULL) {
-    grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
+    for (i = 0; i < chand->server->cq_count; i++) {
+      grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag);
+    }
   }
   gpr_mu_unlock(&chand->server->mu);
 
@@ -557,6 +565,16 @@
     sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
 };
 
+static void addcq(grpc_server *server, grpc_completion_queue *cq) {
+  size_t i, n;
+  for (i = 0; i < server->cq_count; i++) {
+    if (server->cqs[i] == cq) return;
+  }
+  n = server->cq_count++;
+  server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*));
+  server->cqs[n] = cq;
+}
+
 grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
                                              grpc_channel_filter **filters,
                                              size_t filter_count,
@@ -566,10 +584,11 @@
 
   grpc_server *server = gpr_malloc(sizeof(grpc_server));
   memset(server, 0, sizeof(grpc_server));
+  if (cq) addcq(server, cq);
 
   gpr_mu_init(&server->mu);
 
-  server->cq = cq;
+  server->unregistered_cq = cq;
   /* decremented by grpc_server_destroy */
   gpr_ref_init(&server->internal_refcount, 1);
   server->root_channel_data.next = server->root_channel_data.prev =
@@ -605,7 +624,7 @@
 }
 
 void *grpc_server_register_method(grpc_server *server, const char *method,
-                                  const char *host) {
+                                  const char *host, grpc_completion_queue *cq_new_rpc) {
   registered_method *m;
   if (!method) {
     gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@@ -618,20 +637,28 @@
       return NULL;
     }
   }
+  addcq(server, cq_new_rpc);
   m = gpr_malloc(sizeof(registered_method));
   memset(m, 0, sizeof(*m));
   m->method = gpr_strdup(method);
   m->host = gpr_strdup(host);
   m->next = server->registered_methods;
+  m->cq = cq_new_rpc;
   server->registered_methods = m;
   return m;
 }
 
 void grpc_server_start(grpc_server *server) {
   listener *l;
+  size_t i;
+
+  server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
+  for (i = 0; i < server->cq_count; i++) {
+    server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+  }
 
   for (l = server->listeners; l; l = l->next) {
-    l->start(server, l->arg, grpc_cq_pollset(server->cq));
+    l->start(server, l->arg, server->pollsets, server->cq_count);
   }
 }
 
@@ -664,7 +691,9 @@
   }
   filters[i] = &grpc_connected_channel_filter;
 
-  grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
+  for (i = 0; i < s->cq_count; i++) {
+    grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+  }
 
   channel = grpc_channel_create_from_filters(filters, num_filters,
                                              s->channel_args, mdctx, 0);
@@ -765,9 +794,11 @@
   server->have_shutdown_tag = have_shutdown_tag;
   server->shutdown_tag = shutdown_tag;
   if (have_shutdown_tag) {
-    grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
-    if (server->lists[ALL_CALLS] == NULL) {
-      grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
+    for (i = 0; i < server->cq_count; i++) {
+      grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
+      if (server->lists[ALL_CALLS] == NULL) {
+        grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
+      }
     }
   }
   gpr_mu_unlock(&server->mu);
@@ -826,7 +857,7 @@
 
 void grpc_server_add_listener(grpc_server *server, void *arg,
                               void (*start)(grpc_server *server, void *arg,
-                                            grpc_pollset *pollset),
+                                            grpc_pollset **pollsets, size_t pollset_count),
                               void (*destroy)(grpc_server *server, void *arg)) {
   listener *l = gpr_malloc(sizeof(listener));
   l->arg = arg;
@@ -878,7 +909,7 @@
                                          grpc_completion_queue *cq_bind,
                                          void *tag) {
   requested_call rc;
-  grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
+  grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
   rc.type = BATCH_CALL;
   rc.tag = tag;
   rc.data.batch.cq_bind = cq_bind;
@@ -889,12 +920,13 @@
 }
 
 grpc_call_error grpc_server_request_registered_call(
-    grpc_server *server, void *registered_method, grpc_call **call,
+    grpc_server *server, void *rm, grpc_call **call,
     gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
     grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
     void *tag) {
   requested_call rc;
-  grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
+  registered_method *registered_method = rm;
+  grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
   rc.type = REGISTERED_CALL;
   rc.tag = tag;
   rc.data.registered.cq_bind = cq_bind;
@@ -909,7 +941,7 @@
 grpc_call_error grpc_server_request_call_old(grpc_server *server,
                                              void *tag_new) {
   requested_call rc;
-  grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
+  grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
   rc.type = LEGACY_CALL;
   rc.tag = tag_new;
   return queue_call_request(server, &rc);
@@ -965,6 +997,7 @@
       r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
       r->data.recv_metadata = rc->data.batch.initial_metadata;
       r++;
+      calld->cq_new = server->unregistered_cq;
       publish = publish_registered_or_batch;
       break;
     case REGISTERED_CALL:
@@ -979,6 +1012,7 @@
         r->data.recv_message = rc->data.registered.optional_payload;
         r++;
       }
+      calld->cq_new = rc->data.registered.registered_method->cq;
       publish = publish_registered_or_batch;
       break;
   }
@@ -991,19 +1025,19 @@
 static void fail_call(grpc_server *server, requested_call *rc) {
   switch (rc->type) {
     case LEGACY_CALL:
-      grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL,
+      grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL,
                           NULL, gpr_inf_past, 0, NULL);
       break;
     case BATCH_CALL:
       *rc->data.batch.call = NULL;
       rc->data.batch.initial_metadata->count = 0;
-      grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
+      grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
                               GRPC_OP_ERROR);
       break;
     case REGISTERED_CALL:
       *rc->data.registered.call = NULL;
       rc->data.registered.initial_metadata->count = 0;
-      grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
+      grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL,
                               GRPC_OP_ERROR);
       break;
   }
@@ -1017,7 +1051,7 @@
   grpc_server *server = chand->server;
 
   if (status == GRPC_OP_OK) {
-    grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
+    grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
                         grpc_mdstr_as_c_string(calld->path),
                         grpc_mdstr_as_c_string(calld->host), calld->deadline,
                         calld->legacy->initial_metadata.count,
@@ -1032,9 +1066,8 @@
                                         void *tag) {
   grpc_call_element *elem =
       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
-  channel_data *chand = elem->channel_data;
-  grpc_server *server = chand->server;
-  grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status);
+  call_data *calld = elem->call_data;
+  grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
 }
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {