Merge github.com:google/grpc into async-api
diff --git a/Makefile b/Makefile
index f5b41de..5607edc 100644
--- a/Makefile
+++ b/Makefile
@@ -189,11 +189,13 @@
 ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS)
 PERFTOOLS_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/perftools.c -lprofiler $(LDFLAGS)
 
+ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
 HAS_SYSTEM_PERFTOOLS = $(shell $(PERFTOOLS_CHECK_CMD) 2> /dev/null && echo true || echo false)
 ifeq ($(HAS_SYSTEM_PERFTOOLS),true)
 DEFINES += GRPC_HAVE_PERFTOOLS
 LIBS += profiler
 endif
+endif
 
 ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
 HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false)
@@ -1453,6 +1455,7 @@
     src/core/statistics/hash_table.c \
     src/core/statistics/window_stats.c \
     src/core/surface/byte_buffer.c \
+    src/core/surface/byte_buffer_queue.c \
     src/core/surface/byte_buffer_reader.c \
     src/core/surface/call.c \
     src/core/surface/channel.c \
@@ -1579,6 +1582,7 @@
 src/core/statistics/hash_table.c: $(OPENSSL_DEP)
 src/core/statistics/window_stats.c: $(OPENSSL_DEP)
 src/core/surface/byte_buffer.c: $(OPENSSL_DEP)
+src/core/surface/byte_buffer_queue.c: $(OPENSSL_DEP)
 src/core/surface/byte_buffer_reader.c: $(OPENSSL_DEP)
 src/core/surface/call.c: $(OPENSSL_DEP)
 src/core/surface/channel.c: $(OPENSSL_DEP)
@@ -1727,6 +1731,7 @@
 objs/$(CONFIG)/src/core/statistics/hash_table.o: 
 objs/$(CONFIG)/src/core/statistics/window_stats.o: 
 objs/$(CONFIG)/src/core/surface/byte_buffer.o: 
+objs/$(CONFIG)/src/core/surface/byte_buffer_queue.o: 
 objs/$(CONFIG)/src/core/surface/byte_buffer_reader.o: 
 objs/$(CONFIG)/src/core/surface/call.o: 
 objs/$(CONFIG)/src/core/surface/channel.o: 
@@ -1894,6 +1899,7 @@
     src/core/statistics/hash_table.c \
     src/core/statistics/window_stats.c \
     src/core/surface/byte_buffer.c \
+    src/core/surface/byte_buffer_queue.c \
     src/core/surface/byte_buffer_reader.c \
     src/core/surface/call.c \
     src/core/surface/channel.c \
@@ -2025,6 +2031,7 @@
 objs/$(CONFIG)/src/core/statistics/hash_table.o: 
 objs/$(CONFIG)/src/core/statistics/window_stats.o: 
 objs/$(CONFIG)/src/core/surface/byte_buffer.o: 
+objs/$(CONFIG)/src/core/surface/byte_buffer_queue.o: 
 objs/$(CONFIG)/src/core/surface/byte_buffer_reader.o: 
 objs/$(CONFIG)/src/core/surface/call.o: 
 objs/$(CONFIG)/src/core/surface/channel.o: 
diff --git a/build.json b/build.json
index 478fa78..66d82b8 100644
--- a/build.json
+++ b/build.json
@@ -73,6 +73,7 @@
         "src/core/statistics/census_tracing.h",
         "src/core/statistics/hash_table.h",
         "src/core/statistics/window_stats.h",
+        "src/core/surface/byte_buffer_queue.h",
         "src/core/surface/call.h",
         "src/core/surface/channel.h",
         "src/core/surface/client.h",
@@ -159,6 +160,7 @@
         "src/core/statistics/hash_table.c",
         "src/core/statistics/window_stats.c",
         "src/core/surface/byte_buffer.c",
+        "src/core/surface/byte_buffer_queue.c",
         "src/core/surface/byte_buffer_reader.c",
         "src/core/surface/call.c",
         "src/core/surface/channel.c",
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index a9fae0d..c18e47e 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -183,17 +183,16 @@
 } grpc_metadata;
 
 typedef enum grpc_completion_type {
-  GRPC_QUEUE_SHUTDOWN,  /* Shutting down */
-  GRPC_READ,            /* A read has completed */
-  GRPC_INVOKE_ACCEPTED, /* An invoke call has been accepted by flow
-                           control */
-  GRPC_WRITE_ACCEPTED, /* A write has been accepted by
-                          flow control */
+  GRPC_QUEUE_SHUTDOWN,       /* Shutting down */
+  GRPC_IOREQ,                /* grpc_call_ioreq completion */
+  GRPC_READ,                 /* A read has completed */
+  GRPC_WRITE_ACCEPTED,       /* A write has been accepted by
+                                flow control */
   GRPC_FINISH_ACCEPTED,      /* writes_done or write_status has been accepted */
   GRPC_CLIENT_METADATA_READ, /* The metadata array sent by server received at
                                 client */
-  GRPC_FINISHED, /* An RPC has finished. The event contains status.
-                    On the server this will be OK or Cancelled. */
+  GRPC_FINISHED,             /* An RPC has finished. The event contains status.
+                                On the server this will be OK or Cancelled. */
   GRPC_SERVER_RPC_NEW,       /* A new RPC has arrived at the server */
   GRPC_SERVER_SHUTDOWN,      /* The server has finished shutting down */
   GRPC_COMPLETION_DO_NOT_USE /* must be last, forces users to include
@@ -213,6 +212,7 @@
     grpc_op_error write_accepted;
     grpc_op_error finish_accepted;
     grpc_op_error invoke_accepted;
+    grpc_op_error ioreq;
     struct {
       size_t count;
       grpc_metadata *elements;
@@ -233,6 +233,57 @@
   } data;
 } grpc_event;
 
+typedef struct {
+  size_t count;
+  size_t capacity;
+  grpc_metadata *metadata;
+} grpc_metadata_array;
+
+typedef struct {
+  const char *method;
+  const char *host;
+  gpr_timespec deadline;
+} grpc_call_details;
+
+typedef enum {
+  GRPC_OP_SEND_INITIAL_METADATA = 0,
+  GRPC_OP_SEND_MESSAGE,
+  GRPC_OP_SEND_CLOSE_FROM_CLIENT,
+  GRPC_OP_SEND_STATUS_FROM_SERVER,
+  GRPC_OP_RECV_INITIAL_METADATA,
+  GRPC_OP_RECV_MESSAGES,
+  GRPC_OP_RECV_STATUS_ON_CLIENT,
+  GRPC_OP_RECV_CLOSE_ON_SERVER
+} grpc_op_type;
+
+typedef struct grpc_op {
+  grpc_op_type op;
+  union {
+    struct {
+      size_t count;
+      const grpc_metadata *metadata;
+    } send_initial_metadata;
+    grpc_byte_buffer *send_message;
+    struct {
+      size_t trailing_metadata_count;
+      grpc_metadata *trailing_metadata;
+      grpc_status_code status;
+      const char *status_details;
+    } send_status_from_server;
+    grpc_metadata_array *recv_initial_metadata;
+    grpc_byte_buffer **recv_message;
+    struct {
+      grpc_metadata_array *trailing_metadata;
+      grpc_status_code *status;
+      char **status_details;
+      size_t *status_details_capacity;
+    } recv_status_on_client;
+    struct {
+      int *cancelled;
+    } recv_close_on_server;
+  } data;
+} grpc_op;
+
 /* Initialize the grpc library */
 void grpc_init(void);
 
@@ -279,6 +330,9 @@
                                         const char *method, const char *host,
                                         gpr_timespec deadline);
 
+grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
+                                      size_t nops, void *tag);
+
 /* Create a client channel */
 grpc_channel *grpc_channel_create(const char *target,
                                   const grpc_channel_args *args);
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index e28bbd7..d9e722c 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -210,6 +210,7 @@
   metadata_op.dir = GRPC_CALL_UP;
   metadata_op.done_cb = do_nothing;
   metadata_op.user_data = NULL;
+  metadata_op.flags = 0;
   metadata_op.data.metadata = mdelem;
   grpc_call_next_op(cur_elem, &metadata_op);
 }
@@ -221,6 +222,7 @@
   metadata_op.dir = GRPC_CALL_DOWN;
   metadata_op.done_cb = do_nothing;
   metadata_op.user_data = NULL;
+  metadata_op.flags = 0;
   metadata_op.data.metadata = mdelem;
   grpc_call_next_op(cur_elem, &metadata_op);
 }
@@ -231,14 +233,16 @@
   cancel_op.dir = GRPC_CALL_DOWN;
   cancel_op.done_cb = do_nothing;
   cancel_op.user_data = NULL;
+  cancel_op.flags = 0;
   grpc_call_next_op(cur_elem, &cancel_op);
 }
 
 void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
-  grpc_call_op cancel_op;
-  cancel_op.type = GRPC_SEND_FINISH;
-  cancel_op.dir = GRPC_CALL_DOWN;
-  cancel_op.done_cb = do_nothing;
-  cancel_op.user_data = NULL;
-  grpc_call_next_op(cur_elem, &cancel_op);
+  grpc_call_op finish_op;
+  finish_op.type = GRPC_SEND_FINISH;
+  finish_op.dir = GRPC_CALL_DOWN;
+  finish_op.done_cb = do_nothing;
+  finish_op.user_data = NULL;
+  finish_op.flags = 0;
+  grpc_call_next_op(cur_elem, &finish_op);
 }
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index d35cede..61a6caf 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -298,10 +298,6 @@
 
 static void do_nothing(void *calldata, grpc_op_error error) {}
 
-static void done_message(void *user_data, grpc_op_error error) {
-  grpc_byte_buffer_destroy(user_data);
-}
-
 static void finish_message(channel_data *chand, call_data *calld) {
   grpc_call_element *elem = calld->elem;
   grpc_call_op call_op;
@@ -309,9 +305,9 @@
   call_op.flags = 0;
   /* if we got all the bytes for this message, call up the stack */
   call_op.type = GRPC_RECV_MESSAGE;
-  call_op.done_cb = done_message;
+  call_op.done_cb = do_nothing;
   /* TODO(ctiller): this could be a lot faster if coded directly */
-  call_op.user_data = call_op.data.message = grpc_byte_buffer_create(
+  call_op.data.message = grpc_byte_buffer_create(
       calld->incoming_message.slices, calld->incoming_message.count);
   gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
 
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 994dbe4..b1c2c64 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -80,9 +80,7 @@
   }
 }
 
-void grpc_pollset_force_kick(grpc_pollset *p) {
-  grpc_pollset_kick_kick(&p->kick_state);
-}
+void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); }
 
 /* global state management */
 
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
new file mode 100644
index 0000000..dc280a6
--- /dev/null
+++ b/src/core/surface/byte_buffer_queue.c
@@ -0,0 +1,78 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/byte_buffer_queue.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); }
+
+/* Append a byte buffer to an array, expanding as needed */
+static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) {
+  if (a->count == a->capacity) {
+    a->capacity = GPR_MAX(a->capacity * 2, 8);
+    a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer *) * a->capacity);
+  }
+  a->data[a->count++] = buffer;
+}
+
+void grpc_bbq_destroy(grpc_byte_buffer_queue *q) {
+  bba_destroy(&q->filling);
+  bba_destroy(&q->draining);
+}
+
+int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
+  return (q->drain_pos == q->draining.count && q->filling.count == 0);
+}
+
+void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+  bba_push(&q->filling, buffer);
+}
+
+grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
+  grpc_bbq_array temp_array;
+
+  if (q->drain_pos == q->draining.count) {
+    if (q->filling.count == 0) {
+      return NULL;
+    }
+    q->draining.count = 0;
+    q->drain_pos = 0;
+    /* swap arrays */
+    temp_array = q->filling;
+    q->filling = q->draining;
+    q->draining = temp_array;
+  }
+
+  return q->draining.data[q->drain_pos++];
+}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
new file mode 100644
index 0000000..358a42d
--- /dev/null
+++ b/src/core/surface/byte_buffer_queue.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
+#define __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
+
+#include <grpc/byte_buffer.h>
+
+/* TODO(ctiller): inline an element or two into this struct to avoid per-call
+                  allocations */
+typedef struct {
+  grpc_byte_buffer **data;
+  size_t count;
+  size_t capacity;
+} grpc_bbq_array;
+
+/* should be initialized by zeroing memory */
+typedef struct {
+  size_t drain_pos;
+  grpc_bbq_array filling;
+  grpc_bbq_array draining;
+} grpc_byte_buffer_queue;
+
+void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
+grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
+int grpc_bbq_empty(grpc_byte_buffer_queue *q);
+void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+
+#endif  /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5a24264..f3b7636 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -36,6 +36,7 @@
 #include "src/core/channel/metadata_buffer.h"
 #include "src/core/iomgr/alarm.h"
 #include "src/core/support/string.h"
+#include "src/core/surface/byte_buffer_queue.h"
 #include "src/core/surface/channel.h"
 #include "src/core/surface/completion_queue.h"
 #include <grpc/support/alloc.h>
@@ -45,162 +46,162 @@
 #include <stdlib.h>
 #include <string.h>
 
-#define INVALID_TAG ((void *)0xdeadbeef)
+typedef struct legacy_state legacy_state;
+static void destroy_legacy_state(legacy_state *ls);
 
