Rewrite completion queue internals to use pre-allocation of events
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 341ca29..2c9115e 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,14 @@
 
 typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
 
-typedef struct {
+typedef struct requested_call {
   requested_call_type type;
+  struct requested_call *next; /* next request in the singly linked list */
   void *tag;
   grpc_completion_queue *cq_bound_to_call;
   grpc_completion_queue *cq_for_notification;
   grpc_call **call;
+  grpc_cq_completion completion; /* storage handed to grpc_cq_end_op */
   union {
     struct {
       grpc_call_details *details;
@@ -92,17 +94,11 @@
   } data;
 } requested_call;
 
-typedef struct {
-  requested_call *calls;
-  size_t count;
-  size_t capacity;
-} requested_call_array;
-
 struct registered_method {
   char *method;
   char *host;
   call_data *pending;
-  requested_call_array requested;
+  requested_call *requests; /* head of this method's queued requests */
   registered_method *next;
 };
 
@@ -131,6 +127,7 @@
 typedef struct shutdown_tag {
   void *tag;
   grpc_completion_queue *cq;
+  grpc_cq_completion completion; /* NOTE(review): owner array is gpr_realloc'd; confirm it cannot move once queued */
 } shutdown_tag;
 
 struct grpc_server {
@@ -153,7 +150,7 @@
   gpr_mu mu_call;   /* mutex for call-specific state */
 
   registered_method *registered_methods;
-  requested_call_array requested_calls;
+  requested_call *requests; /* queued BATCH_CALL requests */
 
   gpr_uint8 shutdown;
   gpr_uint8 shutdown_published;
@@ -325,22 +322,6 @@
   return 1;
 }
 
-static void requested_call_array_destroy(requested_call_array *array) {
-  gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
-  requested_call *rc;
-  if (array->count == array->capacity) {
-    array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
-    array->calls =
-        gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
-  }
-  rc = &array->calls[array->count++];
-  memset(rc, 0, sizeof(*rc));
-  return rc;
-}
-
 static void server_ref(grpc_server *server) {
   gpr_ref(&server->internal_refcount);
 }
@@ -352,12 +333,10 @@
   gpr_mu_destroy(&server->mu_global);
   gpr_mu_destroy(&server->mu_call);
   gpr_free(server->channel_filters);
-  requested_call_array_destroy(&server->requested_calls);
   while ((rm = server->registered_methods) != NULL) {
     server->registered_methods = rm->next;
     gpr_free(rm->method);
     gpr_free(rm->host);
-    requested_call_array_destroy(&rm->requested);
     gpr_free(rm);
   }
   for (i = 0; i < server->cq_count; i++) {
@@ -406,18 +385,18 @@
 static void finish_start_new_rpc_and_unlock(grpc_server *server,
                                             grpc_call_element *elem,
                                             call_data **pending_root,
-                                            requested_call_array *array) {
-  requested_call rc;
+                                            requested_call **requests) {
+  requested_call *rc = *requests; /* list head; NULL when nothing is queued */
   call_data *calld = elem->call_data;
-  if (array->count == 0) {
+  if (rc == NULL) { /* no request waiting: park the call as pending */
     calld->state = PENDING;
     call_list_join(pending_root, calld, PENDING_START);
     gpr_mu_unlock(&server->mu_call);
   } else {
-    rc = array->calls[--array->count];
+    *requests = rc->next; /* pop rc off the front of the list */
     calld->state = ACTIVATED;
     gpr_mu_unlock(&server->mu_call);
-    begin_call(server, calld, &rc);
+    begin_call(server, calld, rc); /* runs after mu_call is released */
   }
 }
 
@@ -442,7 +421,7 @@
       if (rm->method != calld->path) continue;
       finish_start_new_rpc_and_unlock(server, elem,
                                       &rm->server_registered_method->pending,
-                                      &rm->server_registered_method->requested);
+                                      &rm->server_registered_method->requests);
       return;
     }
     /* check for a wildcard method definition (no host set) */
@@ -455,12 +434,12 @@
       if (rm->method != calld->path) continue;
       finish_start_new_rpc_and_unlock(server, elem,
                                       &rm->server_registered_method->pending,
-                                      &rm->server_registered_method->requested);
+                                      &rm->server_registered_method->requests);
       return;
     }
   }
   finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
