Beginning transport work
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929..82d1323 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@
   grpc_call *call;
 
   call_state state;
-  gpr_timespec deadline;
   grpc_mdstr *path;
   grpc_mdstr *host;
+  gpr_timespec deadline;
+  int got_initial_metadata;
 
   legacy_data *legacy;
   grpc_completion_queue *cq_new;
 
+  grpc_stream_op_buffer *recv_ops;
+  grpc_stream_state *recv_state;
+  void (*on_done_recv)(void *user_data, int success);
+  void *recv_user_data;
+
   call_data **root[CALL_LIST_COUNT];
   call_link links[CALL_LIST_COUNT];
 };
@@ -371,46 +377,6 @@
   grpc_call_destroy(grpc_call_from_top_element(elem));
 }
 
-static void stream_closed(grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  gpr_mu_lock(&chand->server->mu);
-  switch (calld->state) {
-    case ACTIVATED:
-      break;
-    case PENDING:
-      call_list_remove(calld, PENDING_START);
-    /* fallthrough intended */
-    case NOT_STARTED:
-      calld->state = ZOMBIED;
-      grpc_iomgr_add_callback(kill_zombie, elem);
-      break;
-    case ZOMBIED:
-      break;
-  }
-  gpr_mu_unlock(&chand->server->mu);
-  grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  gpr_mu_lock(&chand->server->mu);
-  switch (calld->state) {
-    case ACTIVATED:
-    case PENDING:
-      grpc_call_read_closed(elem);
-      break;
-    case NOT_STARTED:
-      calld->state = ZOMBIED;
-      grpc_iomgr_add_callback(kill_zombie, elem);
-      break;
-    case ZOMBIED:
-      break;
-  }
-  gpr_mu_unlock(&chand->server->mu);
-}
-
 static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
   grpc_call_element *elem = user_data;
   channel_data *chand = elem->channel_data;
@@ -425,33 +391,69 @@
   return md;
 }
 
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
-                    grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+  grpc_call_element *elem = ptr;
   call_data *calld = elem->call_data;
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-  switch (op->type) {
-    case GRPC_RECV_METADATA:
+  channel_data *chand = elem->channel_data;
+
+  if (success && !calld->got_initial_metadata) {
+    size_t i;
+    size_t nops = calld->recv_ops->nops;
+    grpc_stream_op *ops = calld->recv_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *op = &ops[i];
+      if (op->type != GRPC_OP_METADATA) continue;
       grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
-      if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+      if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
         calld->deadline = op->data.metadata.deadline;
-        start_new_rpc(elem);
       }
+      calld->got_initial_metadata = 1;
+      start_new_rpc(elem);
       break;
-    case GRPC_RECV_MESSAGE:
-      grpc_call_recv_message(elem, op->data.message);
-      op->done_cb(op->user_data, GRPC_OP_OK);
+    }
+  }
+
+  switch (*calld->recv_state) {
+    case GRPC_STREAM_OPEN: break;
+    case GRPC_STREAM_SEND_CLOSED: break;
+    case GRPC_STREAM_RECV_CLOSED:
+      gpr_mu_lock(&chand->server->mu);
+      if (calld->state == NOT_STARTED) {
+        calld->state = ZOMBIED;
+        grpc_iomgr_add_callback(kill_zombie, elem);
+      }
+      gpr_mu_unlock(&chand->server->mu);
       break;
-    case GRPC_RECV_HALF_CLOSE:
-      read_closed(elem);
-      break;
-    case GRPC_RECV_FINISH:
-      stream_closed(elem);
-      break;
-    default:
-      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
-      grpc_call_next_op(elem, op);
+    case GRPC_STREAM_CLOSED:
+      gpr_mu_lock(&chand->server->mu);
+      if (calld->state == NOT_STARTED) {
+        calld->state = ZOMBIED;
+        grpc_iomgr_add_callback(kill_zombie, elem);
+      } else if (calld->state == PENDING) {
+        call_list_remove(calld, PENDING_START);
+      }
+      gpr_mu_unlock(&chand->server->mu);
       break;
   }
+
+  calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+  call_data *calld = elem->call_data;
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+  if (op->recv_ops) {
+    /* substitute our callback for the higher callback */
+    calld->recv_ops = op->recv_ops;
+    calld->recv_state = op->recv_state;
+    calld->on_done_recv = op->on_done_recv;
+    calld->recv_user_data = op->recv_user_data;
+    op->on_done_recv = server_on_recv;
+    op->recv_user_data = elem;
+  }
+
+  grpc_call_next_op(elem, op);
 }
 
 static void channel_op(grpc_channel_element *elem,
@@ -592,7 +594,7 @@
 }
 
 static const grpc_channel_filter server_surface_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+    server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
     sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
 };