-/* Pending read queue
+typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
 
-   This data structure tracks reads that need to be presented to the completion
-   queue but are waiting for the application to ask for them. */
-
-#define INITIAL_PENDING_READ_COUNT 4
-
-typedef struct {
-  grpc_byte_buffer *byte_buffer;
-  void *user_data;
-  void (*on_finish)(void *user_data, grpc_op_error error);
-} pending_read;
-
-/* TODO(ctiller): inline an element or two into this struct to avoid per-call
-                  allocations */
-typedef struct {
-  pending_read *data;
-  size_t count;
-  size_t capacity;
-} pending_read_array;
-
-typedef struct {
-  size_t drain_pos;
-  pending_read_array filling;
-  pending_read_array draining;
-} pending_read_queue;
-
-static void pra_init(pending_read_array *array) {
-  array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT);
-  array->count = 0;
-  array->capacity = INITIAL_PENDING_READ_COUNT;
-}
-
-static void pra_destroy(pending_read_array *array,
-                        size_t finish_starting_from) {
-  size_t i;
-  for (i = finish_starting_from; i < array->count; i++) {
-    array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR);
-  }
-  gpr_free(array->data);
-}
-
-/* Append an operation to an array, expanding as needed */
-static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer,
-                     void (*on_finish)(void *user_data, grpc_op_error error),
-                     void *user_data) {
-  if (a->count == a->capacity) {
-    a->capacity *= 2;
-    a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity);
-  }
-  a->data[a->count].byte_buffer = buffer;
-  a->data[a->count].user_data = user_data;
-  a->data[a->count].on_finish = on_finish;
-  a->count++;
-}
-
-static void prq_init(pending_read_queue *q) {
-  q->drain_pos = 0;
-  pra_init(&q->filling);
-  pra_init(&q->draining);
-}
-
-static void prq_destroy(pending_read_queue *q) {
-  pra_destroy(&q->filling, 0);
-  pra_destroy(&q->draining, q->drain_pos);
-}
-
-static int prq_is_empty(pending_read_queue *q) {
-  return (q->drain_pos == q->draining.count && q->filling.count == 0);
-}
-
-static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer,
-                     void (*on_finish)(void *user_data, grpc_op_error error),
-                     void *user_data) {
-  pra_push(&q->filling, buffer, on_finish, user_data);
-}
-
-/* Take the first queue element and move it to the completion queue. Do nothing
-   if q is empty */
-static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call,
-                         grpc_completion_queue *cq) {
-  pending_read_array temp_array;
-  pending_read *pr;
-
-  if (q->drain_pos == q->draining.count) {
-    if (q->filling.count == 0) {
-      return 0;
-    }
-    q->draining.count = 0;
-    q->drain_pos = 0;
-    /* swap arrays */
-    temp_array = q->filling;
-    q->filling = q->draining;
-    q->draining = temp_array;
-  }
-
-  pr = q->draining.data + q->drain_pos;
-  q->drain_pos++;
-  grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data,
-                   pr->byte_buffer);
-  return 1;
-}
-
-/* grpc_call proper */
-
-/* the state of a call, based upon which functions have been called against
-   said call */
 typedef enum {
-  CALL_CREATED,
-  CALL_BOUNDCQ,
-  CALL_STARTED,
-  CALL_FINISHED
-} call_state;
+  SEND_NOTHING,
+  SEND_INITIAL_METADATA,
+  SEND_MESSAGE,
+  SEND_TRAILING_METADATA_AND_FINISH,
+  SEND_FINISH
+} send_action;
+
+typedef struct {
+  grpc_ioreq_completion_func on_complete;
+  void *user_data;
+  grpc_op_error status;
+} completed_request;
+
+/* See request_set in grpc_call below for a description */
+#define REQSET_EMPTY 255
+#define REQSET_DONE 254
+
+typedef struct {
+  /* Overall status of the operation: starts OK, may degrade to
+     non-OK */
+  grpc_op_error status;
+  /* Completion function to call at the end of the operation */
+  grpc_ioreq_completion_func on_complete;
+  void *user_data;
+  /* a bit mask of which request ops are needed (1u << opid) */
+  gpr_uint32 need_mask;
+  /* a bit mask of which request ops are now completed */
+  gpr_uint32 complete_mask;
+} reqinfo_master;
+
+/* Status data for a request can come from several sources; this
+   enumerates them all, and acts as a priority sorting for which
+   status to return to the application - earlier entries override
+   later ones */
+typedef enum {
+  /* Status came from the application layer overriding whatever
+     the wire says */
+  STATUS_FROM_API_OVERRIDE = 0,
+  /* Status came from 'the wire' - or somewhere below the surface
+     layer */
+  STATUS_FROM_WIRE,
+  STATUS_SOURCE_COUNT
+} status_source;
+
+typedef struct {
+  gpr_uint8 is_set;
+  grpc_status_code code;
+  grpc_mdstr *details;
+} received_status;
+
+/* How far through the GRPC stream have we read? */
+typedef enum {
+  /* We are still waiting for initial metadata to complete */
+  READ_STATE_INITIAL = 0,
+  /* We have gotten initial metadata, and are reading either
+     messages or trailing metadata */
+  READ_STATE_GOT_INITIAL_METADATA,
+  /* The stream is closed for reading */
+  READ_STATE_READ_CLOSED,
+  /* The stream is closed for reading & writing */
+  READ_STATE_STREAM_CLOSED
+} read_state;
+
+typedef enum {
+  WRITE_STATE_INITIAL = 0,
+  WRITE_STATE_STARTED,
+  WRITE_STATE_WRITE_CLOSED
+} write_state;
 
 struct grpc_call {
   grpc_completion_queue *cq;
   grpc_channel *channel;
   grpc_mdctx *metadata_context;
+  /* TODO(ctiller): share with cq if possible? */
+  gpr_mu mu;
 
-  call_state state;
+  /* how far through the stream have we read? */
+  read_state read_state;
+  /* how far through the stream have we written? */
+  write_state write_state;
+  /* client or server call */
   gpr_uint8 is_client;
-  gpr_uint8 have_write;
-  grpc_metadata_buffer incoming_metadata;
-
-  /* protects variables in this section */
-  gpr_mu read_mu;
-  gpr_uint8 received_start;
-  gpr_uint8 start_ok;
-  gpr_uint8 reads_done;
-  gpr_uint8 received_finish;
-  gpr_uint8 received_metadata;
-  gpr_uint8 have_read;
+  /* is the alarm set */
   gpr_uint8 have_alarm;
-  gpr_uint8 pending_writes_done;
-  gpr_uint8 got_status_code;
-  /* The current outstanding read message tag (only valid if have_read == 1) */
-  void *read_tag;
-  void *metadata_tag;
-  void *finished_tag;
-  pending_read_queue prq;
+  /* are we currently performing a send operation */
+  gpr_uint8 sending;
+  /* pairs with completed_requests */
+  gpr_uint8 num_completed_requests;
+  /* flag that we need to request more data */
+  gpr_uint8 need_more_data;
 
+  /* Active ioreqs.
+     request_set and request_data contain one element per active ioreq
+     operation.
+     
+     request_set[op] is an integer specifying a set of operations to which
+     the request belongs:
+       - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending 
+         completion, and the integer represents to which group of operations
+         the ioreq belongs. Each group is represented by one master, and the
+         integer in request_set is an index into masters to find the master
+         data.
+       - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
+         started
+       - finally, if request_set[op] is REQSET_DONE, then the operation is
+         complete and unavailable to be started again
+     
+     request_data[op] is the request data as supplied by the initiator of
+     a request, and is valid iff request_set[op] < GRPC_IOREQ_OP_COUNT.
+     The set fields are as per the request type specified by op.
+
+     Finally, one element of masters[op] is set per active _group_ of ioreq
+     operations. It describes work left outstanding, result status, and
+     what work to perform upon operation completion. As one ioreq of each
+     op type can be active at once, by convention we choose the first element
+     of the group to be the master. This allows constant time allocation
+     and a strong upper bound of a count of masters to be calculated. */
+  gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
+  grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+  reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
+
+  /* Dynamic array of ioreq's that have completed: the count of
+     elements is queued in num_completed_requests.
+     This list is built up under lock(), and flushed entirely during
+     unlock().
+     We know the upper bound of the number of elements as we can only
+     have one ioreq of each type active at once. */
+  completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
+  /* Incoming buffer of messages */
+  grpc_byte_buffer_queue incoming_queue;
+  /* Buffered read metadata waiting to be returned to the application.
+     Element 0 is initial metadata, element 1 is trailing metadata. */
+  grpc_metadata_array buffered_metadata[2];
+  /* All metadata received - unreffed at once at the end of the call */
+  grpc_mdelem **owned_metadata;
+  size_t owned_metadata_count;
+  size_t owned_metadata_capacity;
+
+  /* Received call statuses from various sources */
+  received_status status[STATUS_SOURCE_COUNT];
+
+  /* Deadline alarm - if have_alarm is non-zero */
   grpc_alarm alarm;
 
-  /* The current outstanding send message/context/invoke/end tag (only valid if
-     have_write == 1) */
-  void *write_tag;
-  grpc_byte_buffer *pending_write;
-  gpr_uint32 pending_write_flags;
-
-  /* The final status of the call */
-  grpc_status_code status_code;
-  grpc_mdstr *status_details;
-
+  /* Call refcount - to keep the call alive during asynchronous operations */
   gpr_refcount internal_refcount;
+
+  /* Data that the legacy api needs to track. To be deleted at some point 
+     soon */
+  legacy_state *legacy_state;
 };
 
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -210,86 +211,554 @@
 #define CALL_FROM_TOP_ELEM(top_elem) \
   CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
 
+#define SWAP(type, x, y) /* exchanges x and y via a temporary of `type`; x and y are each expanded twice */ \
+  do {                   \
+    type temp = x;       \
+    x = y;               \
+    y = temp;            \
+  } while (0)
+
 static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
+static send_action choose_send_action(grpc_call *call);
+static void enact_send_action(grpc_call *call, send_action sa);
 
 grpc_call *grpc_call_create(grpc_channel *channel,
                             const void *server_transport_data) {
+  size_t i;
   grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
   grpc_call *call =
       gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
-  call->cq = NULL;
+  memset(call, 0, sizeof(grpc_call)); /* zeroes the grpc_call header only, not the call stack allocated after it */
+  gpr_mu_init(&call->mu);
   call->channel = channel;
+  call->is_client = server_transport_data == NULL; /* no server transport data => client-side call */
+  for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+    call->request_set[i] = REQSET_EMPTY;
+  }
+  if (call->is_client) { /* on client calls these two ops start out already marked done */
+    call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
+    call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
+  }
   grpc_channel_internal_ref(channel);
   call->metadata_context = grpc_channel_get_metadata_context(channel);
-  call->state = CALL_CREATED;
-  call->is_client = (server_transport_data == NULL);
-  call->write_tag = INVALID_TAG;
-  call->read_tag = INVALID_TAG;
-  call->metadata_tag = INVALID_TAG;
-  call->finished_tag = INVALID_TAG;
-  call->have_read = 0;
-  call->have_write = 0;
-  call->have_alarm = 0;
-  call->received_metadata = 0;
-  call->got_status_code = 0;
-  call->start_ok = 0;
-  call->status_code =
-      server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
-  call->status_details = NULL;
-  call->received_finish = 0;
-  call->reads_done = 0;
-  call->received_start = 0;
-  call->pending_write = NULL;
-  call->pending_writes_done = 0;
-  grpc_metadata_buffer_init(&call->incoming_metadata);
-  gpr_ref_init(&call->internal_refcount, 1);
+  /* one ref is dropped in response to destroy, the other in
+     stream_closed */
+  gpr_ref_init(&call->internal_refcount, 2);
   grpc_call_stack_init(channel_stack, server_transport_data,
                        CALL_STACK_FROM_CALL(call));
-  prq_init(&call->prq);
-  gpr_mu_init(&call->read_mu);
   return call;
 }
 
 void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
 