-                                  &server->requested_calls);
+                                  &server->requests);
 }
 
 static void kill_zombie(void *elem, int success) {
@@ -476,6 +455,10 @@
   return n;
 }
 
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+  server_unref(server); /* drops the ref taken before grpc_cq_end_op */
+}
+
 static void maybe_finish_shutdown(grpc_server *server) {
   size_t i;
   if (!server->shutdown || server->shutdown_published) {
@@ -494,8 +477,14 @@
   }
   server->shutdown_published = 1;
   for (i = 0; i < server->num_shutdown_tags; i++) {
-    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
-                   NULL, 1);
+    server_ref(server); /* dropped again by done_shutdown_event */
+    grpc_cq_end_op(server->shutdown_tags[i].cq,
+                   server->shutdown_tags[i].tag,
+                   1,                   /* success */
+                   done_shutdown_event, /* unrefs the server taken above */
+                   server,              /* passed to done_shutdown_event */
+                   &server->shutdown_tags[i].completion
+                   );
   }
 }
 
@@ -910,15 +899,14 @@
 void grpc_server_shutdown_and_notify(grpc_server *server,
                                      grpc_completion_queue *cq, void *tag) {
   listener *l;
-  requested_call_array requested_calls;
-  size_t i;
+  requested_call *requests = NULL;
   registered_method *rm;
   shutdown_tag *sdt;
   channel_broadcaster broadcaster;
 
   /* lock, and gather up some stuff to do */
   gpr_mu_lock(&server->mu_global);
-  grpc_cq_begin_op(cq, NULL);
+  grpc_cq_begin_op(cq);
   server->shutdown_tags =
       gpr_realloc(server->shutdown_tags,
                   sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -934,23 +922,15 @@
 
   /* collect all unregistered then registered calls */
   gpr_mu_lock(&server->mu_call);
-  requested_calls = server->requested_calls;
-  memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+  requests = server->requests;
+  server->requests = NULL;
   for (rm = server->registered_methods; rm; rm = rm->next) {
-    if (requested_calls.count + rm->requested.count >
-        requested_calls.capacity) {
-      requested_calls.capacity =
-          GPR_MAX(requested_calls.count + rm->requested.count,
-                  2 * requested_calls.capacity);
-      requested_calls.calls =
-          gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
-                                                 requested_calls.capacity);
+    while (rm->requests != NULL) { /* move every entry to the local list */
+      requested_call *c = rm->requests;
+      rm->requests = c->next; /* unlink c from the method's list */
+      c->next = requests;     /* push c onto the collected list */
+      requests = c;
     }
-    memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
-           sizeof(*requested_calls.calls) * rm->requested.count);
-    requested_calls.count += rm->requested.count;
-    gpr_free(rm->requested.calls);
-    memset(&rm->requested, 0, sizeof(rm->requested));
   }
   gpr_mu_unlock(&server->mu_call);
 
@@ -959,10 +939,11 @@
   gpr_mu_unlock(&server->mu_global);
 
   /* terminate all the requested calls */
-  for (i = 0; i < requested_calls.count; i++) {
-    fail_call(server, &requested_calls.calls[i]);
+  while (requests != NULL) {
+    requested_call *next = requests->next; /* saved before fail_call */
+    fail_call(server, requests); /* done_request_event later frees it */
+    requests = next;
   }
-  gpr_free(requested_calls.calls);
 
   /* Shutdown listeners */
   for (l = server->listeners; l; l = l->next) {
@@ -1024,7 +1005,7 @@
 static grpc_call_error queue_call_request(grpc_server *server,
                                           requested_call *rc) {
   call_data *calld = NULL;
-  requested_call_array *requested_calls = NULL;
+  requested_call **requests = NULL; /* list rc is pushed on; set by the switch */
   gpr_mu_lock(&server->mu_call);
   if (server->shutdown) {
     gpr_mu_unlock(&server->mu_call);
@@ -1035,12 +1016,12 @@
     case BATCH_CALL:
       calld =
           call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
-      requested_calls = &server->requested_calls;
+      requests = &server->requests;
       break;
     case REGISTERED_CALL:
       calld = call_list_remove_head(
           &rc->data.registered.registered_method->pending, PENDING_START);
-      requested_calls = &rc->data.registered.registered_method->requested;
+      requests = &rc->data.registered.registered_method->requests;
       break;
   }
   if (calld) {
@@ -1050,7 +1031,8 @@
     begin_call(server, calld, rc);
     return GRPC_CALL_OK;
   } else {
-    *requested_call_array_add(requested_calls) = *rc;
+    rc->next = *requests; /* no call waiting: push rc on the list head */
+    *requests = rc;
     gpr_mu_unlock(&server->mu_call);
     return GRPC_CALL_OK;
   }
@@ -1061,22 +1043,23 @@
     grpc_metadata_array *initial_metadata,
     grpc_completion_queue *cq_bound_to_call,
     grpc_completion_queue *cq_for_notification, void *tag) {
-  requested_call rc;
+  requested_call *rc = gpr_malloc(sizeof(*rc)); /* freed by done_request_event */
   GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
                                initial_metadata, cq_bound_to_call,
                                cq_for_notification, tag);
   if (!grpc_cq_is_server_cq(cq_for_notification)) {
+    gpr_free(rc); /* rejected before queueing: release it here */
     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   }
-  grpc_cq_begin_op(cq_for_notification, NULL);
-  rc.type = BATCH_CALL;
-  rc.tag = tag;
-  rc.cq_bound_to_call = cq_bound_to_call;
-  rc.cq_for_notification = cq_for_notification;
-  rc.call = call;
-  rc.data.batch.details = details;
-  rc.data.batch.initial_metadata = initial_metadata;
-  return queue_call_request(server, &rc);
+  grpc_cq_begin_op(cq_for_notification);
+  rc->type = BATCH_CALL;
+  rc->tag = tag;
+  rc->cq_bound_to_call = cq_bound_to_call;
+  rc->cq_for_notification = cq_for_notification;
+  rc->call = call;
+  rc->data.batch.details = details;
+  rc->data.batch.initial_metadata = initial_metadata;
+  return queue_call_request(server, rc);
 }
 
 grpc_call_error grpc_server_request_registered_call(
@@ -1084,22 +1067,23 @@
     grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
     grpc_completion_queue *cq_bound_to_call,
     grpc_completion_queue *cq_for_notification, void *tag) {
-  requested_call rc;
+  requested_call *rc = gpr_malloc(sizeof(*rc)); /* freed by done_request_event */
   registered_method *registered_method = rm;
   if (!grpc_cq_is_server_cq(cq_for_notification)) {
+    gpr_free(rc); /* rejected before queueing: release it here */
     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   }
-  grpc_cq_begin_op(cq_for_notification, NULL);
-  rc.type = REGISTERED_CALL;
-  rc.tag = tag;
-  rc.cq_bound_to_call = cq_bound_to_call;
-  rc.cq_for_notification = cq_for_notification;
-  rc.call = call;
-  rc.data.registered.registered_method = registered_method;
-  rc.data.registered.deadline = deadline;
-  rc.data.registered.initial_metadata = initial_metadata;
-  rc.data.registered.optional_payload = optional_payload;
-  return queue_call_request(server, &rc);
+  grpc_cq_begin_op(cq_for_notification);
+  rc->type = REGISTERED_CALL;
+  rc->tag = tag;
+  rc->cq_bound_to_call = cq_bound_to_call;
+  rc->cq_for_notification = cq_for_notification;
+  rc->call = call;
+  rc->data.registered.registered_method = registered_method;
+  rc->data.registered.deadline = deadline;
+  rc->data.registered.initial_metadata = initial_metadata;
+  rc->data.registered.optional_payload = optional_payload;
+  return queue_call_request(server, rc);
 }
 
 static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1167,7 +1151,11 @@
 
   GRPC_CALL_INTERNAL_REF(calld->call, "server");
   grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
-                                      rc->tag);
+                                      rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+  gpr_free(req); /* req is the requested_call given to grpc_cq_end_op */
 }
 
 static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1180,15 +1168,16 @@
       rc->data.registered.initial_metadata->count = 0;
       break;
   }
-  grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+  grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion);
 }
 
 static void publish_registered_or_batch(grpc_call *call, int success,
-                                        void *tag) {
+                                        void *prc) {
   grpc_call_element *elem =
       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+  requested_call *rc = prc; /* user data is the requested_call, not the tag */
   call_data *calld = elem->call_data;
-  grpc_cq_end_op(calld->cq_new, tag, call, success);
+  grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion);
 }
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {