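Fix head-of-line blocking in server

For registered methods that read an initial byte buffer, receive the
payload into call_data first and only then pop a requested_call from the
request matcher, so a call that is still waiting on its payload no longer
holds a requested call that another incoming call could use. publish_call
now posts the completion to the completion queue directly.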
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 2829a86..37cc2bd 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -95,7 +95,6 @@
       grpc_byte_buffer **optional_payload;
     } registered;
   } data;
-  grpc_closure publish;
 } requested_call;
 
 typedef struct channel_registered_method {
@@ -156,15 +155,21 @@
   bool recv_idempotent_request;
   grpc_metadata_array initial_metadata;
 
+  request_matcher *request_matcher;
+  grpc_byte_buffer *payload;
+
   grpc_closure got_initial_metadata;
   grpc_closure server_on_recv_initial_metadata;
   grpc_closure kill_zombie_closure;
   grpc_closure *on_done_recv_initial_metadata;
 
+  grpc_closure publish;
+
   call_data *pending_next;
 };
 
 struct request_matcher {
+  grpc_server *server;
   call_data *pending_head;
   call_data *pending_tail;
   gpr_stack_lockfree *requests;
@@ -227,8 +232,7 @@
 #define SERVER_FROM_CALL_ELEM(elem) \
   (((channel_data *)(elem)->channel_data)->server)
 
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
-                       call_data *calld, requested_call *rc);
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
 static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
                       requested_call *rc);
 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
@@ -304,8 +308,10 @@
  * request_matcher
  */
 
-static void request_matcher_init(request_matcher *rm, size_t entries) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+                                 grpc_server *server) {
   memset(rm, 0, sizeof(*rm));
+  rm->server = server;
   rm->requests = gpr_stack_lockfree_create(entries);
 }
 
@@ -418,9 +424,6 @@
                        &op);
 }
 
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
-                                        void *user_data, bool success);
-
 static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
   gpr_slice slice = value->slice;
   size_t len = GPR_SLICE_LENGTH(slice);
@@ -432,22 +435,28 @@
   memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
 }
 
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
-                       call_data *calld, requested_call *rc) {
-  grpc_op ops[1];
-  grpc_op *op = ops;
+static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
+                               grpc_cq_completion *c) {
+  requested_call *rc = req;
+  grpc_server *server = rc->server;
 
-  memset(ops, 0, sizeof(ops));
+  if (rc >= server->requested_calls &&
+      rc < server->requested_calls + server->max_requested_calls) {
+    GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
+    gpr_stack_lockfree_push(server->request_freelist,
+                            (int)(rc - server->requested_calls));
+  } else {
+    gpr_free(req);
+  }
 
-  /* called once initial metadata has been read by the call, but BEFORE
-     the ioreq to fetch it out of the call has been executed.
-     This means metadata related fields can be relied on in calld, but to
-     fill in the metadata array passed by the client, we need to perform
-     an ioreq op, that should complete immediately. */
+  server_unref(exec_ctx, server);
+}
 
+static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
+                         call_data *calld, requested_call *rc) {
   grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
-  grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
-  *rc->call = calld->call;
+  grpc_call *call = calld->call;
+  *rc->call = call;
   calld->cq_new = rc->cq_for_notification;
   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
   switch (rc->type) {
@@ -467,35 +476,38 @@
     case REGISTERED_CALL:
       *rc->data.registered.deadline = calld->deadline;
       if (rc->data.registered.optional_payload) {
-        op->op = GRPC_OP_RECV_MESSAGE;
-        op->data.recv_message = rc->data.registered.optional_payload;
-        op++;
+        *rc->data.registered.optional_payload = calld->payload;
       }
       break;
     default:
       GPR_UNREACHABLE_CODE(return );
   }
 
-  GRPC_CALL_INTERNAL_REF(calld->call, "server");
-  grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
-                                    (size_t)(op - ops), &rc->publish);
+  grpc_call_element *elem =
+      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+  channel_data *chand = elem->channel_data;
+  server_ref(chand->server);
+  grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc,
+                 &rc->completion);
 }
 
-static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
-                                 grpc_call_element *elem, request_matcher *rm) {
-  call_data *calld = elem->call_data;
-  int request_id;
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+  call_data *calld = arg;
+  request_matcher *rm = calld->request_matcher;
+  grpc_server *server = rm->server;
 
-  if (gpr_atm_acq_load(&server->shutdown_flag)) {
+  if (!success || gpr_atm_acq_load(&server->shutdown_flag)) {
     gpr_mu_lock(&calld->mu_state);
     calld->state = ZOMBIED;
     gpr_mu_unlock(&calld->mu_state);
-    grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+    grpc_closure_init(
+        &calld->kill_zombie_closure, kill_zombie,
+        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
     grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
     return;
   }
 
-  request_id = gpr_stack_lockfree_pop(rm->requests);
+  int request_id = gpr_stack_lockfree_pop(rm->requests);
   if (request_id == -1) {
     gpr_mu_lock(&server->mu_call);
     gpr_mu_lock(&calld->mu_state);
@@ -513,7 +525,41 @@
     gpr_mu_lock(&calld->mu_state);
     calld->state = ACTIVATED;
     gpr_mu_unlock(&calld->mu_state);
-    begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+    publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+  }
+}
+
+static void finish_start_new_rpc(
+    grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
+    request_matcher *rm,
+    grpc_server_register_method_payload_handling payload_handling) {
+  call_data *calld = elem->call_data;
+
+  if (gpr_atm_acq_load(&server->shutdown_flag)) {
+    gpr_mu_lock(&calld->mu_state);
+    calld->state = ZOMBIED;
+    gpr_mu_unlock(&calld->mu_state);
+    grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+    grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
+    return;
+  }
+
+  calld->request_matcher = rm;
+
+  switch (payload_handling) {
+    case GRPC_SRM_PAYLOAD_NONE:
+      publish_new_rpc(exec_ctx, calld, true);
+      break;
+    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
+      grpc_op op;
+      memset(&op, 0, sizeof(op));
+      op.op = GRPC_OP_RECV_MESSAGE;
+      op.data.recv_message = &calld->payload;
+      grpc_closure_init(&calld->publish, publish_new_rpc, calld);
+      grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
+                                        &calld->publish);
+      break;
+    }
   }
 }
 
@@ -539,7 +585,8 @@
           !calld->recv_idempotent_request)
         continue;
       finish_start_new_rpc(exec_ctx, server, elem,
-                           &rm->server_registered_method->request_matcher);
+                           &rm->server_registered_method->request_matcher,
+                           rm->server_registered_method->payload_handling);
       return;
     }
     /* check for a wildcard method definition (no host set) */
@@ -554,12 +601,14 @@
           !calld->recv_idempotent_request)
         continue;
       finish_start_new_rpc(exec_ctx, server, elem,
-                           &rm->server_registered_method->request_matcher);
+                           &rm->server_registered_method->request_matcher,
+                           rm->server_registered_method->payload_handling);
       return;
     }
   }
   finish_start_new_rpc(exec_ctx, server, elem,
-                       &server->unregistered_request_matcher);
+                       &server->unregistered_request_matcher,
+                       GRPC_SRM_PAYLOAD_NONE);
 }
 
 static int num_listeners(grpc_server *server) {
@@ -888,7 +937,7 @@
     gpr_stack_lockfree_push(server->request_freelist, (int)i);
   }
   request_matcher_init(&server->unregistered_request_matcher,
-                       server->max_requested_calls);
+                       server->max_requested_calls, server);
   server->requested_calls = gpr_malloc(server->max_requested_calls *
                                        sizeof(*server->requested_calls));
 
@@ -932,7 +981,8 @@
   }
   m = gpr_malloc(sizeof(registered_method));
   memset(m, 0, sizeof(*m));
-  request_matcher_init(&m->request_matcher, server->max_requested_calls);
+  request_matcher_init(&m->request_matcher, server->max_requested_calls,
+                       server);
   m->method = gpr_strdup(method);
   m->host = gpr_strdup(host);
   m->next = server->registered_methods;
@@ -1210,8 +1260,8 @@
         GPR_ASSERT(calld->state == PENDING);
         calld->state = ACTIVATED;
         gpr_mu_unlock(&calld->mu_state);
-        begin_call(exec_ctx, server, calld,
-                   &server->requested_calls[request_id]);
+        publish_call(exec_ctx, server, calld,
+                     &server->requested_calls[request_id]);
       }
       gpr_mu_lock(&server->mu_call);
     }
@@ -1299,23 +1349,6 @@
   return error;
 }
 
-static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
-                               grpc_cq_completion *c) {
-  requested_call *rc = req;
-  grpc_server *server = rc->server;
-
-  if (rc >= server->requested_calls &&
-      rc < server->requested_calls + server->max_requested_calls) {
-    GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
-    gpr_stack_lockfree_push(server->request_freelist,
-                            (int)(rc - server->requested_calls));
-  } else {
-    gpr_free(req);
-  }
-
-  server_unref(exec_ctx, server);
-}
-
 static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
                       requested_call *rc) {
   *rc->call = NULL;
@@ -1326,20 +1359,6 @@
                  done_request_event, rc, &rc->completion);
 }
 
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
-                                        bool success) {
-  requested_call *rc = prc;
-  grpc_call *call = *rc->call;
-  grpc_call_element *elem =
-      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  server_ref(chand->server);
-  grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
-                 rc, &rc->completion);
-  GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
-}
-
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
   return server->channel_args;
 }