-void grpc_call_internal_unref(grpc_call *c) {
-  if (gpr_unref(&c->internal_refcount)) {
-    grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
-    grpc_metadata_buffer_destroy(&c->incoming_metadata, GRPC_OP_OK);
-    if (c->status_details) {
-      grpc_mdstr_unref(c->status_details);
+static void destroy_call(void *call, int ignored_success) { /* releases everything the call owns, then frees it; also passed to grpc_iomgr_add_callback */
+  size_t i;
+  grpc_call *c = call;
+  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
+  grpc_channel_internal_unref(c->channel);
+  gpr_mu_destroy(&c->mu);
+  for (i = 0; i < STATUS_SOURCE_COUNT; i++) { /* drop the details string held for each status source */
+    if (c->status[i].details) {
+      grpc_mdstr_unref(c->status[i].details);
     }
-    prq_destroy(&c->prq);
-    gpr_mu_destroy(&c->read_mu);
-    grpc_channel_internal_unref(c->channel);
-    gpr_free(c);
   }
+  for (i = 0; i < c->owned_metadata_count; i++) { /* unref every metadata element recorded in owned_metadata */
+    grpc_mdelem_unref(c->owned_metadata[i]);
+  }
+  gpr_free(c->owned_metadata);
+  for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
+    gpr_free(c->buffered_metadata[i].metadata);
+  }
+  if (c->legacy_state) {
+    destroy_legacy_state(c->legacy_state);
+  }
+  gpr_free(c);
+}
+
+void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
+  if (gpr_unref(&c->internal_refcount)) { /* non-zero once the count reaches zero (per the gpr sync API) */
+    if (allow_immediate_deletion) {
+      destroy_call(c, 1); /* destroy inline on the calling thread */
+    } else {
+      grpc_iomgr_add_callback(destroy_call, c); /* hand destruction to iomgr instead of running it here */
+    }
+  }
+}
+
+static void set_status_code(grpc_call *call, status_source source,
+                            gpr_uint32 status) { /* records `status` as the code reported by `source` */
+  call->status[source].is_set = 1; /* get_final_status only considers sources with is_set */
+  call->status[source].code = status;
+}
+
+static void set_status_details(grpc_call *call, status_source source,
+                               grpc_mdstr *status) { /* replaces the details string stored for `source` */
+  if (call->status[source].details != NULL) {
+    grpc_mdstr_unref(call->status[source].details); /* release the string being replaced */
+  }
+  call->status[source].details = status; /* stored as-is: no extra ref is taken here */
+}
+
+static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { /* attaches a completion queue to the call, once */
+  if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; /* a queue is already bound */
+  call->cq = cq;
+  return GRPC_CALL_OK;
+}
+
+static void request_more_data(grpc_call *call) {
+  grpc_call_op op;
+
+  /* call down: issue a GRPC_REQUEST_DATA op with no completion work */
+  op.type = GRPC_REQUEST_DATA;
+  op.dir = GRPC_CALL_DOWN;
+  op.flags = 0;
+  op.done_cb = do_nothing;
+  op.user_data = NULL;
+
+  grpc_call_execute_op(call, &op);
+}
+
+static int is_op_live(grpc_call *call, grpc_ioreq_op op) { /* non-zero if `op` belongs to a group and has not completed yet */
+  gpr_uint8 set = call->request_set[op]; /* index of the op's master, or a value >= GRPC_IOREQ_OP_COUNT */
+  reqinfo_master *master;
+  if (set >= GRPC_IOREQ_OP_COUNT) return 0; /* not in any group (presumably REQSET_EMPTY / REQSET_DONE - confirm their values) */
+  master = &call->masters[set];
+  return (master->complete_mask & (1u << op)) == 0; /* live while its bit is absent from complete_mask */
+}
+
+static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } /* acquires the call mutex; paired with unlock() */
+
+static void unlock(grpc_call *call) { /* releases the call mutex, then performs the work decided while it was held */
+  send_action sa = SEND_NOTHING;
+  completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; /* local copy so callbacks can run after the mutex is released */
+  int num_completed_requests = call->num_completed_requests;
+  int need_more_data =
+      call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); /* held back while the initial-metadata send is live */
+  int i;
+
+  if (need_more_data) {
+    call->need_more_data = 0;
+  }
+
+  if (num_completed_requests != 0) {
+    memcpy(completed_requests, call->completed_requests,
+           sizeof(completed_requests));
+    call->num_completed_requests = 0; /* the call's list is flushed entirely */
+  }
+
+  if (!call->sending) { /* start a send action only when none is in progress */
+    sa = choose_send_action(call);
+    if (sa != SEND_NOTHING) {
+      call->sending = 1;
+      grpc_call_internal_ref(call); /* released by finish_send_op when the send completes */
+    }
+  }
+
+  gpr_mu_unlock(&call->mu); /* everything after this line runs without the mutex */
+
+  if (need_more_data) {
+    request_more_data(call);
+  }
+
+  if (sa != SEND_NOTHING) {
+    enact_send_action(call, sa);
+  }
+
+  for (i = 0; i < num_completed_requests; i++) { /* deliver the completions snapshotted above */
+    completed_requests[i].on_complete(call, completed_requests[i].status,
+                                      completed_requests[i].user_data);
+  }
+}
+
+static void get_final_status(grpc_call *call, grpc_recv_status_args args) { /* writes the call's status code (and details text, if requested) through `args` */
+  int i;
+  for (i = 0; i < STATUS_SOURCE_COUNT; i++) { /* the first source with a code set wins */
+    if (call->status[i].is_set) {
+      *args.code = call->status[i].code;
+      if (!args.details) return; /* no details buffer supplied */
+      if (call->status[i].details) {
+        gpr_slice details = call->status[i].details->slice;
+        size_t len = GPR_SLICE_LENGTH(details);
+        if (len + 1 > *args.details_capacity) { /* grow so the text and its terminating 0 fit */
+          *args.details_capacity =
+              GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
+          *args.details = gpr_realloc(*args.details, *args.details_capacity);
+        }
+        memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
+        (*args.details)[len] = 0;
+      } else {
+        goto no_details; /* code is set but there is no text: return an empty string */
+      }
+      return;
+    }
+  }
+  *args.code = GRPC_STATUS_UNKNOWN; /* no source has a status set */
+  if (!args.details) return;
+
+no_details: /* store an empty string, allocating a buffer first when the capacity is zero */
+  if (0 == *args.details_capacity) {
+    *args.details_capacity = 8;
+    *args.details = gpr_malloc(*args.details_capacity);
+  }
+  **args.details = 0;
+}
+
+static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
+                                 grpc_op_error status) { /* caller must already know `op` is live (see is_op_live) */
+  completed_request *cr;
+  gpr_uint8 master_set = call->request_set[op];
+  reqinfo_master *master;
+  size_t i;
+  /* ioreq is live: we need to do something */
+  master = &call->masters[master_set];
+  master->complete_mask |= 1u << op; /* mark this op finished within its group */
+  if (status != GRPC_OP_OK) { /* a failure completes the whole group with that status */
+    master->status = status;
+    master->complete_mask = master->need_mask;
+  }
+  if (master->complete_mask == master->need_mask) { /* every op of the group has finished */
+    for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+      if (call->request_set[i] != master_set) { /* skip ops that belong to other groups */
+        continue;
+      }
+      call->request_set[i] = REQSET_DONE;
+      switch ((grpc_ioreq_op)i) {
+        case GRPC_IOREQ_RECV_MESSAGE:
+        case GRPC_IOREQ_SEND_MESSAGE:
+          if (master->status == GRPC_OP_OK) {
+            call->request_set[i] = REQSET_EMPTY; /* message ops become requestable again after success */
+          } else {
+            call->write_state = WRITE_STATE_WRITE_CLOSED;
+          }
+          break;
+        case GRPC_IOREQ_RECV_CLOSE:
+        case GRPC_IOREQ_SEND_INITIAL_METADATA:
+        case GRPC_IOREQ_SEND_TRAILING_METADATA:
+        case GRPC_IOREQ_SEND_STATUS:
+        case GRPC_IOREQ_SEND_CLOSE:
+          break;
+        case GRPC_IOREQ_RECV_STATUS:
+          get_final_status(
+              call, call->request_data[GRPC_IOREQ_RECV_STATUS].recv_status);
+          break;
+        case GRPC_IOREQ_RECV_INITIAL_METADATA: /* exchange buffered_metadata[0] with the requester's array */
+          SWAP(grpc_metadata_array, call->buffered_metadata[0],
+               *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
+                    .recv_metadata);
+          break;
+        case GRPC_IOREQ_RECV_TRAILING_METADATA: /* exchange buffered_metadata[1] with the requester's array */
+          SWAP(grpc_metadata_array, call->buffered_metadata[1],
+               *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
+                    .recv_metadata);
+          break;
+        case GRPC_IOREQ_OP_COUNT: /* not a real op */
+          abort();
+          break;
+      }
+    }
+    cr = &call->completed_requests[call->num_completed_requests++]; /* queued here; unlock() invokes the callback */
+    cr->status = master->status;
+    cr->on_complete = master->on_complete;
+    cr->user_data = master->user_data;
+  }
+}
+
+static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
+                            grpc_op_error status) { /* finishes `op` with `status` if it is live; otherwise a no-op */
+  if (is_op_live(call, op)) {
+    finish_live_ioreq_op(call, op, status);
+  }
+}
+
+static void finish_send_op(grpc_call *call, grpc_ioreq_op op,
+                           grpc_op_error error) { /* shared tail of the finish_*_step callbacks */
+  lock(call);
+  finish_ioreq_op(call, op, error);
+  call->sending = 0; /* lets the unlock() below choose the next send action */
+  unlock(call);
+  grpc_call_internal_unref(call, 0); /* drops the ref taken in unlock(); 0 routes any destruction through grpc_iomgr_add_callback */
+}
+
+static void finish_write_step(void *pc, grpc_op_error error) { /* done_cb of the GRPC_SEND_MESSAGE op; pc is the grpc_call */
+  finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error);
+}
+
+static void finish_finish_step(void *pc, grpc_op_error error) { /* done_cb of the GRPC_SEND_FINISH op; pc is the grpc_call */
+  finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error);
+}
+
+static void finish_start_step(void *pc, grpc_op_error error) { /* done_cb of the GRPC_SEND_START op; pc is the grpc_call */
+  finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+}
+
+static send_action choose_send_action(grpc_call *call) { /* picks the next write-side step; unlock() calls this with the mutex held */
+  switch (call->write_state) {
+    case WRITE_STATE_INITIAL:
+      if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] !=
+          REQSET_EMPTY) { /* initial metadata has been requested */
+        call->write_state = WRITE_STATE_STARTED;
+        return SEND_INITIAL_METADATA;
+      }
+      return SEND_NOTHING;
+    case WRITE_STATE_STARTED:
+      if (call->request_set[GRPC_IOREQ_SEND_MESSAGE] != REQSET_EMPTY) { /* a requested message is checked before close */
+        return SEND_MESSAGE;
+      }
+      if (call->request_set[GRPC_IOREQ_SEND_CLOSE] != REQSET_EMPTY) {
+        call->write_state = WRITE_STATE_WRITE_CLOSED;
+        finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); /* no-op unless that op is live */
+        finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); /* no-op unless that op is live */
+        return call->is_client ? SEND_FINISH
+                               : SEND_TRAILING_METADATA_AND_FINISH;
+      }
+      return SEND_NOTHING;
+    case WRITE_STATE_WRITE_CLOSED:
+      return SEND_NOTHING;
+  }
+  gpr_log(GPR_ERROR, "should never reach here"); /* every case above returns */
+  abort();
+  return SEND_NOTHING;
+}
+
+static void send_metadata(grpc_call *call, grpc_mdelem *elem) { /* issues one GRPC_SEND_METADATA op carrying `elem` */
+  grpc_call_op op;
+  op.type = GRPC_SEND_METADATA;
+  op.dir = GRPC_CALL_DOWN;
+  op.flags = 0;
+  op.data.metadata = elem;
+  op.done_cb = do_nothing; /* no completion handling for metadata ops */
+  op.user_data = NULL;
+  grpc_call_execute_op(call, &op);
+}
+
+static void enact_send_action(grpc_call *call, send_action sa) { /* performs `sa`; unlock() calls this after releasing the mutex */
+  grpc_ioreq_data data;
+  grpc_call_op op;
+  size_t i;
+  char status_str[GPR_LTOA_MIN_BUFSIZE];
+
+  switch (sa) {
+    case SEND_NOTHING: /* unlock() only calls in with a real action */
+      abort();
+      break;
+    case SEND_INITIAL_METADATA:
+      data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
+      for (i = 0; i < data.send_metadata.count; i++) { /* one GRPC_SEND_METADATA op per element */
+        const grpc_metadata *md = &data.send_metadata.metadata[i];
+        send_metadata(call,
+                      grpc_mdelem_from_string_and_buffer(
+                          call->metadata_context, md->key,
+                          (const gpr_uint8 *)md->value, md->value_length));
+      }
+      op.type = GRPC_SEND_START;
+      op.dir = GRPC_CALL_DOWN;
+      op.flags = 0;
+      op.data.start.pollset = grpc_cq_pollset(call->cq);
+      op.done_cb = finish_start_step; /* completes GRPC_IOREQ_SEND_INITIAL_METADATA */
+      op.user_data = call;
+      grpc_call_execute_op(call, &op);
+      break;
+    case SEND_MESSAGE:
+      data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+      op.type = GRPC_SEND_MESSAGE;
+      op.dir = GRPC_CALL_DOWN;
+      op.flags = 0;
+      op.data.message = data.send_message;
+      op.done_cb = finish_write_step; /* completes GRPC_IOREQ_SEND_MESSAGE */
+      op.user_data = call;
+      grpc_call_execute_op(call, &op);
+      break;
+    case SEND_TRAILING_METADATA_AND_FINISH:
+      /* send trailing metadata */
+      data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
+      for (i = 0; i < data.send_metadata.count; i++) {
+        const grpc_metadata *md = &data.send_metadata.metadata[i];
+        send_metadata(call,
+                      grpc_mdelem_from_string_and_buffer(
+                          call->metadata_context, md->key,
+                          (const gpr_uint8 *)md->value, md->value_length));
+      }
+      /* send status */
+      /* TODO(ctiller): cache common status values */
+      data = call->request_data[GRPC_IOREQ_SEND_STATUS];
+      gpr_ltoa(data.send_status.code, status_str); /* status code rendered as text for the metadata value */
+      send_metadata(
+          call,
+          grpc_mdelem_from_metadata_strings(
+              call->metadata_context,
+              grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+              grpc_mdstr_from_string(call->metadata_context, status_str)));
+      if (data.send_status.details) { /* the message element is sent only when details were supplied */
+        send_metadata(
+            call,
+            grpc_mdelem_from_metadata_strings(
+                call->metadata_context,
+                grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
+                grpc_mdstr_from_string(call->metadata_context,
+                                       data.send_status.details)));
+      }
+    /* fallthrough: see choose_send_action for details */
+    case SEND_FINISH:
+      op.type = GRPC_SEND_FINISH;
+      op.dir = GRPC_CALL_DOWN;
+      op.flags = 0;
+      op.done_cb = finish_finish_step; /* completes GRPC_IOREQ_SEND_CLOSE */
+      op.user_data = call;
+      grpc_call_execute_op(call, &op);
+      break;
+  }
+}
+
+static grpc_call_error start_ioreq_error(grpc_call *call,
+                                         gpr_uint32 mutated_ops,
+                                         grpc_call_error ret) { /* undoes a partially registered batch and returns `ret` */
+  size_t i;
+  for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+    if (mutated_ops & (1u << i)) { /* bit i set => op i was claimed by the failing batch */
+      call->request_set[i] = REQSET_EMPTY;
+    }
+  }
+  return ret;
+}
+
+static void finish_read_ops(grpc_call *call) { /* completes whichever receive ops the queue contents and read_state allow */
+  int empty;
+
+  if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
+    empty =
+        (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
+                      grpc_bbq_pop(&call->incoming_queue))); /* pop into the requester's slot; NULL means nothing was queued */
+    if (!empty) {
+      finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+      empty = grpc_bbq_empty(&call->incoming_queue); /* re-check after consuming one message */
+    }
+  } else {
+    empty = grpc_bbq_empty(&call->incoming_queue);
+  }
+
+  switch (call->read_state) { /* each case falls through into the ones listed after it */
+    case READ_STATE_STREAM_CLOSED:
+      if (empty) {
+        finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+      }
+    /* fallthrough */
+    case READ_STATE_READ_CLOSED:
+      if (empty) { /* finishes a pending message read; its slot holds the NULL stored above */
+        finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+      }
+      finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
+      finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+    /* fallthrough */
+    case READ_STATE_GOT_INITIAL_METADATA:
+      finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+    /* fallthrough */
+    case READ_STATE_INITIAL:
+      /* do nothing */
+      break;
+  }
+}
+
+static void early_out_write_ops(grpc_call *call) { /* finishes live send ops according to the current write_state */
+  switch (call->write_state) {
+    case WRITE_STATE_WRITE_CLOSED: /* writes closed: the other send ops fail, SEND_CLOSE succeeds */
+      finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
+      finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR);
+      finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR);
+      finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
+    /* fallthrough */
+    case WRITE_STATE_STARTED: /* NOTE(review): this also fails an initial-metadata send that is still live - confirm intended */
+      finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR);
+    /* fallthrough */
+    case WRITE_STATE_INITIAL:
+      /* do nothing */
+      break;
+  }
+}
+
+static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
+                                   size_t nreqs,
+                                   grpc_ioreq_completion_func completion,
+                                   void *user_data) { /* registers a batch of ioreqs as one group; both callers hold the call mutex */
+  size_t i;
+  gpr_uint32 have_ops = 0; /* bitmask of the ops claimed so far by this batch */
+  grpc_ioreq_op op;
+  reqinfo_master *master;
+  grpc_ioreq_data data;
+  gpr_uint8 set;
+
+  if (nreqs == 0) { /* empty batch: succeeds without scheduling a completion */
+    return GRPC_CALL_OK;
+  }
+
+  set = reqs[0].op; /* the first op of the batch names the group (its master slot) */
+
+  for (i = 0; i < nreqs; i++) {
+    op = reqs[i].op;
+    if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) { /* op already belongs to an active group (or repeats within this batch) */
+      return start_ioreq_error(call, have_ops,
+                               GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
+    } else if (call->request_set[op] == REQSET_DONE) { /* op was already completed and is not requestable again */
+      return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
+    }
+    have_ops |= 1u << op;
+    data = reqs[i].data;
+
+    call->request_data[op] = data;
+    call->request_set[op] = set;
+  }
+
+  master = &call->masters[set];
+  master->status = GRPC_OP_OK;
+  master->need_mask = have_ops; /* the group completes once complete_mask equals this */
+  master->complete_mask = 0;
+  master->on_complete = completion;
+  master->user_data = user_data;
+
+  if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
+    call->need_more_data = 1; /* picked up by unlock(), which issues the data request */
+  }
+
+  finish_read_ops(call); /* complete at once whatever the current read/write state already allows */
+  early_out_write_ops(call);
+
+  return GRPC_CALL_OK;
+}
+
+static void call_start_ioreq_done(grpc_call *call, grpc_op_error status,
+                                  void *user_data) { /* completion used by grpc_call_start_ioreq; user_data is the caller's tag */
+  grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status);
+}
+
+grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
+                                      size_t nreqs, void *tag) { /* starts a batch whose completion is reported on call->cq with `tag` */
+  grpc_call_error err;
+  lock(call);
+  err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
+  unlock(call); /* may also start sends and deliver completions queued by start_ioreq */
+  return err;
+}
+
+grpc_call_error grpc_call_start_ioreq_and_call_back(
+    grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
+    grpc_ioreq_completion_func on_complete, void *user_data) { /* like grpc_call_start_ioreq, but completion goes to on_complete(user_data) */
+  grpc_call_error err;
+  lock(call);
+  err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
+  unlock(call); /* may also start sends and deliver completions queued by start_ioreq */
+  return err;
 }
 
 void grpc_call_destroy(grpc_call *c) {
   int cancel;
-  gpr_mu_lock(&c->read_mu);
+  lock(c); /* the alarm flag and read_state are read under the call mutex */
   if (c->have_alarm) {
     grpc_alarm_cancel(&c->alarm);
     c->have_alarm = 0;
   }
-  cancel = !c->received_finish;
-  gpr_mu_unlock(&c->read_mu);
+  cancel = c->read_state != READ_STATE_STREAM_CLOSED; /* stream not yet closed: cancel it after unlocking */
+  unlock(c);
   if (cancel) grpc_call_cancel(c);
-  grpc_call_internal_unref(c);
-}
-
-static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
-  if (!call->got_status_code) {
-    call->status_code = status;
-    call->got_status_code = 1;
-  }
-}
-
-static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
-  if (!call->status_details) {
-    call->status_details = grpc_mdstr_ref(status);
-  }
+  grpc_call_internal_unref(c, 1); /* drops the ref grpc_call_create reserves for destroy */
 }
 
 grpc_call_error grpc_call_cancel(grpc_call *c) {
@@ -314,12 +783,10 @@
   grpc_mdstr *details =
       description ? grpc_mdstr_from_string(c->metadata_context, description)
                   : NULL;
-  gpr_mu_lock(&c->read_mu);
-  maybe_set_status_code(c, status);
-  if (details) {
-    maybe_set_status_details(c, details);
-  }
-  gpr_mu_unlock(&c->read_mu);
+  lock(c);
+  set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
+  set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
+  unlock(c);
   return grpc_call_cancel(c);
 }
 
@@ -330,529 +797,50 @@
   elem->filter->call_op(elem, NULL, op);
 }
 
-void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
-                          gpr_uint32 flags) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  GPR_ASSERT(call->state < CALL_FINISHED);
-
-  op.type = GRPC_SEND_METADATA;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = flags;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-  op.data.metadata = mdelem;
-
-  elem = CALL_ELEM_FROM_CALL(call, 0);
-  elem->filter->call_op(elem, NULL, &op);
+grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { /* function form of the CALL_FROM_TOP_ELEM macro */
+  return CALL_FROM_TOP_ELEM(elem);
 }
 
-grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
-                                           grpc_metadata *metadata,
-                                           gpr_uint32 flags) {
-  grpc_mdelem *mdelem;
-
-  if (call->is_client) {
-    if (call->state >= CALL_STARTED) {
-      return GRPC_CALL_ERROR_ALREADY_INVOKED;
-    }
-  } else {
-    if (call->state >= CALL_FINISHED) {
-      return GRPC_CALL_ERROR_ALREADY_FINISHED;
-    }
-  }
-
-  mdelem = grpc_mdelem_from_string_and_buffer(
-      call->metadata_context, metadata->key, (gpr_uint8 *)metadata->value,
-      metadata->value_length);
-  grpc_call_add_mdelem(call, mdelem, flags);
-  return GRPC_CALL_OK;
-}
-
-static void finish_call(grpc_call *call) {
-  size_t count;
-  grpc_metadata *elements;
-  count = grpc_metadata_buffer_count(&call->incoming_metadata);
-  elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
-  grpc_cq_end_finished(
-      call->cq, call->finished_tag, call, grpc_metadata_buffer_cleanup_elements,
-      elements, call->status_code,
-      call->status_details
-          ? (char *)grpc_mdstr_as_c_string(call->status_details)
-          : NULL,
-      elements, count);
-}
-
-static void done_write(void *user_data, grpc_op_error error) {
-  grpc_call *call = user_data;
-  void *tag = call->write_tag;
-
-  GPR_ASSERT(call->have_write);
-  call->have_write = 0;
-  call->write_tag = INVALID_TAG;
-  grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
-static void done_writes_done(void *user_data, grpc_op_error error) {
-  grpc_call *call = user_data;
-  void *tag = call->write_tag;
-
-  GPR_ASSERT(call->have_write);
-  call->have_write = 0;
-  call->write_tag = INVALID_TAG;
-  grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
-static void call_started(void *user_data, grpc_op_error error) {
-  grpc_call *call = user_data;
-  grpc_call_element *elem;
-  grpc_byte_buffer *pending_write = NULL;
-  gpr_uint32 pending_write_flags = 0;
-  gpr_uint8 pending_writes_done = 0;
-  int ok;
-  grpc_call_op op;
-
-  gpr_mu_lock(&call->read_mu);
-  GPR_ASSERT(!call->received_start);
-  call->received_start = 1;
-  ok = call->start_ok = (error == GRPC_OP_OK);
-  pending_write = call->pending_write;
-  pending_write_flags = call->pending_write_flags;
-  pending_writes_done = call->pending_writes_done;
-  gpr_mu_unlock(&call->read_mu);
-
-  if (pending_write) {
-    if (ok) {
-      op.type = GRPC_SEND_MESSAGE;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = pending_write_flags;
-      op.done_cb = done_write;
-      op.user_data = call;
-      op.data.message = pending_write;
-
-      elem = CALL_ELEM_FROM_CALL(call, 0);
-      elem->filter->call_op(elem, NULL, &op);
+static void call_alarm(void *arg, int success) { /* deadline alarm callback registered by grpc_call_set_deadline; arg is the grpc_call */
+  grpc_call *call = arg;
+  if (success) { /* nothing is cancelled when success is 0 (presumably the alarm was cancelled - confirm) */
+    if (call->is_client) {
+      grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
+                                   "Deadline Exceeded");
     } else {
-      done_write(call, error);
-    }
-    grpc_byte_buffer_destroy(pending_write);
-  }
-  if (pending_writes_done) {
-    if (ok) {
-      op.type = GRPC_SEND_FINISH;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = 0;
-      op.done_cb = done_writes_done;
-      op.user_data = call;
-
-      elem = CALL_ELEM_FROM_CALL(call, 0);
-      elem->filter->call_op(elem, NULL, &op);
-    } else {
-      done_writes_done(call, error);
+      grpc_call_cancel(call); /* server side cancels without setting a status here */
     }
   }
-
-  grpc_call_internal_unref(call);
+  grpc_call_internal_unref(call, 1); /* releases the ref taken in grpc_call_set_deadline */
 }
 
-grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
-                                     void *metadata_read_tag,
-                                     void *finished_tag, gpr_uint32 flags) {
-  grpc_call_element *elem;
-  grpc_call_op op;
+void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { /* arms the call's deadline alarm; elem is the top call element */
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
 
-  /* validate preconditions */
-  if (!call->is_client) {
-    gpr_log(GPR_ERROR, "can only call %s on clients", __FUNCTION__);
-    return GRPC_CALL_ERROR_NOT_ON_SERVER;
+  if (call->have_alarm) {
+    gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); /* logged only: the alarm is still initialized below */
   }
-
-  if (call->state >= CALL_STARTED || call->cq) {
-    gpr_log(GPR_ERROR, "call is already invoked");
-    return GRPC_CALL_ERROR_ALREADY_INVOKED;
-  }
-
-  if (call->have_write) {
-    gpr_log(GPR_ERROR, "can only have one pending write operation at a time");
-    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
-  }
-
-  if (call->have_read) {
-    gpr_log(GPR_ERROR, "can only have one pending read operation at a time");
-    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
-  }
-
-  if (flags & GRPC_WRITE_NO_COMPRESS) {
-    return GRPC_CALL_ERROR_INVALID_FLAGS;
-  }
-
-  /* inform the completion queue of an incoming operation */
-  grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-  grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
-
-  gpr_mu_lock(&call->read_mu);
-
-  /* update state */
-  call->cq = cq;
-  call->state = CALL_STARTED;
-  call->finished_tag = finished_tag;
-
-  if (call->received_finish) {
-    /* handle early cancellation */
-    grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
-                                     NULL, 0, NULL);
-    finish_call(call);
-
-    /* early out.. unlock & return */
-    gpr_mu_unlock(&call->read_mu);
-    return GRPC_CALL_OK;
-  }
-
-  call->metadata_tag = metadata_read_tag;
-
-  gpr_mu_unlock(&call->read_mu);
-
-  /* call down the filter stack */
-  op.type = GRPC_SEND_START;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = flags;
-  op.done_cb = call_started;
-  op.data.start.pollset = grpc_cq_pollset(cq);
-  op.user_data = call;
   grpc_call_internal_ref(call);
-
-  elem = CALL_ELEM_FROM_CALL(call, 0);
-  elem->filter->call_op(elem, NULL, &op);
-
-  return GRPC_CALL_OK;
+  call->have_alarm = 1;
+  grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); /* call_alarm releases the ref taken above */
 }
 
-grpc_call_error grpc_call_server_accept_old(grpc_call *call,
-                                            grpc_completion_queue *cq,
-                                            void *finished_tag) {
-  /* validate preconditions */
-  if (call->is_client) {
-    gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__);
-    return GRPC_CALL_ERROR_NOT_ON_CLIENT;
-  }
-
-  if (call->state >= CALL_BOUNDCQ) {
-    gpr_log(GPR_ERROR, "call is already accepted");
-    return GRPC_CALL_ERROR_ALREADY_ACCEPTED;
-  }
-
-  /* inform the completion queue of an incoming operation (corresponding to
-     finished_tag) */
-  grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-
-  /* update state */
-  gpr_mu_lock(&call->read_mu);
-  call->state = CALL_BOUNDCQ;
-  call->cq = cq;
-  call->finished_tag = finished_tag;
-  call->received_start = 1;
-  if (prq_is_empty(&call->prq) && call->received_finish) {
-    finish_call(call);
-
-    /* early out.. unlock & return */
-    gpr_mu_unlock(&call->read_mu);
-    return GRPC_CALL_OK;
-  }
-  gpr_mu_unlock(&call->read_mu);
-
-  return GRPC_CALL_OK;
+static void set_read_state(grpc_call *call, read_state state) { /* advances read_state and completes any reads that unblocks */
+  lock(call);
+  GPR_ASSERT(call->read_state < state); /* the state may only move forward */
+  call->read_state = state;
+  finish_read_ops(call);
+  unlock(call);
 }
 
-grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
-                                                          gpr_uint32 flags) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  /* validate preconditions */
-  if (call->is_client) {
-    gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__);
-    return GRPC_CALL_ERROR_NOT_ON_CLIENT;
-  }
-
-  if (call->state >= CALL_STARTED) {
-    gpr_log(GPR_ERROR, "call is already started");
-    return GRPC_CALL_ERROR_ALREADY_INVOKED;
-  }
-
-  if (flags & GRPC_WRITE_NO_COMPRESS) {
-    return GRPC_CALL_ERROR_INVALID_FLAGS;
-  }
-
-  /* update state */
-  call->state = CALL_STARTED;
-
-  /* call down */
-  op.type = GRPC_SEND_START;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = flags;
-  op.done_cb = do_nothing;
-  op.data.start.pollset = grpc_cq_pollset(call->cq);
-  op.user_data = NULL;
-
-  elem = CALL_ELEM_FROM_CALL(call, 0);
-  elem->filter->call_op(elem, NULL, &op);
-
-  return GRPC_CALL_OK;
+void grpc_call_read_closed(grpc_call_element *elem) { /* moves the call owning `elem` to READ_STATE_READ_CLOSED */
+  set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
 }
 
-void grpc_call_client_initial_metadata_complete(
-    grpc_call_element *surface_element) {
-  grpc_call *call = grpc_call_from_top_element(surface_element);
-  size_t count;
-  grpc_metadata *elements;
-
-  gpr_mu_lock(&call->read_mu);
-  count = grpc_metadata_buffer_count(&call->incoming_metadata);
-  elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
-
-  GPR_ASSERT(!call->received_metadata);
-  grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call,
-                                   grpc_metadata_buffer_cleanup_elements,
-                                   elements, count, elements);
-  call->received_metadata = 1;
-  call->metadata_tag = INVALID_TAG;
-  gpr_mu_unlock(&call->read_mu);
-}
-
-static void request_more_data(grpc_call *call) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  /* call down */
-  op.type = GRPC_REQUEST_DATA;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = 0;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-
-  elem = CALL_ELEM_FROM_CALL(call, 0);
-  elem->filter->call_op(elem, NULL, &op);
-}
-
-grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
-  gpr_uint8 request_more = 0;
-
-  switch (call->state) {
-    case CALL_CREATED:
-      return GRPC_CALL_ERROR_NOT_INVOKED;
-    case CALL_BOUNDCQ:
-    case CALL_STARTED:
-      break;
-    case CALL_FINISHED:
-      return GRPC_CALL_ERROR_ALREADY_FINISHED;
-  }
-
-  gpr_mu_lock(&call->read_mu);
-
-  if (call->have_read) {
-    gpr_mu_unlock(&call->read_mu);
-    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
-  }
-
-  grpc_cq_begin_op(call->cq, call, GRPC_READ);
-
-  if (!prq_pop_to_cq(&call->prq, tag, call, call->cq)) {
-    if (call->reads_done) {
-      grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
-    } else {
-      call->read_tag = tag;
-      call->have_read = 1;
-      request_more = call->received_start;
-    }
-  } else if (prq_is_empty(&call->prq) && call->received_finish) {
-    finish_call(call);
-  }
-
-  gpr_mu_unlock(&call->read_mu);
-
-  if (request_more) {
-    request_more_data(call);
-  }
-
-  return GRPC_CALL_OK;
-}
-
-grpc_call_error grpc_call_start_write_old(grpc_call *call,
-                                          grpc_byte_buffer *byte_buffer,
-                                          void *tag, gpr_uint32 flags) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  switch (call->state) {
-    case CALL_CREATED:
-    case CALL_BOUNDCQ:
-      return GRPC_CALL_ERROR_NOT_INVOKED;
-    case CALL_STARTED:
-      break;
-    case CALL_FINISHED:
-      return GRPC_CALL_ERROR_ALREADY_FINISHED;
-  }
-
-  if (call->have_write) {
-    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
-  }
-
-  grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
-
-  /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
-     flush, and that flush should be propogated down from here */
-  if (byte_buffer == NULL) {
-    grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, GRPC_OP_OK);
-    return GRPC_CALL_OK;
-  }
-
-  call->write_tag = tag;
-  call->have_write = 1;
-
-  gpr_mu_lock(&call->read_mu);
-  if (!call->received_start) {
-    call->pending_write = grpc_byte_buffer_copy(byte_buffer);
-    call->pending_write_flags = flags;
-
-    gpr_mu_unlock(&call->read_mu);
-  } else {
-    gpr_mu_unlock(&call->read_mu);
-
-    op.type = GRPC_SEND_MESSAGE;
-    op.dir = GRPC_CALL_DOWN;
-    op.flags = flags;
-    op.done_cb = done_write;
-    op.user_data = call;
-    op.data.message = byte_buffer;
-
-    elem = CALL_ELEM_FROM_CALL(call, 0);
-    elem->filter->call_op(elem, NULL, &op);
-  }
-
-  return GRPC_CALL_OK;
-}
-
-grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  if (!call->is_client) {
-    return GRPC_CALL_ERROR_NOT_ON_SERVER;
-  }
-
-  switch (call->state) {
-    case CALL_CREATED:
-    case CALL_BOUNDCQ:
-      return GRPC_CALL_ERROR_NOT_INVOKED;
-    case CALL_FINISHED:
-      return GRPC_CALL_ERROR_ALREADY_FINISHED;
-    case CALL_STARTED:
-      break;
-  }
-
-  if (call->have_write) {
-    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
-  }
-
-  grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
-  call->write_tag = tag;
-  call->have_write = 1;
-
-  gpr_mu_lock(&call->read_mu);
-  if (!call->received_start) {
-    call->pending_writes_done = 1;
-
-    gpr_mu_unlock(&call->read_mu);
-  } else {
-    gpr_mu_unlock(&call->read_mu);
-
-    op.type = GRPC_SEND_FINISH;
-    op.dir = GRPC_CALL_DOWN;
-    op.flags = 0;
-    op.done_cb = done_writes_done;
-    op.user_data = call;
-
-    elem = CALL_ELEM_FROM_CALL(call, 0);
-    elem->filter->call_op(elem, NULL, &op);
-  }
-
-  return GRPC_CALL_OK;
-}
-
-grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
-                                                 grpc_status_code status,
-                                                 const char *details,
-                                                 void *tag) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  if (call->is_client) {
-    return GRPC_CALL_ERROR_NOT_ON_CLIENT;
-  }
-
-  switch (call->state) {
-    case CALL_CREATED:
-    case CALL_BOUNDCQ:
-      return GRPC_CALL_ERROR_NOT_INVOKED;
-    case CALL_FINISHED:
-      return GRPC_CALL_ERROR_ALREADY_FINISHED;
-    case CALL_STARTED:
-      break;
-  }
-
-  if (call->have_write) {
-    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
-  }
-
-  elem = CALL_ELEM_FROM_CALL(call, 0);
-
-  if (details && details[0]) {
-    grpc_mdelem *md = grpc_mdelem_from_strings(call->metadata_context,
-                                               "grpc-message", details);
-
-    op.type = GRPC_SEND_METADATA;
-    op.dir = GRPC_CALL_DOWN;
-    op.flags = 0;
-    op.done_cb = do_nothing;
-    op.user_data = NULL;
-    op.data.metadata = md;
-    elem->filter->call_op(elem, NULL, &op);
-  }
-
-  /* always send status */
-  {
-    grpc_mdelem *md;
-    char buffer[GPR_LTOA_MIN_BUFSIZE];
-    gpr_ltoa(status, buffer);
-    md =
-        grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer);
-
-    op.type = GRPC_SEND_METADATA;
-    op.dir = GRPC_CALL_DOWN;
-    op.flags = 0;
-    op.done_cb = do_nothing;
-    op.user_data = NULL;
-    op.data.metadata = md;
-    elem->filter->call_op(elem, NULL, &op);
-  }
-
-  grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
-  call->state = CALL_FINISHED;
-  call->write_tag = tag;
-  call->have_write = 1;
-
-  op.type = GRPC_SEND_FINISH;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = 0;
-  op.done_cb = done_writes_done;
-  op.user_data = call;
-
-  elem->filter->call_op(elem, NULL, &op);
-
-  return GRPC_CALL_OK;
+void grpc_call_stream_closed(grpc_call_element *elem) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  set_read_state(call, READ_STATE_STREAM_CLOSED);
+  grpc_call_internal_unref(call, 0);
 }
 
 /* we offset status by a small amount when storing it into transport metadata
@@ -878,112 +866,359 @@
   return status;
 }
 
-void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
+void grpc_call_recv_message(grpc_call_element *elem,
+                            grpc_byte_buffer *byte_buffer) {
   grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  grpc_mdelem *md = op->data.metadata;
+  lock(call);
+  grpc_bbq_push(&call->incoming_queue, byte_buffer);
+  finish_read_ops(call);
+  unlock(call);
+}
+
+void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
   grpc_mdstr *key = md->key;
+  grpc_metadata_array *dest;
+  grpc_metadata *mdusr;
 
+  lock(call);
   if (key == grpc_channel_get_status_string(call->channel)) {
-    maybe_set_status_code(call, decode_status(md));
+    set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
     grpc_mdelem_unref(md);
-    op->done_cb(op->user_data, GRPC_OP_OK);
   } else if (key == grpc_channel_get_message_string(call->channel)) {
-    maybe_set_status_details(call, md->value);
+    set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
     grpc_mdelem_unref(md);
-    op->done_cb(op->user_data, GRPC_OP_OK);
   } else {
-    grpc_metadata_buffer_queue(&call->incoming_metadata, op);
-  }
-}
-
-void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
-  gpr_mu_lock(&call->read_mu);
-
-  if (call->have_read) {
-    grpc_cq_end_read(call->cq, call->read_tag, call, do_nothing, NULL, NULL);
-    call->read_tag = INVALID_TAG;
-    call->have_read = 0;
-  }
-  if (call->is_client && !call->received_metadata && call->cq) {
-    size_t count;
-    grpc_metadata *elements;
-
-    call->received_metadata = 1;
-
-    count = grpc_metadata_buffer_count(&call->incoming_metadata);
-    elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
-    grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call,
-                                     grpc_metadata_buffer_cleanup_elements,
-                                     elements, count, elements);
-  }
-  if (is_full_close) {
-    if (call->have_alarm) {
-      grpc_alarm_cancel(&call->alarm);
-      call->have_alarm = 0;
+    dest = &call->buffered_metadata[call->read_state >=
+                                    READ_STATE_GOT_INITIAL_METADATA];
+    if (dest->count == dest->capacity) {
+      dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+      dest->metadata =
+          gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
     }
-    call->received_finish = 1;
-    if (prq_is_empty(&call->prq) && call->cq != NULL) {
-      finish_call(call);
+    mdusr = &dest->metadata[dest->count++];
+    mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
+    mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
+    mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+    if (call->owned_metadata_count == call->owned_metadata_capacity) {
+      call->owned_metadata_capacity = GPR_MAX(
+          call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
+      call->owned_metadata =
+          gpr_realloc(call->owned_metadata,
+                      sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
     }
-  } else {
-    call->reads_done = 1;
+    call->owned_metadata[call->owned_metadata_count++] = md;
   }
-  gpr_mu_unlock(&call->read_mu);
-}
-
-void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *message,
-                            void (*on_finish)(void *user_data,
-                                              grpc_op_error error),
-                            void *user_data) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
-  gpr_mu_lock(&call->read_mu);
-  if (call->have_read) {
-    grpc_cq_end_read(call->cq, call->read_tag, call, on_finish, user_data,
-                     message);
-    call->read_tag = INVALID_TAG;
-    call->have_read = 0;
-  } else {
-    prq_push(&call->prq, message, on_finish, user_data);
-  }
-  gpr_mu_unlock(&call->read_mu);
-}
-
-grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
-  return CALL_FROM_TOP_ELEM(elem);
-}
-
-grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
-  return &call->incoming_metadata;
-}
-
-static void call_alarm(void *arg, int success) {
-  grpc_call *call = arg;
-  if (success) {
-    if (call->is_client) {
-      grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
-                                   "Deadline Exceeded");
-    } else {
-      grpc_call_cancel(call);
-    }
-  }
-  grpc_call_internal_unref(call);
-}
-
-void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
-  if (call->have_alarm) {
-    gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
-  }
-  grpc_call_internal_ref(call);
-  call->have_alarm = 1;
-  grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
+  unlock(call);
 }
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
   return CALL_STACK_FROM_CALL(call);
 }
 
+void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
+  grpc_call *call = grpc_call_from_top_element(surface_element);
+  set_read_state(call, READ_STATE_GOT_INITIAL_METADATA);
+}
+
+/*
+ * LEGACY API IMPLEMENTATION
+ * All this code will disappear as soon as wrappings are updated
+ */
+
+struct legacy_state {
+  gpr_uint8 md_out_buffer;
+  size_t md_out_count[2];
+  size_t md_out_capacity[2];
+  grpc_metadata *md_out[2];
+  grpc_byte_buffer *msg_out;
+
+  /* input buffers */
+  grpc_metadata_array initial_md_in;
+  grpc_metadata_array trailing_md_in;
+
+  size_t details_capacity;
+  char *details;
+  grpc_status_code status;
+
+  size_t msg_in_read_idx;
+  grpc_byte_buffer *msg_in;
+
+  void *finished_tag;
+};
+
+static legacy_state *get_legacy_state(grpc_call *call) {
+  if (call->legacy_state == NULL) {
+    call->legacy_state = gpr_malloc(sizeof(legacy_state));
+    memset(call->legacy_state, 0, sizeof(legacy_state));
+  }
+  return call->legacy_state;
+}
+
+static void destroy_legacy_state(legacy_state *ls) {
+  size_t i, j;
+  for (i = 0; i < 2; i++) {
+    for (j = 0; j < ls->md_out_count[i]; j++) {
+      gpr_free(ls->md_out[i][j].key);
+      gpr_free(ls->md_out[i][j].value);
+    }
+    gpr_free(ls->md_out[i]);
+  }
+  gpr_free(ls->initial_md_in.metadata);
+  gpr_free(ls->trailing_md_in.metadata);
+  gpr_free(ls);
+}
+
+grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
+                                           grpc_metadata *metadata,
+                                           gpr_uint32 flags) {
+  legacy_state *ls;
+  grpc_metadata *mdout;
+
+  lock(call);
+  ls = get_legacy_state(call);
+
+  if (ls->md_out_count[ls->md_out_buffer] ==
+      ls->md_out_capacity[ls->md_out_buffer]) {
+    ls->md_out_capacity[ls->md_out_buffer] =
+        GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
+                ls->md_out_capacity[ls->md_out_buffer] + 8);
+    ls->md_out[ls->md_out_buffer] = gpr_realloc(
+        ls->md_out[ls->md_out_buffer],
+        sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
+  }
+  mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
+  mdout->key = gpr_strdup(metadata->key);
+  mdout->value = gpr_malloc(metadata->value_length);
+  mdout->value_length = metadata->value_length;
+  memcpy(mdout->value, metadata->value, metadata->value_length);
+
+  unlock(call);
+
+  return GRPC_CALL_OK;
+}
+
+static void finish_status(grpc_call *call, grpc_op_error status,
+                          void *ignored) {
+  legacy_state *ls;
+
+  lock(call);
+  ls = get_legacy_state(call);
+  grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
+                       ls->status, ls->details, ls->trailing_md_in.metadata,
+                       ls->trailing_md_in.count);
+  unlock(call);
+}
+
+static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
+                                 void *tag) {
+  legacy_state *ls;
+
+  lock(call);
+  ls = get_legacy_state(call);
+  if (status == GRPC_OP_OK) {
+    grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
+                                     ls->initial_md_in.count,
+                                     ls->initial_md_in.metadata);
+
+  } else {
+    grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
+                                     NULL);
+  }
+  unlock(call);
+}
+
+static void finish_send_metadata(grpc_call *call, grpc_op_error status,
+                                 void *tag) {}
+
+grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
+                                     void *metadata_read_tag,
+                                     void *finished_tag, gpr_uint32 flags) {
+  grpc_ioreq reqs[3];
+  legacy_state *ls;
+  grpc_call_error err;
+
+  grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
+  grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+
+  lock(call);
+  ls = get_legacy_state(call);
+  err = bind_cq(call, cq);
+  if (err != GRPC_CALL_OK) goto done;
+
+  ls->finished_tag = finished_tag;
+
+  reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA;
+  reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
+  reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
+  ls->md_out_buffer++;
+  err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
+  if (err != GRPC_CALL_OK) goto done;
+
+  reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+  reqs[0].data.recv_metadata = &ls->initial_md_in;
+  err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag);
+  if (err != GRPC_CALL_OK) goto done;
+
+  reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
+  reqs[0].data.recv_metadata = &ls->trailing_md_in;
+  reqs[1].op = GRPC_IOREQ_RECV_STATUS;
+  reqs[1].data.recv_status.details = &ls->details;
+  reqs[1].data.recv_status.details_capacity = &ls->details_capacity;
+  reqs[1].data.recv_status.code = &ls->status;
+  reqs[2].op = GRPC_IOREQ_RECV_CLOSE;
+  err = start_ioreq(call, reqs, 3, finish_status, NULL);
+  if (err != GRPC_CALL_OK) goto done;
+
+done:
+  unlock(call);
+  return err;
+}
+
+grpc_call_error grpc_call_server_accept_old(grpc_call *call,
+                                            grpc_completion_queue *cq,
+                                            void *finished_tag) {
+  grpc_ioreq reqs[2];
+  grpc_call_error err;
+  legacy_state *ls;
+
+  /* inform the completion queue of an incoming operation (corresponding to
+     finished_tag) */
+  grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+
+  lock(call);
+  ls = get_legacy_state(call);
+  /* on failure, leave via done: so the lock taken above is still released */
+  err = bind_cq(call, cq);
+  if (err != GRPC_CALL_OK) goto done;
+  ls->finished_tag = finished_tag;
+
+  reqs[0].op = GRPC_IOREQ_RECV_STATUS;
+  reqs[0].data.recv_status.details = NULL;
+  reqs[0].data.recv_status.details_capacity = 0;
+  reqs[0].data.recv_status.code = &ls->status;
+  reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
+  err = start_ioreq(call, reqs, 2, finish_status, NULL);
+done:
+  unlock(call);
+  return err;
+}
+
+static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
+                                         void *tag) {}
+
+grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
+                                                          gpr_uint32 flags) {
+  grpc_ioreq req;
+  grpc_call_error err;
+  legacy_state *ls;
+
+  lock(call);
+  ls = get_legacy_state(call);
+  req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
+  req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
+  req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
+  err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
+  unlock(call);
+
+  return err;
+}
+
+static void finish_read_event(void *p, grpc_op_error error) {
+  if (p) grpc_byte_buffer_destroy(p);
+}
+
+static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
+  legacy_state *ls;
+  grpc_byte_buffer *msg;
+  lock(call);
+  ls = get_legacy_state(call);
+  msg = ls->msg_in;
+  grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
+  unlock(call);
+}
+
+grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
+  legacy_state *ls;
+  grpc_ioreq req;
+  grpc_call_error err;
+
+  grpc_cq_begin_op(call->cq, call, GRPC_READ);
+
+  lock(call);
+  ls = get_legacy_state(call);
+  req.op = GRPC_IOREQ_RECV_MESSAGE;
+  req.data.recv_message = &ls->msg_in;
+  err = start_ioreq(call, &req, 1, finish_read, tag);
+  unlock(call);
+  return err;
+}
+
+static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
+  lock(call);
+  grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out);
+  unlock(call);
+  grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
+}
+
+grpc_call_error grpc_call_start_write_old(grpc_call *call,
+                                          grpc_byte_buffer *byte_buffer,
+                                          void *tag, gpr_uint32 flags) {
+  grpc_ioreq req;
+  legacy_state *ls;
+  grpc_call_error err;
+
+  grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
+
+  lock(call);
+  ls = get_legacy_state(call);
+  ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
+  req.op = GRPC_IOREQ_SEND_MESSAGE;
+  req.data.send_message = ls->msg_out;
+  err = start_ioreq(call, &req, 1, finish_write, tag);
+  unlock(call);
+
+  return err;
+}
+
+static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
+  grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
+}
+
+grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
+  grpc_ioreq req;
+  grpc_call_error err;
+  grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
+
+  lock(call);
+  req.op = GRPC_IOREQ_SEND_CLOSE;
+  err = start_ioreq(call, &req, 1, finish_finish, tag);
+  unlock(call);
+
+  return err;
+}
+
+grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
+                                                 grpc_status_code status,
+                                                 const char *details,
+                                                 void *tag) {
+  grpc_ioreq reqs[3];
+  grpc_call_error err;
+  legacy_state *ls;
+  grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
+
+  lock(call);
+  ls = get_legacy_state(call);
+  reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+  reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
+  reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
+  reqs[1].op = GRPC_IOREQ_SEND_STATUS;
+  reqs[1].data.send_status.code = status;
+  /* MEMLEAK */
+  reqs[1].data.send_status.details = gpr_strdup(details);
+  reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
+  err = start_ioreq(call, reqs, 3, finish_finish, tag);
+  unlock(call);
+
+  return err;
+}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 804b387..936fb29 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -38,27 +38,73 @@
 #include "src/core/channel/metadata_buffer.h"
 #include <grpc/grpc.h>
 
+/* Primitive operation types - grpc_op's get rewritten into these */
+typedef enum {
+  GRPC_IOREQ_RECV_INITIAL_METADATA,
+  GRPC_IOREQ_RECV_MESSAGE,
+  GRPC_IOREQ_RECV_TRAILING_METADATA,
+  GRPC_IOREQ_RECV_STATUS,
+  GRPC_IOREQ_RECV_CLOSE,
+  GRPC_IOREQ_SEND_INITIAL_METADATA,
+  GRPC_IOREQ_SEND_MESSAGE,
+  GRPC_IOREQ_SEND_TRAILING_METADATA,
+  GRPC_IOREQ_SEND_STATUS,
+  GRPC_IOREQ_SEND_CLOSE,
+  GRPC_IOREQ_OP_COUNT
+} grpc_ioreq_op;
+
+typedef struct {
+  grpc_status_code *code;
+  char **details;
+  size_t *details_capacity;
+} grpc_recv_status_args;
+
+typedef union {
+  grpc_metadata_array *recv_metadata;
+  grpc_byte_buffer **recv_message;
+  grpc_recv_status_args recv_status;
+  struct {
+    size_t count;
+    grpc_metadata *metadata;
+  } send_metadata;
+  grpc_byte_buffer *send_message;
+  struct {
+    grpc_status_code code;
+    char *details;
+  } send_status;
+} grpc_ioreq_data;
+
+typedef struct {
+  grpc_ioreq_op op;
+  grpc_ioreq_data data;
+} grpc_ioreq;
+
+typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
+                                           grpc_op_error status,
+                                           void *user_data);
+
 grpc_call *grpc_call_create(grpc_channel *channel,
                             const void *server_transport_data);
 
 void grpc_call_internal_ref(grpc_call *call);
-void grpc_call_internal_unref(grpc_call *call);
+void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
 
 /* Helpers for grpc_client, grpc_server filters to publish received data to
    the completion queue/surface layer */
 void grpc_call_recv_metadata(grpc_call_element *surface_element,
-                             grpc_call_op *op);
-void grpc_call_recv_message(
-    grpc_call_element *surface_element, grpc_byte_buffer *message,
-    void (*on_finish)(void *user_data, grpc_op_error error), void *user_data);
-void grpc_call_recv_finish(grpc_call_element *surface_element,
-                           int is_full_close);
+                             grpc_mdelem *md);
+void grpc_call_recv_message(grpc_call_element *surface_element,
+                            grpc_byte_buffer *message);
+void grpc_call_read_closed(grpc_call_element *surface_element);
+void grpc_call_stream_closed(grpc_call_element *surface_element);
 
 void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
+grpc_call_error grpc_call_start_ioreq_and_call_back(
+    grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
+    grpc_ioreq_completion_func on_complete, void *user_data);
 
-/* Called when it's known that the initial batch of metadata is complete on the
-   client side (must not be called on the server) */
-void grpc_call_client_initial_metadata_complete(
+/* Called when it's known that the initial batch of metadata is complete */
+void grpc_call_initial_metadata_complete(
     grpc_call_element *surface_element);
 
 void grpc_call_set_deadline(grpc_call_element *surface_element,
@@ -69,10 +115,4 @@
 /* Given the top call_element, get the call object. */
 grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
 
-/* Get the metadata buffer. */
-grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call);
-
-void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
-                          gpr_uint32 flags);
-
 #endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 93a2c06..c33ea92 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -51,7 +51,7 @@
   grpc_mdstr *authority_string;
 };
 
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
 
 grpc_channel *grpc_channel_create_from_filters(
     const grpc_channel_filter **filters, size_t num_filters,
@@ -80,6 +80,7 @@
   grpc_call *call;
   grpc_mdelem *path_mdelem;
   grpc_mdelem *authority_mdelem;
+  grpc_call_op op;
 
   if (!channel->is_client) {
     gpr_log(GPR_ERROR, "Cannot create a call on the server.");
@@ -91,20 +92,25 @@
   /* Add :path and :authority headers. */
   /* TODO(klempner): Consider optimizing this by stashing mdelems for common
      values of method and host. */
-  grpc_mdstr_ref(channel->path_string);
   path_mdelem = grpc_mdelem_from_metadata_strings(
-      channel->metadata_context, channel->path_string,
+      channel->metadata_context, grpc_mdstr_ref(channel->path_string),
       grpc_mdstr_from_string(channel->metadata_context, method));
-  grpc_call_add_mdelem(call, path_mdelem, 0);
+  op.type = GRPC_SEND_METADATA;
+  op.dir = GRPC_CALL_DOWN;
+  op.flags = 0;
+  op.data.metadata = path_mdelem;
+  op.done_cb = do_nothing;
+  op.user_data = NULL;
+  grpc_call_execute_op(call, &op);
 
   grpc_mdstr_ref(channel->authority_string);
   authority_mdelem = grpc_mdelem_from_metadata_strings(
       channel->metadata_context, channel->authority_string,
       grpc_mdstr_from_string(channel->metadata_context, host));
-  grpc_call_add_mdelem(call, authority_mdelem, 0);
+  op.data.metadata = authority_mdelem;
+  grpc_call_execute_op(call, &op);
 
   if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) {
-    grpc_call_op op;
     op.type = GRPC_SEND_DEADLINE;
     op.dir = GRPC_CALL_DOWN;
     op.flags = 0;
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index a7c9b90..fa63e85 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -56,23 +56,23 @@
       grpc_call_next_op(elem, op);
       break;
     case GRPC_RECV_METADATA:
-      grpc_call_recv_metadata(elem, op);
+      grpc_call_recv_metadata(elem, op->data.metadata);
       break;
     case GRPC_RECV_DEADLINE:
       gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
       break;
     case GRPC_RECV_MESSAGE:
-      grpc_call_recv_message(elem, op->data.message, op->done_cb,
-                             op->user_data);
+      grpc_call_recv_message(elem, op->data.message);
+      op->done_cb(op->user_data, GRPC_OP_OK);
       break;
     case GRPC_RECV_HALF_CLOSE:
-      grpc_call_recv_finish(elem, 0);
+      grpc_call_read_closed(elem);
       break;
     case GRPC_RECV_FINISH:
-      grpc_call_recv_finish(elem, 1);
+      grpc_call_stream_closed(elem);
       break;
     case GRPC_RECV_END_OF_INITIAL_METADATA:
-      grpc_call_client_initial_metadata_complete(elem);
+      grpc_call_initial_metadata_complete(elem);
       break;
     default:
       GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 2bf31c5..ae3b960 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -173,18 +173,6 @@
   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
 }
 
-void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
-                                 grpc_call *call,
-                                 grpc_event_finish_func on_finish,
-                                 void *user_data, grpc_op_error error) {
-  event *ev;
-  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
-  ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
-  ev->base.data.invoke_accepted = error;
-  end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
-  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
 void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
                                 grpc_call *call,
                                 grpc_event_finish_func on_finish,
@@ -197,6 +185,17 @@
   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
 }
 
+void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
+                       grpc_event_finish_func on_finish, void *user_data,
+                       grpc_op_error error) {
+  event *ev;
+  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+  ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data);
+  ev->base.data.ioreq = error; /* field event_string.c reads for GRPC_IOREQ */
+  end_op_locked(cc, GRPC_IOREQ);
+  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+}
+
 void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
                                  grpc_call *call,
                                  grpc_event_finish_func on_finish,
@@ -389,7 +388,7 @@
   event *ev = (event *)base;
   ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
   if (ev->base.call) {
-    grpc_call_internal_unref(ev->base.call);
+    grpc_call_internal_unref(ev->base.call, 1);
   }
   gpr_free(ev);
 }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 8598407..fea8336 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -97,6 +97,10 @@
                          gpr_timespec deadline, size_t metadata_count,
                          grpc_metadata *metadata_elements);
 
+void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
+                       grpc_event_finish_func on_finish, void *user_data,
+                       grpc_op_error error);
+
 void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
 
 /* disable polling for some tests */
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 8975d31..7c76bf9 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -87,10 +87,10 @@
         gpr_strvec_add(&buf, gpr_strdup(" end-of-stream"));
       }
       break;
-    case GRPC_INVOKE_ACCEPTED:
-      gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: "));
+    case GRPC_IOREQ:
+      gpr_strvec_add(&buf, gpr_strdup("IOREQ: "));
       addhdr(&buf, ev);
-      adderr(&buf, ev->data.invoke_accepted);
+      adderr(&buf, ev->data.ioreq);
       break;
     case GRPC_WRITE_ACCEPTED:
       gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: "));
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 6098ac7..2f5eff5 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -50,26 +50,16 @@
   grpc_mdelem *message;
 } channel_data;
 
-static void do_nothing(void *data, grpc_op_error error) {}
-
 static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                     grpc_call_op *op) {
   channel_data *channeld = elem->channel_data;
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
   switch (op->type) {
-    case GRPC_SEND_START: {
-      grpc_call_op set_status_op;
-      grpc_mdelem_ref(channeld->message);
-      memset(&set_status_op, 0, sizeof(grpc_call_op));
-      set_status_op.dir = GRPC_CALL_UP;
-      set_status_op.type = GRPC_RECV_METADATA;
-      set_status_op.done_cb = do_nothing;
-      set_status_op.data.metadata = channeld->message;
-      grpc_call_recv_metadata(elem, &set_status_op);
-      grpc_call_recv_finish(elem, 1);
+    case GRPC_SEND_START:
+      grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
+      grpc_call_stream_closed(elem);
       break;
-    }
     case GRPC_SEND_METADATA:
       grpc_mdelem_unref(op->data.metadata);
       break;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 9e2e4d5..a057694 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -44,6 +44,7 @@
 #include "src/core/surface/call.h"
 #include "src/core/surface/channel.h"
 #include "src/core/surface/completion_queue.h"
+#include "src/core/transport/metadata.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/useful.h>
@@ -63,11 +64,24 @@
 struct channel_data {
   grpc_server *server;
   grpc_channel *channel;
+  grpc_mdstr *path_key;
+  grpc_mdstr *authority_key;
   /* linked list of all channels on a server */
   channel_data *next;
   channel_data *prev;
 };
 
+typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
+                            grpc_metadata_array *initial_metadata,
+                            call_data *calld, void *user_data);
+
+typedef struct {
+  void *user_data;
+  grpc_completion_queue *cq;
+  grpc_metadata_array *initial_metadata;
+  new_call_cb cb;
+} requested_call;
+
 struct grpc_server {
   size_t channel_filter_count;
   const grpc_channel_filter **channel_filters;
@@ -76,9 +90,9 @@
 
   gpr_mu mu;
 
-  void **tags;
-  size_t ntags;
-  size_t tag_cap;
+  requested_call *requested_calls;
+  size_t requested_call_count;
+  size_t requested_call_capacity;
 
   gpr_uint8 shutdown;
   gpr_uint8 have_shutdown_tag;
@@ -107,11 +121,17 @@
   ZOMBIED
 } call_state;
 
+typedef struct legacy_data { grpc_metadata_array *initial_metadata; } legacy_data; /* per-call state set up by begin_legacy_request */
+
 struct call_data {
   grpc_call *call;
 
   call_state state;
   gpr_timespec deadline;
+  grpc_mdstr *path;
+  grpc_mdstr *host;
+
+  legacy_data *legacy;
 
   gpr_uint8 included[CALL_LIST_COUNT];
   call_link links[CALL_LIST_COUNT];
@@ -179,7 +199,7 @@
     grpc_channel_args_destroy(server->channel_args);
     gpr_mu_destroy(&server->mu);
     gpr_free(server->channel_filters);
-    gpr_free(server->tags);
+    gpr_free(server->requested_calls);
     gpr_free(server);
   }
 }
@@ -210,62 +230,37 @@
   grpc_iomgr_add_callback(finish_destroy_channel, chand);
 }
 
-static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) {
-  grpc_call *call = calld->call;
-  grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call);
-  size_t count = grpc_metadata_buffer_count(mdbuf);
-  grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf);
-  const char *host = NULL;
-  const char *method = NULL;
-  size_t i;
-
-  for (i = 0; i < count; i++) {
-    if (0 == strcmp(elements[i].key, ":authority")) {
-      host = elements[i].value;
-    } else if (0 == strcmp(elements[i].key, ":path")) {
-      method = elements[i].value;
-    }
-  }
-
-  grpc_call_internal_ref(call);
-  grpc_cq_end_new_rpc(server->cq, tag, call,
-                      grpc_metadata_buffer_cleanup_elements, elements, method,
-                      host, calld->deadline, count, elements);
-}
-
 static void start_new_rpc(grpc_call_element *elem) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
   grpc_server *server = chand->server;
 
   gpr_mu_lock(&server->mu);
-  if (server->ntags) {
+  if (server->requested_call_count > 0) { /* a request is already waiting for a call */
+    requested_call rc = server->requested_calls[--server->requested_call_count];
     calld->state = ACTIVATED;
-    queue_new_rpc(server, calld, server->tags[--server->ntags]);
+    gpr_mu_unlock(&server->mu); /* release the server lock before running the callback */
+    rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data); /* rc is a copy taken under the lock */
   } else {
     calld->state = PENDING;
     call_list_join(server, calld, PENDING_START);
+    gpr_mu_unlock(&server->mu); /* each branch now unlocks on its own */
   }
-  gpr_mu_unlock(&server->mu);
 }
 
 static void kill_zombie(void *elem, int success) {
   grpc_call_destroy(grpc_call_from_top_element(elem));
 }
 
-static void finish_rpc(grpc_call_element *elem, int is_full_close) {
+static void stream_closed(grpc_call_element *elem) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   gpr_mu_lock(&chand->server->mu);
   switch (calld->state) {
     case ACTIVATED:
-      grpc_call_recv_finish(elem, is_full_close);
+      grpc_call_stream_closed(elem);
       break;
     case PENDING:
-      if (!is_full_close) {
-        grpc_call_recv_finish(elem, is_full_close);
-        break;
-      }
       call_list_remove(chand->server, calld, PENDING_START);
     /* fallthrough intended */
     case NOT_STARTED:
@@ -278,25 +273,57 @@
   gpr_mu_unlock(&chand->server->mu);
 }
 
+static void read_closed(grpc_call_element *elem) { /* invoked by call_op for GRPC_RECV_HALF_CLOSE */
+  call_data *calld = elem->call_data;
+  channel_data *chand = elem->channel_data;
+  gpr_mu_lock(&chand->server->mu); /* call state is inspected under the server lock */
+  switch (calld->state) {
+    case ACTIVATED:
+    case PENDING:
+      grpc_call_read_closed(elem); /* unlike stream_closed, a PENDING call stays on its list */
+      break;
+    case NOT_STARTED:
+      calld->state = ZOMBIED;
+      grpc_iomgr_add_callback(kill_zombie, elem); /* kill_zombie destroys the call later */
+      break;
+    case ZOMBIED: /* nothing to do */
+      break;
+  }
+  gpr_mu_unlock(&chand->server->mu);
+}
+
 static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
                     grpc_call_op *op) {
+  channel_data *chand = elem->channel_data;
+  call_data *calld = elem->call_data;
+  grpc_mdelem *md;
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
   switch (op->type) {
     case GRPC_RECV_METADATA:
-      grpc_call_recv_metadata(elem, op);
+      md = op->data.metadata;
+      if (md->key == chand->path_key) {
+        calld->path = grpc_mdstr_ref(md->value);
+        grpc_mdelem_unref(md);
+      } else if (md->key == chand->authority_key) {
+        calld->host = grpc_mdstr_ref(md->value);
+        grpc_mdelem_unref(md);
+      } else {
+        grpc_call_recv_metadata(elem, md);
+      }
       break;
     case GRPC_RECV_END_OF_INITIAL_METADATA:
       start_new_rpc(elem);
+      grpc_call_initial_metadata_complete(elem);
       break;
     case GRPC_RECV_MESSAGE:
-      grpc_call_recv_message(elem, op->data.message, op->done_cb,
-                             op->user_data);
+      grpc_call_recv_message(elem, op->data.message);
+      op->done_cb(op->user_data, GRPC_OP_OK);
       break;
     case GRPC_RECV_HALF_CLOSE:
-      finish_rpc(elem, 0);
+      read_closed(elem);
       break;
     case GRPC_RECV_FINISH:
-      finish_rpc(elem, 1);
+      stream_closed(elem);
       break;
     case GRPC_RECV_DEADLINE:
       grpc_call_set_deadline(elem, op->data.deadline);
@@ -371,6 +398,7 @@
 
 static void destroy_call_elem(grpc_call_element *elem) {
   channel_data *chand = elem->channel_data;
+  call_data *calld = elem->call_data;
   int i;
 
   gpr_mu_lock(&chand->server->mu);
@@ -383,6 +411,19 @@
   }
   gpr_mu_unlock(&chand->server->mu);
 
+  if (calld->host) {
+    grpc_mdstr_unref(calld->host);
+  }
+  if (calld->path) {
+    grpc_mdstr_unref(calld->path);
+  }
+
+  if (calld->legacy) {
+    gpr_free(calld->legacy->initial_metadata->metadata);
+    gpr_free(calld->legacy->initial_metadata);
+    gpr_free(calld->legacy);
+  }
+
   server_unref(chand->server);
 }
 
@@ -395,6 +436,8 @@
   GPR_ASSERT(!is_last);
   chand->server = NULL;
   chand->channel = NULL;
+  chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
+  chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
   chand->next = chand->prev = chand;
 }
 
@@ -406,6 +449,8 @@
     chand->prev->next = chand->next;
     chand->next = chand->prev = chand;
     gpr_mu_unlock(&chand->server->mu);
+    grpc_mdstr_unref(chand->path_key);
+    grpc_mdstr_unref(chand->authority_key);
     server_unref(chand->server);
   }
 }
@@ -413,17 +458,8 @@
 static const grpc_channel_filter server_surface_filter = {
     call_op,           channel_op,           sizeof(call_data),
     init_call_elem,    destroy_call_elem,    sizeof(channel_data),
-    init_channel_elem, destroy_channel_elem, "server", };
-
-static void early_terminate_requested_calls(grpc_completion_queue *cq,
-                                            void **tags, size_t ntags) {
-  size_t i;
-
-  for (i = 0; i < ntags; i++) {
-    grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL,
-                        gpr_inf_past, 0, NULL);
-  }
-}
+    init_channel_elem, destroy_channel_elem, "server",
+};
 
 grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
                                              grpc_channel_filter **filters,
@@ -517,8 +553,8 @@
 void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
                        void *shutdown_tag) {
   listener *l;
-  void **tags;
-  size_t ntags;
+  requested_call *requested_calls;
+  size_t requested_call_count;
   channel_data **channels;
   channel_data *c;
   size_t nchannels;
@@ -547,10 +583,10 @@
     i++;
   }
 
-  tags = server->tags;
-  ntags = server->ntags;
-  server->tags = NULL;
-  server->ntags = 0;
+  requested_calls = server->requested_calls;
+  requested_call_count = server->requested_call_count;
+  server->requested_calls = NULL;
+  server->requested_call_count = 0;
 
   server->shutdown = 1;
   server->have_shutdown_tag = have_shutdown_tag;
@@ -579,8 +615,12 @@
   gpr_free(channels);
 
   /* terminate all the requested calls */
-  early_terminate_requested_calls(server->cq, tags, ntags);
-  gpr_free(tags);
+  for (i = 0; i < requested_call_count; i++) {
+    requested_calls[i].cb(server, requested_calls[i].cq,
+                          requested_calls[i].initial_metadata, NULL,
+                          requested_calls[i].user_data);
+  }
+  gpr_free(requested_calls);
 
   /* Shutdown listeners */
   for (l = server->listeners; l; l = l->next) {
@@ -625,36 +665,105 @@
   server->listeners = l;
 }
 
-grpc_call_error grpc_server_request_call_old(grpc_server *server,
-                                             void *tag_new) {
+static grpc_call_error queue_call_request(grpc_server *server,
+                                          grpc_completion_queue *cq,
+                                          grpc_metadata_array *initial_metadata,
+                                          new_call_cb cb, void *user_data) { /* returns GRPC_CALL_OK on every path */
   call_data *calld;
-
-  grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
-
+  requested_call *rc;
   gpr_mu_lock(&server->mu);
-
   if (server->shutdown) {
     gpr_mu_unlock(&server->mu);
-    early_terminate_requested_calls(server->cq, &tag_new, 1);
+    cb(server, cq, initial_metadata, NULL, user_data); /* NULL call_data: no call will be delivered */
     return GRPC_CALL_OK;
   }
-
   calld = call_list_remove_head(server, PENDING_START);
   if (calld) {
     GPR_ASSERT(calld->state == PENDING);
     calld->state = ACTIVATED;
-    queue_new_rpc(server, calld, tag_new);
+    gpr_mu_unlock(&server->mu); /* unlock before running the callback */
+    cb(server, cq, initial_metadata, calld, user_data); /* a call was already pending: deliver it now */
+    return GRPC_CALL_OK;
   } else {
-    if (server->tag_cap == server->ntags) {
-      server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1);
-      server->tags =
-          gpr_realloc(server->tags, sizeof(void *) * server->tag_cap);
+    if (server->requested_call_count == server->requested_call_capacity) {
+      server->requested_call_capacity =
+          GPR_MAX(server->requested_call_capacity + 8,
+                  server->requested_call_capacity * 2); /* grow by at least 8 slots, otherwise double */
+      server->requested_calls =
+          gpr_realloc(server->requested_calls,
+                      sizeof(requested_call) * server->requested_call_capacity);
     }
-    server->tags[server->ntags++] = tag_new;
+    rc = &server->requested_calls[server->requested_call_count++]; /* no pending call: park the request */
+    rc->cb = cb;
+    rc->cq = cq;
+    rc->user_data = user_data;
+    rc->initial_metadata = initial_metadata;
+    gpr_mu_unlock(&server->mu);
+    return GRPC_CALL_OK;
   }
-  gpr_mu_unlock(&server->mu);
+}
 
-  return GRPC_CALL_OK;
+static void begin_request(grpc_server *server, grpc_completion_queue *cq,
+                          grpc_metadata_array *initial_metadata,
+                          call_data *call_data, void *tag) { /* new_call_cb used by grpc_server_request_call */
+  abort(); /* unconditional: no request handling is implemented here; all arguments are ignored */
+}
+
+grpc_call_error grpc_server_request_call(
+    grpc_server *server, grpc_call_details *details, /* details is not used in this function */
+    grpc_metadata_array *initial_metadata, grpc_completion_queue *cq,
+    void *tag) {
+  grpc_cq_begin_op(cq, NULL, GRPC_IOREQ); /* the op is begun on the caller's cq before the request is queued */
+  return queue_call_request(server, cq, initial_metadata, begin_request, tag);
+}
+
+static void publish_legacy_request(grpc_call *call, grpc_op_error status,
+                                   void *tag) { /* completion callback registered by begin_legacy_request */
+  grpc_call_element *elem =
+      grpc_call_stack_element(grpc_call_get_call_stack(call), 0); /* element 0 of the call stack holds this file's call_data */
+  call_data *calld = elem->call_data;
+  channel_data *chand = elem->channel_data;
+  grpc_server *server = chand->server;
+
+  if (status == GRPC_OP_OK) {
+    grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
+                        grpc_mdstr_as_c_string(calld->path),
+                        grpc_mdstr_as_c_string(calld->host), calld->deadline,
+                        calld->legacy->initial_metadata->count,
+                        calld->legacy->initial_metadata->metadata); /* metadata array is the one stored by begin_legacy_request */
+  } else {
+    abort(); /* a non-OK ioreq status is not handled */
+  }
+}
+
+static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
+                                 grpc_metadata_array *initial_metadata,
+                                 call_data *calld, void *tag) { /* new_call_cb used by grpc_server_request_call_old */
+  grpc_ioreq req;
+  if (!calld) { /* no call to deliver: complete the tag with a NULL call */
+    gpr_free(initial_metadata); /* the array was allocated by grpc_server_request_call_old */
+    grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
+                        gpr_inf_past, 0, NULL);
+    return;
+  }
+  req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+  req.data.recv_metadata = initial_metadata;
+  calld->legacy = gpr_malloc(sizeof(legacy_data));
+  memset(calld->legacy, 0, sizeof(legacy_data));
+  calld->legacy->initial_metadata = initial_metadata; /* released together with calld->legacy in destroy_call_elem */
+  grpc_call_internal_ref(calld->call);
+  grpc_call_start_ioreq_and_call_back(calld->call, &req, 1,
+                                      publish_legacy_request, tag); /* publish_legacy_request posts the event on completion */
+}
+
+grpc_call_error grpc_server_request_call_old(grpc_server *server,
+                                             void *tag_new) {
+  grpc_metadata_array *client_metadata =
+      gpr_malloc(sizeof(grpc_metadata_array)); /* handed to begin_legacy_request, which frees or stores it */
+  memset(client_metadata, 0, sizeof(*client_metadata));
+  grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
+  return queue_call_request(server, server->cq, client_metadata,
+                            begin_legacy_request, tag_new); /* the old API always reports on the server's own cq */
 }
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index c4e3ca5..2af18c3 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -432,7 +432,7 @@
 
 static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
                          framer_state *st) {
-  char timeout_str[32];
+  char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
   grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str);
   hpack_enc(c, grpc_mdelem_from_metadata_strings(
                    c->mdctx, grpc_mdstr_ref(c->timeout_key_str),
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 48a1005..f560417 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -957,7 +957,7 @@
       stream_list_join(t, s, WRITABLE);
     }
   } else {
-    grpc_stream_ops_unref_owned_objects(ops, ops_count);
+    grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
   }
   if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) {
     stream_list_join(t, s, PENDING_CALLBACKS);
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 5822e30..3f39364 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -102,6 +102,7 @@
   grpc_call *call = grpc_channel_create_call_old(
       c_channel_, method.name(), target_.c_str(), context->RawDeadline());
   context->set_call(call);
+
   grpc_event *ev;
   void *finished_tag = reinterpret_cast<char *>(call);
   void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
diff --git a/templates/Makefile.template b/templates/Makefile.template
index 142d188..c34949c 100644
--- a/templates/Makefile.template
+++ b/templates/Makefile.template
@@ -206,11 +206,13 @@
 ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS)
 PERFTOOLS_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/perftools.c -lprofiler $(LDFLAGS)
 
+ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
 HAS_SYSTEM_PERFTOOLS = $(shell $(PERFTOOLS_CHECK_CMD) 2> /dev/null && echo true || echo false)
 ifeq ($(HAS_SYSTEM_PERFTOOLS),true)
 DEFINES += GRPC_HAVE_PERFTOOLS
 LIBS += profiler
 endif
+endif
 
 ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
 HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false)
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index 287f83e..904ed77 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -70,6 +70,7 @@
   union {
     grpc_op_error finish_accepted;
     grpc_op_error write_accepted;
+    grpc_op_error ioreq;
     struct {
       const char *method;
       const char *host;
@@ -180,9 +181,6 @@
     case GRPC_WRITE_ACCEPTED:
       GPR_ASSERT(e->data.write_accepted == ev->data.write_accepted);
       break;
-    case GRPC_INVOKE_ACCEPTED:
-      abort();
-      break;
     case GRPC_SERVER_RPC_NEW:
       GPR_ASSERT(string_equivalent(e->data.server_rpc_new.method,
                                    ev->data.server_rpc_new.method));
@@ -222,6 +220,9 @@
         GPR_ASSERT(ev->data.read == NULL);
       }
       break;
+    case GRPC_IOREQ:
+      GPR_ASSERT(e->data.ioreq == ev->data.ioreq);
+      break;
     case GRPC_SERVER_SHUTDOWN:
       break;
     case GRPC_COMPLETION_DO_NOT_USE:
@@ -242,7 +243,9 @@
       gpr_asprintf(&tmp, "%c%s:%s", i ? ',' : '{', md->keys[i], md->values[i]);
       gpr_strvec_add(buf, tmp);
     }
-    gpr_strvec_add(buf, gpr_strdup("}"));
+    if (md->count) {
+      gpr_strvec_add(buf, gpr_strdup("}"));
+    }
   }
 }
 
@@ -261,8 +264,9 @@
                      e->data.write_accepted);
       gpr_strvec_add(buf, tmp);
       break;
-    case GRPC_INVOKE_ACCEPTED:
-      gpr_strvec_add(buf, gpr_strdup("GRPC_INVOKE_ACCEPTED"));
+    case GRPC_IOREQ:
+      gpr_asprintf(&tmp, "GRPC_IOREQ result=%d", e->data.ioreq);
+      gpr_strvec_add(buf, tmp);
       break;
     case GRPC_SERVER_RPC_NEW:
       timeout = gpr_time_sub(e->data.server_rpc_new.deadline, gpr_now());
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index eeb454c..9d893f6 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -142,7 +142,6 @@
     cq_verify(v_client);
 
     cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
-    cq_verify(v_server);
     cq_expect_finished(v_server, tag(102), NULL);
     cq_verify(v_server);
 
diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c
index 1edb5b0..4cbaa65 100644
--- a/test/core/end2end/tests/census_simple_request.c
+++ b/test/core/end2end/tests/census_simple_request.c
@@ -135,7 +135,6 @@
   cq_verify(v_client);
 
   cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
-  cq_verify(v_server);
   cq_expect_finished(v_server, tag(102), NULL);
   cq_verify(v_server);
   grpc_call_destroy(c);
diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c
index 0e26577..4830b85 100644
--- a/test/core/end2end/tests/max_concurrent_streams.c
+++ b/test/core/end2end/tests/max_concurrent_streams.c
@@ -138,7 +138,6 @@
   cq_verify(v_client);
 
   cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
-  cq_verify(v_server);
   cq_expect_finished(v_server, tag(102), NULL);
   cq_verify(v_server);
 
@@ -207,7 +206,7 @@
   /* The /alpha or /beta calls started above could be invoked (but NOT both);
    * check this here */
   /* We'll get tag 303 or 403, we want 300, 400 */
-  live_call = ((int)(gpr_intptr)ev->tag) - 3;
+  live_call = ((int)(gpr_intptr) ev->tag) - 3;
   grpc_event_finish(ev);
 
   cq_expect_server_rpc_new(v_server, &s1, tag(100),
diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c
index 192d1ab..db0d6d8 100644
--- a/test/core/end2end/tests/simple_request.c
+++ b/test/core/end2end/tests/simple_request.c
@@ -139,7 +139,6 @@
   cq_verify(v_client);
 
   cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
-  cq_verify(v_server);
   cq_expect_finished(v_server, tag(102), NULL);
   cq_verify(v_server);
 
@@ -180,16 +179,14 @@
 
   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write_status_old(
                                  s, GRPC_STATUS_UNIMPLEMENTED, "xyz", tag(5)));
-  cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
   cq_verify(v_server);
 
   cq_expect_client_metadata_read(v_client, tag(2), NULL);
-  cq_verify(v_client);
-
   cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_UNIMPLEMENTED,
                                  "xyz", NULL);
   cq_verify(v_client);
 
+  cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
   cq_expect_finished(v_server, tag(102), NULL);
   cq_verify(v_server);
 
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index cd2efc3..a91dfba 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -65,6 +65,7 @@
   grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
   grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
   grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
+  grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
   grpc_call_destroy(call);
   call = NULL;
 }
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index dc459d6..875cf3e 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -105,32 +105,6 @@
   shutdown_and_destroy(cc);
 }
 
-static void test_cq_end_invoke_accepted(void) {
-  grpc_event *ev;
-  grpc_completion_queue *cc;
-  int on_finish_called = 0;
-  void *tag = create_test_tag();
-
-  LOG_TEST();
-
-  cc = grpc_completion_queue_create();
-
-  grpc_cq_begin_op(cc, NULL, GRPC_INVOKE_ACCEPTED);
-  grpc_cq_end_invoke_accepted(cc, tag, NULL, increment_int_on_finish,
-                              &on_finish_called, GRPC_OP_OK);
-
-  ev = grpc_completion_queue_next(cc, gpr_inf_past);
-  GPR_ASSERT(ev != NULL);
-  GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED);
-  GPR_ASSERT(ev->tag == tag);
-  GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
-  GPR_ASSERT(on_finish_called == 0);
-  grpc_event_finish(ev);
-  GPR_ASSERT(on_finish_called == 1);
-
-  shutdown_and_destroy(cc);
-}
-
 static void test_cq_end_write_accepted(void) {
   grpc_event *ev;
   grpc_completion_queue *cc;
@@ -421,7 +395,6 @@
   test_no_op();
   test_wait_empty();
   test_cq_end_read();
-  test_cq_end_invoke_accepted();
   test_cq_end_write_accepted();
   test_cq_end_finish_accepted();
   test_cq_end_client_metadata_read();
diff --git a/tools/dockerfile/grpc_node_base/Dockerfile b/tools/dockerfile/grpc_node_base/Dockerfile
index 4ca0e53..28bd7b2 100644
--- a/tools/dockerfile/grpc_node_base/Dockerfile
+++ b/tools/dockerfile/grpc_node_base/Dockerfile
@@ -15,5 +15,8 @@
   git pull --recurse-submodules && \
   git submodule update --init --recursive
 
+# Build the C core
+RUN make static_c shared_c -j12 -C /var/local/git/grpc
+
 # Define the default command.
 CMD ["bash"]
\ No newline at end of file
diff --git a/tools/dockerfile/grpc_php_base/Dockerfile b/tools/dockerfile/grpc_php_base/Dockerfile
index e0e86ea..ef58f3a 100644
--- a/tools/dockerfile/grpc_php_base/Dockerfile
+++ b/tools/dockerfile/grpc_php_base/Dockerfile
@@ -84,5 +84,8 @@
   && chmod +x phpunit.phar \
   && mv phpunit.phar /usr/local/bin/phpunit
 
+# Build the C core
+RUN make static_c shared_c -j12 -C /var/local/git/grpc
+
 # Define the default command.
 CMD ["bash"]
diff --git a/tools/dockerfile/grpc_ruby_base/Dockerfile b/tools/dockerfile/grpc_ruby_base/Dockerfile
index 787f129..ec4544d 100644
--- a/tools/dockerfile/grpc_ruby_base/Dockerfile
+++ b/tools/dockerfile/grpc_ruby_base/Dockerfile
@@ -53,3 +53,6 @@
   ./autogen.sh && \
   ./configure --prefix=/usr && \
   make -j12 && make check && make install && make clean
+
+# Build the C core
+RUN make static_c shared_c -j12 -C /var/local/git/grpc
diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj
index 21a1f06..8c12b2d 100644
--- a/vsprojects/vs2013/grpc.vcxproj
+++ b/vsprojects/vs2013/grpc.vcxproj
@@ -146,6 +146,7 @@
     <ClInclude Include="..\..\src\core\statistics\census_tracing.h" />
     <ClInclude Include="..\..\src\core\statistics\hash_table.h" />
     <ClInclude Include="..\..\src\core\statistics\window_stats.h" />
+    <ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h" />
     <ClInclude Include="..\..\src\core\surface\call.h" />
     <ClInclude Include="..\..\src\core\surface\channel.h" />
     <ClInclude Include="..\..\src\core\surface\client.h" />
@@ -312,6 +313,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\byte_buffer.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\call.c">
diff --git a/vsprojects/vs2013/grpc.vcxproj.filters b/vsprojects/vs2013/grpc.vcxproj.filters
index 3af681a..62f0b60 100644
--- a/vsprojects/vs2013/grpc.vcxproj.filters
+++ b/vsprojects/vs2013/grpc.vcxproj.filters
@@ -202,6 +202,9 @@
     <ClCompile Include="..\..\src\core\surface\byte_buffer.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
+      <Filter>src\core\surface</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
@@ -521,6 +524,9 @@
     <ClInclude Include="..\..\src\core\statistics\window_stats.h">
       <Filter>src\core\statistics</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h">
+      <Filter>src\core\surface</Filter>
+    </ClInclude>
     <ClInclude Include="..\..\src\core\surface\call.h">
       <Filter>src\core\surface</Filter>
     </ClInclude>
diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj
index 21a1f06..8c12b2d 100644
--- a/vsprojects/vs2013/grpc_unsecure.vcxproj
+++ b/vsprojects/vs2013/grpc_unsecure.vcxproj
@@ -146,6 +146,7 @@
     <ClInclude Include="..\..\src\core\statistics\census_tracing.h" />
     <ClInclude Include="..\..\src\core\statistics\hash_table.h" />
     <ClInclude Include="..\..\src\core\statistics\window_stats.h" />
+    <ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h" />
     <ClInclude Include="..\..\src\core\surface\call.h" />
     <ClInclude Include="..\..\src\core\surface\channel.h" />
     <ClInclude Include="..\..\src\core\surface\client.h" />
@@ -312,6 +313,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\byte_buffer.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\call.c">
diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters
index 4dadb61..5ed5e9b 100644
--- a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters
@@ -163,6 +163,9 @@
     <ClCompile Include="..\..\src\core\surface\byte_buffer.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
+      <Filter>src\core\surface</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
@@ -446,6 +449,9 @@
     <ClInclude Include="..\..\src\core\statistics\window_stats.h">
       <Filter>src\core\statistics</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h">
+      <Filter>src\core\surface</Filter>
+    </ClInclude>
     <ClInclude Include="..\..\src\core\surface\call.h">
       <Filter>src\core\surface</Filter>
     </ClInclude>