Merge github.com:grpc/grpc into one-pass

Conflicts:
	Makefile
diff --git a/BUILD b/BUILD
index e862898..e0b14b5 100644
--- a/BUILD
+++ b/BUILD
@@ -147,7 +147,6 @@
     "src/core/channel/client_setup.h",
     "src/core/channel/connected_channel.h",
     "src/core/channel/http_client_filter.h",
-    "src/core/channel/http_filter.h",
     "src/core/channel/http_server_filter.h",
     "src/core/channel/noop_filter.h",
     "src/core/compression/algorithm.h",
@@ -247,7 +246,6 @@
     "src/core/tsi/fake_transport_security.c",
     "src/core/tsi/ssl_transport_security.c",
     "src/core/tsi/transport_security.c",
-    "src/core/channel/call_op_string.c",
     "src/core/channel/census_filter.c",
     "src/core/channel/channel_args.c",
     "src/core/channel/channel_stack.c",
@@ -256,7 +254,6 @@
     "src/core/channel/client_setup.c",
     "src/core/channel/connected_channel.c",
     "src/core/channel/http_client_filter.c",
-    "src/core/channel/http_filter.c",
     "src/core/channel/http_server_filter.c",
     "src/core/channel/noop_filter.c",
     "src/core/compression/algorithm.c",
@@ -344,6 +341,7 @@
     "src/core/transport/metadata.c",
     "src/core/transport/stream_op.c",
     "src/core/transport/transport.c",
+    "src/core/transport/transport_op_string.c",
   ],
   hdrs = [
     "include/grpc/grpc_security.h",
@@ -375,7 +373,6 @@
     "src/core/channel/client_setup.h",
     "src/core/channel/connected_channel.h",
     "src/core/channel/http_client_filter.h",
-    "src/core/channel/http_filter.h",
     "src/core/channel/http_server_filter.h",
     "src/core/channel/noop_filter.h",
     "src/core/compression/algorithm.h",
@@ -456,7 +453,6 @@
     "src/core/transport/transport.h",
     "src/core/transport/transport_impl.h",
     "src/core/surface/init_unsecure.c",
-    "src/core/channel/call_op_string.c",
     "src/core/channel/census_filter.c",
     "src/core/channel/channel_args.c",
     "src/core/channel/channel_stack.c",
@@ -465,7 +461,6 @@
     "src/core/channel/client_setup.c",
     "src/core/channel/connected_channel.c",
     "src/core/channel/http_client_filter.c",
-    "src/core/channel/http_filter.c",
     "src/core/channel/http_server_filter.c",
     "src/core/channel/noop_filter.c",
     "src/core/compression/algorithm.c",
@@ -553,6 +548,7 @@
     "src/core/transport/metadata.c",
     "src/core/transport/stream_op.c",
     "src/core/transport/transport.c",
+    "src/core/transport/transport_op_string.c",
   ],
   hdrs = [
     "include/grpc/byte_buffer.h",
diff --git a/Makefile b/Makefile
index 2def6e2..7813d94 100644
--- a/Makefile
+++ b/Makefile
@@ -3146,7 +3146,6 @@
     src/core/tsi/fake_transport_security.c \
     src/core/tsi/ssl_transport_security.c \
     src/core/tsi/transport_security.c \
-    src/core/channel/call_op_string.c \
     src/core/channel/census_filter.c \
     src/core/channel/channel_args.c \
     src/core/channel/channel_stack.c \
@@ -3155,7 +3154,6 @@
     src/core/channel/client_setup.c \
     src/core/channel/connected_channel.c \
     src/core/channel/http_client_filter.c \
-    src/core/channel/http_filter.c \
     src/core/channel/http_server_filter.c \
     src/core/channel/noop_filter.c \
     src/core/compression/algorithm.c \
@@ -3243,6 +3241,7 @@
     src/core/transport/metadata.c \
     src/core/transport/stream_op.c \
     src/core/transport/transport.c \
+    src/core/transport/transport_op_string.c \
 
 PUBLIC_HEADERS_C += \
     include/grpc/grpc_security.h \
@@ -3395,7 +3394,6 @@
 
 LIBGRPC_UNSECURE_SRC = \
     src/core/surface/init_unsecure.c \
-    src/core/channel/call_op_string.c \
     src/core/channel/census_filter.c \
     src/core/channel/channel_args.c \
     src/core/channel/channel_stack.c \
@@ -3404,7 +3402,6 @@
     src/core/channel/client_setup.c \
     src/core/channel/connected_channel.c \
     src/core/channel/http_client_filter.c \
-    src/core/channel/http_filter.c \
     src/core/channel/http_server_filter.c \
     src/core/channel/noop_filter.c \
     src/core/compression/algorithm.c \
@@ -3492,6 +3489,7 @@
     src/core/transport/metadata.c \
     src/core/transport/stream_op.c \
     src/core/transport/transport.c \
+    src/core/transport/transport_op_string.c \
 
 PUBLIC_HEADERS_C += \
     include/grpc/byte_buffer.h \
diff --git a/build.json b/build.json
index e072ed6..5a33a65 100644
--- a/build.json
+++ b/build.json
@@ -100,7 +100,6 @@
         "src/core/channel/client_setup.h",
         "src/core/channel/connected_channel.h",
         "src/core/channel/http_client_filter.h",
-        "src/core/channel/http_filter.h",
         "src/core/channel/http_server_filter.h",
         "src/core/channel/noop_filter.h",
         "src/core/compression/algorithm.h",
@@ -182,7 +181,6 @@
         "src/core/transport/transport_impl.h"
       ],
       "src": [
-        "src/core/channel/call_op_string.c",
         "src/core/channel/census_filter.c",
         "src/core/channel/channel_args.c",
         "src/core/channel/channel_stack.c",
@@ -191,7 +189,6 @@
         "src/core/channel/client_setup.c",
         "src/core/channel/connected_channel.c",
         "src/core/channel/http_client_filter.c",
-        "src/core/channel/http_filter.c",
         "src/core/channel/http_server_filter.c",
         "src/core/channel/noop_filter.c",
         "src/core/compression/algorithm.c",
@@ -278,7 +275,8 @@
         "src/core/transport/chttp2_transport.c",
         "src/core/transport/metadata.c",
         "src/core/transport/stream_op.c",
-        "src/core/transport/transport.c"
+        "src/core/transport/transport.c",
+        "src/core/transport/transport_op_string.c"
       ]
     },
     {
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 9c0c20a..7e393a0 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -49,6 +49,11 @@
   census_op_id op_id;
   census_rpc_stats stats;
   gpr_timespec start_ts;
+
+  /* recv callback */
+  grpc_stream_op_buffer* recv_ops;
+  void (*on_done_recv)(void* user_data, int success);
+  void* recv_user_data;
 } call_data;
 
 typedef struct channel_data {
@@ -60,57 +65,68 @@
   stats->cnt = 1;
 }
 
-static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
+                                            call_data* calld,
                                             channel_data* chand) {
   grpc_linked_mdelem* m;
-  for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
-    if (m->md->key == chand->path_str) {
-      gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
-      census_add_method_tag(
-          calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+  size_t i;
+  for (i = 0; i < sopb->nops; i++) {
+    grpc_stream_op* op = &sopb->ops[i];
+    if (op->type != GRPC_OP_METADATA) continue;
+    for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+      if (m->md->key == chand->path_str) {
+        gpr_log(GPR_DEBUG, "%s",
+                (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+        census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
+                                                m->md->value->slice));
+      }
     }
   }
 }
 
-static void client_call_op(grpc_call_element* elem,
-                           grpc_call_element* from_elem, grpc_call_op* op) {
+static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
   call_data* calld = elem->call_data;
   channel_data* chand = elem->channel_data;
-  GPR_ASSERT(calld != NULL);
-  GPR_ASSERT(chand != NULL);
-  GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
-  switch (op->type) {
-    case GRPC_SEND_METADATA:
-      extract_and_annotate_method_tag(op, calld, chand);
-      break;
-    case GRPC_RECV_FINISH:
-      /* Should we stop timing the rpc here? */
-      break;
-    default:
-      break;
+  if (op->send_ops) {
+    extract_and_annotate_method_tag(op->send_ops, calld, chand);
   }
-  /* Always pass control up or down the stack depending on op->dir */
+}
+
+static void client_start_transport_op(grpc_call_element* elem,
+                                      grpc_transport_op* op) {
+  call_data* calld = elem->call_data;
+  GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+  client_mutate_op(elem, op);
   grpc_call_next_op(elem, op);
 }
 
-static void server_call_op(grpc_call_element* elem,
-                           grpc_call_element* from_elem, grpc_call_op* op) {
+static void server_on_done_recv(void* ptr, int success) {
+  grpc_call_element* elem = ptr;
   call_data* calld = elem->call_data;
   channel_data* chand = elem->channel_data;
-  GPR_ASSERT(calld != NULL);
-  GPR_ASSERT(chand != NULL);
-  GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
-  switch (op->type) {
-    case GRPC_RECV_METADATA:
-      extract_and_annotate_method_tag(op, calld, chand);
-      break;
-    case GRPC_SEND_FINISH:
-      /* Should we stop timing the rpc here? */
-      break;
-    default:
-      break;
+  if (success) {
+    extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
   }
-  /* Always pass control up or down the stack depending on op->dir */
+  calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
+  call_data* calld = elem->call_data;
+  if (op->recv_ops) { /* ops without a recv part are left untouched */
+    /* save the op's recv callback, then route completion through ours */
+    calld->recv_ops = op->recv_ops;
+    calld->on_done_recv = op->on_done_recv;
+    calld->recv_user_data = op->recv_user_data;
+    op->on_done_recv = server_on_done_recv; /* it calls the saved callback when done */
+    op->recv_user_data = elem; /* server_on_done_recv reads this back as the element */
+  }
+}
+
+static void server_start_transport_op(grpc_call_element* elem,
+                                      grpc_transport_op* op) {
+  call_data* calld = elem->call_data;
+  GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+  server_mutate_op(elem, op);
   grpc_call_next_op(elem, op);
 }
 
@@ -128,12 +144,14 @@
 }
 
 static void client_init_call_elem(grpc_call_element* elem,
-                                  const void* server_transport_data) {
+                                  const void* server_transport_data,
+                                  grpc_transport_op* initial_op) {
   call_data* d = elem->call_data;
   GPR_ASSERT(d != NULL);
   init_rpc_stats(&d->stats);
   d->start_ts = gpr_now();
   d->op_id = census_tracing_start_op();
+  if (initial_op) client_mutate_op(elem, initial_op);
 }
 
 static void client_destroy_call_elem(grpc_call_element* elem) {
@@ -144,12 +162,14 @@
 }
 
 static void server_init_call_elem(grpc_call_element* elem,
-                                  const void* server_transport_data) {
+                                  const void* server_transport_data,
+                                  grpc_transport_op* initial_op) {
   call_data* d = elem->call_data;
   GPR_ASSERT(d != NULL);
   init_rpc_stats(&d->stats);
   d->start_ts = gpr_now();
   d->op_id = census_tracing_start_op();
+  if (initial_op) server_mutate_op(elem, initial_op);
 }
 
 static void server_destroy_call_elem(grpc_call_element* elem) {
@@ -180,11 +200,11 @@
 }
 
 const grpc_channel_filter grpc_client_census_filter = {
-    client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
-    client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
-    destroy_channel_elem, "census-client"};
+    client_start_transport_op, channel_op, sizeof(call_data),
+    client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
+    init_channel_elem, destroy_channel_elem, "census-client"};
 
 const grpc_channel_filter grpc_server_census_filter = {
-    server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
-    server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
-    destroy_channel_elem, "census-server"};
+    server_start_transport_op, channel_op, sizeof(call_data),
+    server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
+    init_channel_elem, destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 3a3a3a7..311f4f0 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -35,6 +35,7 @@
 #include <grpc/support/log.h>
 
 #include <stdlib.h>
+#include <string.h>
 
 int grpc_trace_channel = 0;
 
@@ -147,6 +148,7 @@
 
 void grpc_call_stack_init(grpc_channel_stack *channel_stack,
                           const void *transport_server_data,
+                          grpc_transport_op *initial_op,
                           grpc_call_stack *call_stack) {
   grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
   size_t count = channel_stack->count;
@@ -164,7 +166,8 @@
     call_elems[i].filter = channel_elems[i].filter;
     call_elems[i].channel_data = channel_elems[i].channel_data;
     call_elems[i].call_data = user_data;
-    call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data);
+    call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data,
+                                         initial_op);
     user_data +=
         ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
   }
@@ -181,12 +184,9 @@
   }
 }
 
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
-  grpc_call_element *next_elem = elem + op->dir;
-  if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
-    grpc_metadata_batch_assert_ok(&op->data.metadata);
-  }
-  next_elem->filter->call_op(next_elem, elem, op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+  grpc_call_element *next_elem = elem + 1; /* always the following element; no op->dir to pick a direction */
+  next_elem->filter->start_transport_op(next_elem, op); /* hand op to that element's filter */
 }
 
 void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@@ -205,39 +205,15 @@
       sizeof(grpc_call_stack)));
 }
 
-static void do_nothing(void *user_data, grpc_op_error error) {}
-
 void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
-  grpc_call_op cancel_op;
-  cancel_op.type = GRPC_CANCEL_OP;
-  cancel_op.dir = GRPC_CALL_DOWN;
-  cancel_op.done_cb = do_nothing;
-  cancel_op.user_data = NULL;
-  cancel_op.flags = 0;
-  cancel_op.bind_pollset = NULL;
-  grpc_call_next_op(cur_elem, &cancel_op);
-}
-
-void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
-  grpc_call_op finish_op;
-  finish_op.type = GRPC_SEND_FINISH;
-  finish_op.dir = GRPC_CALL_DOWN;
-  finish_op.done_cb = do_nothing;
-  finish_op.user_data = NULL;
-  finish_op.flags = 0;
-  finish_op.bind_pollset = NULL;
-  grpc_call_next_op(cur_elem, &finish_op);
+  grpc_transport_op op;
+  memset(&op, 0, sizeof(op));
+  op.cancel_with_status = GRPC_STATUS_CANCELLED;
+  grpc_call_next_op(cur_elem, &op);
 }
 
 void grpc_call_element_recv_status(grpc_call_element *cur_elem,
                                    grpc_status_code status,
                                    const char *message) {
-  grpc_call_op op;
-  op.type = GRPC_RECV_SYNTHETIC_STATUS;
-  op.dir = GRPC_CALL_UP;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-  op.data.synthetic_status.status = status;
-  op.data.synthetic_status.message = message;
-  grpc_call_next_op(cur_elem, &op);
+  abort(); /* stub: ignores every argument and terminates the process; NOTE(review): this patch also drops the prototype from channel_stack.h */
 }
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index addc92b..de0e4e4 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,78 +51,11 @@
 typedef struct grpc_channel_element grpc_channel_element;
 typedef struct grpc_call_element grpc_call_element;
 
-/* Call operations - things that can be sent and received.
-
-   Threading:
-     SEND, RECV, and CANCEL ops can be active on a call at the same time, but
-     only one SEND, one RECV, and one CANCEL can be active at a time.
-
-   If state is shared between send/receive/cancel operations, it is up to
-   filters to provide their own protection around that. */
-typedef enum {
-  /* send metadata to the channels peer */
-  GRPC_SEND_METADATA,
-  /* send a message to the channels peer */
-  GRPC_SEND_MESSAGE,
-  /* send a pre-formatted message to the channels peer */
-  GRPC_SEND_PREFORMATTED_MESSAGE,
-  /* send half-close to the channels peer */
-  GRPC_SEND_FINISH,
-  /* request that more data be allowed through flow control */
-  GRPC_REQUEST_DATA,
-  /* metadata was received from the channels peer */
-  GRPC_RECV_METADATA,
-  /* a message was received from the channels peer */
-  GRPC_RECV_MESSAGE,
-  /* half-close was received from the channels peer */
-  GRPC_RECV_HALF_CLOSE,
-  /* full close was received from the channels peer */
-  GRPC_RECV_FINISH,
-  /* a status has been sythesized locally */
-  GRPC_RECV_SYNTHETIC_STATUS,
-  /* the call has been abnormally terminated */
-  GRPC_CANCEL_OP
-} grpc_call_op_type;
-
 /* The direction of the call.
    The values of the enums (1, -1) matter here - they are used to increment
    or decrement a pointer to find the next element to call */
 typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
 
-/* A single filterable operation to be performed on a call */
-typedef struct {
-  /* The type of operation we're performing */
-  grpc_call_op_type type;
-  /* The directionality of this call - does the operation begin at the bottom
-     of the stack and flow up, or does the operation start at the top of the
-     stack and flow down through the filters. */
-  grpc_call_dir dir;
-
-  /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
-  gpr_uint32 flags;
-
-  /* Argument data, matching up with grpc_call_op_type names */
-  union {
-    grpc_byte_buffer *message;
-    grpc_metadata_batch metadata;
-    struct {
-      grpc_status_code status;
-      const char *message;
-    } synthetic_status;
-  } data;
-
-  grpc_pollset *bind_pollset;
-
-  /* Must be called when processing of this call-op is complete.
-     Signature chosen to match transport flow control callbacks */
-  void (*done_cb)(void *user_data, grpc_op_error error);
-  /* User data to be passed into done_cb */
-  void *user_data;
-} grpc_call_op;
-
-/* returns a string representation of op, that can be destroyed with gpr_free */
-char *grpc_call_op_string(grpc_call_op *op);
-
 typedef enum {
   /* send a goaway message to remote channels indicating that we are going
      to disconnect in the future */
@@ -170,8 +103,7 @@
 typedef struct {
   /* Called to eg. send/receive data on a call.
      See grpc_call_next_op on how to call the next element in the stack */
-  void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
-                  grpc_call_op *op);
+  void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
   /* Called to handle channel level operations - e.g. new calls, or transport
      closure.
      See grpc_channel_next_op on how to call the next element in the stack */
@@ -189,7 +121,8 @@
      transport and is on the server. Most filters want to ignore this
      argument.*/
   void (*init_call_elem)(grpc_call_element *elem,
-                         const void *server_transport_data);
+                         const void *server_transport_data,
+                         grpc_transport_op *initial_op);
   /* Destroy per call data.
      The filter does not need to do any chaining */
   void (*destroy_call_elem)(grpc_call_element *elem);
@@ -268,12 +201,13 @@
    server. */
 void grpc_call_stack_init(grpc_channel_stack *channel_stack,
                           const void *transport_server_data,
+                          grpc_transport_op *initial_op,
                           grpc_call_stack *call_stack);
 /* Destroy a call stack */
 void grpc_call_stack_destroy(grpc_call_stack *stack);
 
-/* Call the next operation (depending on call directionality) in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
 /* Call the next operation (depending on call directionality) in a channel
    stack */
 void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@@ -285,13 +219,9 @@
 grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
 
 void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
-                      grpc_call_element *elem, grpc_call_op *op);
+                      grpc_call_element *elem, grpc_transport_op *op);
 
 void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_send_finish(grpc_call_element *cur_elem);
-void grpc_call_element_recv_status(grpc_call_element *cur_elem,
-                                   grpc_status_code status,
-                                   const char *message);
 
 extern int grpc_trace_channel;
 
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 2cb0382..a2f3c54 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -60,23 +60,11 @@
   gpr_uint8 sent_farewell;
 } lb_channel_data;
 
-typedef struct {
-  grpc_call_element *back;
-  grpc_child_channel *channel;
-} lb_call_data;
+typedef struct { grpc_child_channel *channel; } lb_call_data;
 
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                       grpc_call_op *op) {
-  lb_call_data *calld = elem->call_data;
-
-  switch (op->dir) {
-    case GRPC_CALL_UP:
-      calld->back->filter->call_op(calld->back, elem, op);
-      break;
-    case GRPC_CALL_DOWN:
-      grpc_call_next_op(elem, op);
-      break;
-  }
+static void lb_start_transport_op(grpc_call_element *elem,
+                                  grpc_transport_op *op) {
+  grpc_call_next_op(elem, op);
 }
 
 /* Currently we assume all channel operations should just be pushed up. */
@@ -132,7 +120,8 @@
 
 /* Constructor for call_data */
 static void lb_init_call_elem(grpc_call_element *elem,
-                              const void *server_transport_data) {}
+                              const void *server_transport_data,
+                              grpc_transport_op *initial_op) {}
 
 /* Destructor for call_data */
 static void lb_destroy_call_elem(grpc_call_element *elem) {}
@@ -165,9 +154,10 @@
 }
 
 const grpc_channel_filter grpc_child_channel_top_filter = {
-    lb_call_op,           lb_channel_op,           sizeof(lb_call_data),
-    lb_init_call_elem,    lb_destroy_call_elem,    sizeof(lb_channel_data),
-    lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
+    lb_start_transport_op, lb_channel_op,           sizeof(lb_call_data),
+    lb_init_call_elem,     lb_destroy_call_elem,    sizeof(lb_channel_data),
+    lb_init_channel_elem,  lb_destroy_channel_elem, "child-channel",
+};
 
 /* grpc_child_channel proper */
 
@@ -272,17 +262,17 @@
 }
 
 grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
-                                                grpc_call_element *parent) {
+                                                grpc_call_element *parent,
+                                                grpc_transport_op *initial_op) {
   grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
   grpc_call_element *lbelem;
   lb_call_data *lbcalld;
   lb_channel_data *lbchand;
 
-  grpc_call_stack_init(channel, NULL, stk);
+  grpc_call_stack_init(channel, NULL, initial_op, stk);
   lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
   lbchand = lbelem->channel_data;
   lbcalld = lbelem->call_data;
-  lbcalld->back = parent;
   lbcalld->channel = channel;
 
   gpr_mu_lock(&lbchand->mu);
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
index 3869540..556a1c7 100644
--- a/src/core/channel/child_channel.h
+++ b/src/core/channel/child_channel.h
@@ -57,8 +57,9 @@
                                 int wait_for_callbacks);
 
 grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
-                                                grpc_call_element *parent);
+                                                grpc_call_element *parent,
+                                                grpc_transport_op *initial_op);
 grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
 void grpc_child_call_destroy(grpc_child_call *call);
 
-#endif  /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index bc481e5..78f8d06 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -58,6 +58,7 @@
 
   /* the sending child (may be null) */
   grpc_child_channel *active_child;
+  grpc_mdctx *mdctx;
 
   /* calls waiting for a channel to be ready */
   call_data **waiting_children;
@@ -82,8 +83,6 @@
   /* owning element */
   grpc_call_element *elem;
 
-  gpr_uint8 got_first_send;
-
   call_state state;
   gpr_timespec deadline;
   union {
@@ -91,7 +90,11 @@
       /* our child call stack */
       grpc_child_call *child_call;
     } active;
-    grpc_call_op waiting_op;
+    grpc_transport_op waiting_op;
+    struct {
+      grpc_linked_mdelem status;
+      grpc_linked_mdelem details;
+    } cancelled;
   } s;
 };
 
@@ -105,14 +108,14 @@
   calld->state = CALL_ACTIVE;
 
   /* create a child call */
-  calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
+  /* TODO(ctiller): pass the waiting op down here */
+  calld->s.active.child_call =
+      grpc_child_channel_create_call(on_child, elem, NULL);
 
   return 1;
 }
 
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
   call_data *calld = elem->call_data;
   grpc_call_element *child_elem =
       grpc_child_call_get_top_element(calld->s.active.child_call);
@@ -121,57 +124,7 @@
 
   /* continue the start call down the stack, this nees to happen after metadata
      are flushed*/
-  child_elem->filter->call_op(child_elem, elem, op);
-}
-
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  gpr_mu_lock(&chand->mu);
-  if (calld->state == CALL_CANCELLED) {
-    gpr_mu_unlock(&chand->mu);
-    grpc_metadata_batch_destroy(&op->data.metadata);
-    op->done_cb(op->user_data, GRPC_OP_ERROR);
-    return;
-  }
-  GPR_ASSERT(calld->state == CALL_CREATED);
-  calld->state = CALL_WAITING;
-  if (chand->active_child) {
-    /* channel is connected - use the connected stack */
-    if (prepare_activate(elem, chand->active_child)) {
-      gpr_mu_unlock(&chand->mu);
-      /* activate the request (pass it down) outside the lock */
-      complete_activate(elem, op);
-    } else {
-      gpr_mu_unlock(&chand->mu);
-    }
-  } else {
-    /* check to see if we should initiate a connection (if we're not already),
-       but don't do so until outside the lock to avoid re-entrancy problems if
-       the callback is immediate */
-    int initiate_transport_setup = 0;
-    if (!chand->transport_setup_initiated) {
-      chand->transport_setup_initiated = 1;
-      initiate_transport_setup = 1;
-    }
-    /* add this call to the waiting set to be resumed once we have a child
-       channel stack, growing the waiting set if needed */
-    if (chand->waiting_child_count == chand->waiting_child_capacity) {
-      chand->waiting_child_capacity =
-          GPR_MAX(chand->waiting_child_capacity * 2, 8);
-      chand->waiting_children =
-          gpr_realloc(chand->waiting_children,
-                      chand->waiting_child_capacity * sizeof(call_data *));
-    }
-    calld->s.waiting_op = *op;
-    chand->waiting_children[chand->waiting_child_count++] = calld;
-    gpr_mu_unlock(&chand->mu);
-
-    /* finally initiate transport setup if needed */
-    if (initiate_transport_setup) {
-      grpc_transport_setup_initiate(chand->transport_setup);
-    }
-  }
+  child_elem->filter->start_transport_op(child_elem, op);
 }
 
 static void remove_waiting_child(channel_data *chand, call_data *calld) {
@@ -186,85 +139,128 @@
   chand->waiting_child_count = new_count;
 }
 
-static void send_up_cancelled_ops(grpc_call_element *elem) {
-  grpc_call_op finish_op;
-  /* send up a synthesized status */
-  grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
-  /* send up a finish */
-  finish_op.type = GRPC_RECV_FINISH;
-  finish_op.dir = GRPC_CALL_UP;
-  finish_op.flags = 0;
-  finish_op.done_cb = do_nothing;
-  finish_op.user_data = NULL;
-  grpc_call_next_op(elem, &finish_op);
+static void handle_op_after_cancellation(grpc_call_element *elem,
+                                         grpc_transport_op *op) {
+  call_data *calld = elem->call_data;
+  channel_data *chand = elem->channel_data;
+  if (op->send_ops) {
+    op->on_done_send(op->send_user_data, 0);
+  }
+  if (op->recv_ops) {
+    char status[GPR_LTOA_MIN_BUFSIZE];
+    grpc_metadata_batch mdb;
+    gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+    calld->s.cancelled.status.md =
+        grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+    calld->s.cancelled.details.md =
+        grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+    calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
+    calld->s.cancelled.status.next = &calld->s.cancelled.details;
+    calld->s.cancelled.details.prev = &calld->s.cancelled.status;
+    mdb.list.head = &calld->s.cancelled.status;
+    mdb.list.tail = &calld->s.cancelled.details;
+    mdb.garbage.head = mdb.garbage.tail = NULL;
+    mdb.deadline = gpr_inf_future;
+    grpc_sopb_add_metadata(op->recv_ops, mdb);
+    *op->recv_state = GRPC_STREAM_CLOSED;
+    op->on_done_recv(op->recv_user_data, 1);
+  }
 }
 
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+                                  grpc_transport_op *op) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   grpc_call_element *child_elem;
+  grpc_transport_op waiting_op;
+  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
   gpr_mu_lock(&chand->mu);
   switch (calld->state) {
     case CALL_ACTIVE:
       child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
       gpr_mu_unlock(&chand->mu);
-      child_elem->filter->call_op(child_elem, elem, op);
-      return; /* early out */
-    case CALL_WAITING:
-      grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
-      remove_waiting_child(chand, calld);
-      calld->state = CALL_CANCELLED;
-      gpr_mu_unlock(&chand->mu);
-      send_up_cancelled_ops(elem);
-      calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
-      return; /* early out */
+      child_elem->filter->start_transport_op(child_elem, op);
+      break;
     case CALL_CREATED:
-      calld->state = CALL_CANCELLED;
-      gpr_mu_unlock(&chand->mu);
-      send_up_cancelled_ops(elem);
-      return; /* early out */
+      if (op->cancel_with_status != GRPC_STATUS_OK) {
+        calld->state = CALL_CANCELLED;
+        gpr_mu_unlock(&chand->mu);
+        handle_op_after_cancellation(elem, op);
+      } else {
+        calld->state = CALL_WAITING;
+        if (chand->active_child) {
+          /* channel is connected - use the connected stack */
+          if (prepare_activate(elem, chand->active_child)) {
+            gpr_mu_unlock(&chand->mu);
+            /* activate the request (pass it down) outside the lock */
+            complete_activate(elem, op);
+          } else {
+            gpr_mu_unlock(&chand->mu);
+          }
+        } else {
+          /* Check whether we should initiate a connection (if we have not
+             already done so), but defer the actual initiation until we
+             are outside the lock: calling into transport setup while
+             holding the mutex would risk re-entrancy problems if the
+             callback is immediate. */
+          int initiate_transport_setup = 0;
+          if (!chand->transport_setup_initiated) {
+            chand->transport_setup_initiated = 1;
+            initiate_transport_setup = 1;
+          }
+          /* add this call to the waiting set to be resumed once we have a child
+             channel stack, growing the waiting set if needed */
+          if (chand->waiting_child_count == chand->waiting_child_capacity) {
+            chand->waiting_child_capacity =
+                GPR_MAX(chand->waiting_child_capacity * 2, 8);
+            chand->waiting_children = gpr_realloc(
+                chand->waiting_children,
+                chand->waiting_child_capacity * sizeof(call_data *));
+          }
+          calld->s.waiting_op = *op;
+          chand->waiting_children[chand->waiting_child_count++] = calld;
+          gpr_mu_unlock(&chand->mu);
+
+          /* finally initiate transport setup if needed */
+          if (initiate_transport_setup) {
+            grpc_transport_setup_initiate(chand->transport_setup);
+          }
+        }
+      }
+      break;
+    case CALL_WAITING:
+      if (op->cancel_with_status != GRPC_STATUS_OK) {
+        waiting_op = calld->s.waiting_op;
+        remove_waiting_child(chand, calld);
+        calld->state = CALL_CANCELLED;
+        gpr_mu_unlock(&chand->mu);
+        handle_op_after_cancellation(elem, &waiting_op);
+        handle_op_after_cancellation(elem, op);
+      } else {
+        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
+                   (op->send_ops == NULL));
+        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
+                   (op->recv_ops == NULL));
+        if (op->send_ops) {
+          calld->s.waiting_op.send_ops = op->send_ops;
+          calld->s.waiting_op.is_last_send = op->is_last_send;
+          calld->s.waiting_op.on_done_send = op->on_done_send;
+          calld->s.waiting_op.send_user_data = op->send_user_data;
+        }
+        if (op->recv_ops) {
+          calld->s.waiting_op.recv_ops = op->recv_ops;
+          calld->s.waiting_op.recv_state = op->recv_state;
+          calld->s.waiting_op.on_done_recv = op->on_done_recv;
+          calld->s.waiting_op.recv_user_data = op->recv_user_data;
+        }
+        gpr_mu_unlock(&chand->mu);
+      }
+      break;
     case CALL_CANCELLED:
       gpr_mu_unlock(&chand->mu);
-      return; /* early out */
-  }
-  gpr_log(GPR_ERROR, "should never reach here");
-  abort();
-}
-
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
-  call_data *calld = elem->call_data;
-  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
-  switch (op->type) {
-    case GRPC_SEND_METADATA:
-      if (!calld->got_first_send) {
-        /* filter out the start event to find which child to send on */
-        calld->got_first_send = 1;
-        start_rpc(elem, op);
-      } else {
-        grpc_call_next_op(elem, op);
-      }
-      break;
-    case GRPC_CANCEL_OP:
-      cancel_rpc(elem, op);
-      break;
-    case GRPC_SEND_MESSAGE:
-    case GRPC_SEND_FINISH:
-    case GRPC_REQUEST_DATA:
-      if (calld->state == CALL_ACTIVE) {
-        grpc_call_element *child_elem =
-            grpc_child_call_get_top_element(calld->s.active.child_call);
-        child_elem->filter->call_op(child_elem, elem, op);
-      } else {
-        op->done_cb(op->user_data, GRPC_OP_ERROR);
-      }
-      break;
-    default:
-      GPR_ASSERT(op->dir == GRPC_CALL_UP);
-      grpc_call_next_op(elem, op);
+      handle_op_after_cancellation(elem, op);
       break;
   }
 }
@@ -351,15 +347,18 @@
 
 /* Constructor for call_data */
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   call_data *calld = elem->call_data;
 
+  /* TODO(ctiller): is there something useful we can do here? */
+  GPR_ASSERT(initial_op == NULL);
+
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
   GPR_ASSERT(server_transport_data == NULL);
   calld->elem = elem;
   calld->state = CALL_CREATED;
   calld->deadline = gpr_inf_future;
-  calld->got_first_send = 0;
 }
 
 /* Destructor for call_data */
@@ -372,9 +371,7 @@
   if (calld->state == CALL_ACTIVE) {
     grpc_child_call_destroy(calld->s.active.child_call);
   }
-  if (calld->state == CALL_WAITING) {
-    grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
-  }
+  GPR_ASSERT(calld->state != CALL_WAITING);
 }
 
 /* Constructor for channel_data */
@@ -396,6 +393,7 @@
   chand->transport_setup = NULL;
   chand->transport_setup_initiated = 0;
   chand->args = grpc_channel_args_copy(args);
+  chand->mdctx = metadata_context;
 }
 
 /* Destructor for channel_data */
@@ -417,9 +415,9 @@
 }
 
 const grpc_channel_filter grpc_client_channel_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
-    "client-channel",
+    cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "client-channel",
 };
 
 grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
@@ -436,7 +434,7 @@
   call_data **waiting_children;
   size_t waiting_child_count;
   size_t i;
-  grpc_call_op *call_ops;
+  grpc_transport_op *call_ops;
 
   /* build the child filter stack */
   child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -472,13 +470,13 @@
   chand->waiting_child_count = 0;
   chand->waiting_child_capacity = 0;
 
-  call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+  call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
 
   for (i = 0; i < waiting_child_count; i++) {
     call_ops[i] = waiting_children[i]->s.waiting_op;
     if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
       waiting_children[i] = NULL;
-      call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+      grpc_transport_op_finish_with_failure(&call_ops[i]);
     }
   }
 
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 711274b..14dda88 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,25 +45,12 @@
 #include <grpc/support/slice_buffer.h>
 
 #define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
 
 typedef struct connected_channel_channel_data {
   grpc_transport *transport;
-  gpr_uint32 max_message_length;
 } channel_data;
 
-typedef struct connected_channel_call_data {
-  grpc_call_element *elem;
-  grpc_stream_op_buffer outgoing_sopb;
-
-  gpr_uint32 max_message_length;
-  gpr_uint32 incoming_message_length;
-  gpr_uint8 reading_message;
-  gpr_uint8 got_read_close;
-  gpr_slice_buffer incoming_message;
-  gpr_uint32 outgoing_buffer_length_estimate;
-} call_data;
+typedef struct connected_channel_call_data { void *unused; } call_data;
 
 /* We perform a small hack to locate transport data alongside the connected
    channel data in call allocations, to allow everything to be pulled in minimal
@@ -72,91 +59,17 @@
 #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
   (((call_data *)(transport_stream)) - 1)
 
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
-                                           grpc_stream_op_buffer *sopb) {
-  size_t i;
-
-  switch (byte_buffer->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
-        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
-        gpr_slice_ref(slice);
-        grpc_sopb_add_slice(sopb, slice);
-      }
-      break;
-  }
-}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
-                              call_data *calld, int is_last) {
-  size_t nops;
-
-  if (op->flags & GRPC_WRITE_BUFFER_HINT) {
-    if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
-      op->done_cb(op->user_data, GRPC_OP_OK);
-      return;
-    }
-  }
-
-  calld->outgoing_buffer_length_estimate = 0;
-  grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
-  nops = calld->outgoing_sopb.nops;
-  calld->outgoing_sopb.nops = 0;
-  grpc_transport_send_batch(chand->transport,
-                            TRANSPORT_STREAM_FROM_CALL_DATA(calld),
-                            calld->outgoing_sopb.ops, nops, is_last);
-}
-
 /* Intercept a call operation and either push it directly up or translate it
    into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem,
+                                   grpc_transport_op *op) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
-  if (op->bind_pollset) {
-    grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
-  }
-
-  switch (op->type) {
-    case GRPC_SEND_METADATA:
-      grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
-      end_bufferable_op(op, chand, calld, 0);
-      break;
-    case GRPC_SEND_MESSAGE:
-      grpc_sopb_add_begin_message(&calld->outgoing_sopb,
-                                  grpc_byte_buffer_length(op->data.message),
-                                  op->flags);
-    /* fall-through */
-    case GRPC_SEND_PREFORMATTED_MESSAGE:
-      copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
-      calld->outgoing_buffer_length_estimate +=
-          (5 + grpc_byte_buffer_length(op->data.message));
-      end_bufferable_op(op, chand, calld, 0);
-      break;
-    case GRPC_SEND_FINISH:
-      end_bufferable_op(op, chand, calld, 1);
-      break;
-    case GRPC_REQUEST_DATA:
-      /* re-arm window updates if they were disarmed by finish_message */
-      grpc_transport_set_allow_window_updates(
-          chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
-      break;
-    case GRPC_CANCEL_OP:
-      grpc_transport_abort_stream(chand->transport,
-                                  TRANSPORT_STREAM_FROM_CALL_DATA(calld),
-                                  GRPC_STATUS_CANCELLED);
-      break;
-    default:
-      GPR_ASSERT(op->dir == GRPC_CALL_UP);
-      grpc_call_next_op(elem, op);
-      break;
-  }
+  grpc_transport_perform_op(chand->transport,
+                            TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
 }
 
 /* Currently we assume all channel operations should just be pushed up. */
@@ -182,23 +95,16 @@
 
 /* Constructor for call_data */
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   int r;
 
   GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-  calld->elem = elem;
-  grpc_sopb_init(&calld->outgoing_sopb);
-
-  calld->reading_message = 0;
-  calld->got_read_close = 0;
-  calld->outgoing_buffer_length_estimate = 0;
-  calld->max_message_length = chand->max_message_length;
-  gpr_slice_buffer_init(&calld->incoming_message);
   r = grpc_transport_init_stream(chand->transport,
                                  TRANSPORT_STREAM_FROM_CALL_DATA(calld),
-                                 server_transport_data);
+                                 server_transport_data, initial_op);
   GPR_ASSERT(r == 0);
 }
 
@@ -207,8 +113,6 @@
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-  grpc_sopb_destroy(&calld->outgoing_sopb);
-  gpr_slice_buffer_destroy(&calld->incoming_message);
   grpc_transport_destroy_stream(chand->transport,
                                 TRANSPORT_STREAM_FROM_CALL_DATA(calld));
 }
@@ -218,28 +122,10 @@
                               const grpc_channel_args *args, grpc_mdctx *mdctx,
                               int is_first, int is_last) {
   channel_data *cd = (channel_data *)elem->channel_data;
-  size_t i;
   GPR_ASSERT(!is_first);
   GPR_ASSERT(is_last);
   GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
   cd->transport = NULL;
-
-  cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
-  if (args) {
-    for (i = 0; i < args->num_args; i++) {
-      if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
-        if (args->args[i].type != GRPC_ARG_INTEGER) {
-          gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
-                  GRPC_ARG_MAX_MESSAGE_LENGTH);
-        } else if (args->args[i].value.integer < 0) {
-          gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
-                  GRPC_ARG_MAX_MESSAGE_LENGTH);
-        } else {
-          cd->max_message_length = args->args[i].value.integer;
-        }
-      }
-    }
-  }
 }
 
 /* Destructor for channel_data */
@@ -250,15 +136,11 @@
 }
 
 const grpc_channel_filter grpc_connected_channel_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
+    con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "connected",
 };
 
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
-                                   grpc_stream *stream, size_t size_hint) {
-  return gpr_slice_malloc(size_hint);
-}
-
 /* Transport callback to accept a new stream... calls up to handle it */
 static void accept_stream(void *user_data, grpc_transport *transport,
                           const void *transport_server_data) {
@@ -276,168 +158,6 @@
   channel_op(elem, NULL, &op);
 }
 
-static void recv_error(channel_data *chand, call_data *calld, int line,
-                       const char *message) {
-  gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
-  if (chand->transport) {
-    grpc_transport_abort_stream(chand->transport,
-                                TRANSPORT_STREAM_FROM_CALL_DATA(calld),
-                                GRPC_STATUS_INVALID_ARGUMENT);
-  }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
-  grpc_call_element *elem = calld->elem;
-  grpc_call_op call_op;
-  call_op.dir = GRPC_CALL_UP;
-  call_op.flags = 0;
-  /* if we got all the bytes for this message, call up the stack */
-  call_op.type = GRPC_RECV_MESSAGE;
-  call_op.done_cb = do_nothing;
-  /* TODO(ctiller): this could be a lot faster if coded directly */
-  call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
-                                                 calld->incoming_message.count);
-  gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
-  /* disable window updates until we get a request more from above */
-  grpc_transport_set_allow_window_updates(
-      chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
-  GPR_ASSERT(calld->incoming_message.count == 0);
-  calld->reading_message = 0;
-  grpc_call_next_op(elem, &call_op);
-}
-
-static void got_metadata(grpc_call_element *elem,
-                         grpc_metadata_batch metadata) {
-  grpc_call_op op;
-  op.type = GRPC_RECV_METADATA;
-  op.dir = GRPC_CALL_UP;
-  op.flags = 0;
-  op.data.metadata = metadata;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-
-  grpc_call_next_op(elem, &op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
-   call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
-                       grpc_stream *stream, grpc_stream_op *ops,
-                       size_t ops_count, grpc_stream_state final_state) {
-  call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
-  grpc_call_element *elem = calld->elem;
-  channel_data *chand = elem->channel_data;
-  grpc_stream_op *stream_op;
-  grpc_call_op call_op;
-  size_t i;
-  gpr_uint32 length;
-
-  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
-  for (i = 0; i < ops_count; i++) {
-    stream_op = ops + i;
-    switch (stream_op->type) {
-      case GRPC_OP_FLOW_CTL_CB:
-        stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
-        break;
-      case GRPC_NO_OP:
-        break;
-      case GRPC_OP_METADATA:
-        got_metadata(elem, stream_op->data.metadata);
-        break;
-      case GRPC_OP_BEGIN_MESSAGE:
-        /* can't begin a message when we're still reading a message */
-        if (calld->reading_message) {
-          char *message = NULL;
-          gpr_asprintf(&message,
-                       "Message terminated early; read %d bytes, expected %d",
-                       (int)calld->incoming_message.length,
-                       (int)calld->incoming_message_length);
-          recv_error(chand, calld, __LINE__, message);
-          gpr_free(message);
-          return;
-        }
-        /* stash away parameters, and prepare for incoming slices */
-        length = stream_op->data.begin_message.length;
-        if (length > calld->max_message_length) {
-          char *message = NULL;
-          gpr_asprintf(
-              &message,
-              "Maximum message length of %d exceeded by a message of length %d",
-              calld->max_message_length, length);
-          recv_error(chand, calld, __LINE__, message);
-          gpr_free(message);
-        } else if (length > 0) {
-          calld->reading_message = 1;
-          calld->incoming_message_length = length;
-        } else {
-          finish_message(chand, calld);
-        }
-        break;
-      case GRPC_OP_SLICE:
-        if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
-          gpr_slice_unref(stream_op->data.slice);
-          break;
-        }
-        /* we have to be reading a message to know what to do here */
-        if (!calld->reading_message) {
-          recv_error(chand, calld, __LINE__,
-                     "Received payload data while not reading a message");
-          return;
-        }
-        /* append the slice to the incoming buffer */
-        gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
-        if (calld->incoming_message.length > calld->incoming_message_length) {
-          /* if we got too many bytes, complain */
-          char *message = NULL;
-          gpr_asprintf(&message,
-                       "Receiving message overflow; read %d bytes, expected %d",
-                       (int)calld->incoming_message.length,
-                       (int)calld->incoming_message_length);
-          recv_error(chand, calld, __LINE__, message);
-          gpr_free(message);
-          return;
-        } else if (calld->incoming_message.length ==
-                   calld->incoming_message_length) {
-          finish_message(chand, calld);
-        }
-    }
-  }
-  /* if the stream closed, then call up the stack to let it know */
-  if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
-                                 final_state == GRPC_STREAM_CLOSED)) {
-    calld->got_read_close = 1;
-    if (calld->reading_message) {
-      char *message = NULL;
-      gpr_asprintf(&message,
-                   "Last message truncated; read %d bytes, expected %d",
-                   (int)calld->incoming_message.length,
-                   (int)calld->incoming_message_length);
-      recv_error(chand, calld, __LINE__, message);
-      gpr_free(message);
-    }
-    call_op.type = GRPC_RECV_HALF_CLOSE;
-    call_op.dir = GRPC_CALL_UP;
-    call_op.flags = 0;
-    call_op.done_cb = do_nothing;
-    call_op.user_data = NULL;
-    grpc_call_next_op(elem, &call_op);
-  }
-  if (final_state == GRPC_STREAM_CLOSED) {
-    call_op.type = GRPC_RECV_FINISH;
-    call_op.dir = GRPC_CALL_UP;
-    call_op.flags = 0;
-    call_op.done_cb = do_nothing;
-    call_op.user_data = NULL;
-    grpc_call_next_op(elem, &call_op);
-  }
-}
-
 static void transport_goaway(void *user_data, grpc_transport *transport,
                              grpc_status_code status, gpr_slice debug) {
   /* transport got goaway ==> call up and handle it */
@@ -470,8 +190,7 @@
 }
 
 const grpc_transport_callbacks connected_channel_transport_callbacks = {
-    alloc_recv_buffer, accept_stream,    recv_batch,
-    transport_goaway,  transport_closed,
+    accept_stream, transport_goaway, transport_closed,
 };
 
 grpc_transport_setup_result grpc_connected_channel_bind_transport(
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 56e1234..9805f32 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -39,6 +39,12 @@
   grpc_linked_mdelem scheme;
   grpc_linked_mdelem te_trailers;
   grpc_linked_mdelem content_type;
+  int sent_initial_metadata;
+
+  int got_initial_metadata;
+  grpc_stream_op_buffer *recv_ops;
+  void (*on_done_recv)(void *user_data, int success);
+  void *recv_user_data;
 } call_data;
 
 typedef struct channel_data {
@@ -64,22 +70,37 @@
   return md;
 }
 
-/* Called either:
-     - in response to an API call (or similar) from above, to send something
-     - a network event (or similar) from below, to receive something
-   op contains type and call direction information, in addition to the data
-   that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void hc_on_recv(void *user_data, int success) {
+  grpc_call_element *elem = user_data;
+  call_data *calld = elem->call_data;
+  if (success) {
+    size_t i;
+    size_t nops = calld->recv_ops->nops;
+    grpc_stream_op *ops = calld->recv_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *op = &ops[i];
+      if (op->type != GRPC_OP_METADATA) continue;
+      calld->got_initial_metadata = 1;
+      grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+    }
+  }
+  calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
-  switch (op->type) {
-    case GRPC_SEND_METADATA:
+  size_t i;
+  if (op->send_ops && !calld->sent_initial_metadata) {
+    size_t nops = op->send_ops->nops;
+    grpc_stream_op *ops = op->send_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *op = &ops[i];
+      if (op->type != GRPC_OP_METADATA) continue;
+      calld->sent_initial_metadata = 1;
       /* Send : prefixed headers, which have to be before any application
-       * layer headers. */
+         layer headers. */
       grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
                                    grpc_mdelem_ref(channeld->method));
       grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
@@ -88,17 +109,25 @@
                                    grpc_mdelem_ref(channeld->te_trailers));
       grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
                                    grpc_mdelem_ref(channeld->content_type));
-      grpc_call_next_op(elem, op);
       break;
-    case GRPC_RECV_METADATA:
-      grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
-      grpc_call_next_op(elem, op);
-      break;
-    default:
-      /* pass control up or down the stack depending on op->dir */
-      grpc_call_next_op(elem, op);
-      break;
+    }
   }
+
+  if (op->recv_ops && !calld->got_initial_metadata) {
+    /* substitute our callback for the higher callback */
+    calld->recv_ops = op->recv_ops;
+    calld->on_done_recv = op->on_done_recv;
+    calld->recv_user_data = op->recv_user_data;
+    op->on_done_recv = hc_on_recv;
+    op->recv_user_data = elem;
+  }
+}
+
+static void hc_start_transport_op(grpc_call_element *elem,
+                                  grpc_transport_op *op) {
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+  hc_mutate_op(elem, op);
+  grpc_call_next_op(elem, op);
 }
 
 /* Called on special channel events, such as disconnection or new incoming
@@ -120,7 +149,13 @@
 
 /* Constructor for call_data */
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {}
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
+  call_data *calld = elem->call_data;
+  calld->sent_initial_metadata = 0;
+  calld->got_initial_metadata = 0;
+  if (initial_op) hc_mutate_op(elem, initial_op);
+}
 
 /* Destructor for call_data */
 static void destroy_call_elem(grpc_call_element *elem) {
@@ -181,6 +216,6 @@
 }
 
 const grpc_channel_filter grpc_http_client_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
-    "http-client"};
+    hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "http-client"};
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
deleted file mode 100644
index 453a042..0000000
--- a/src/core/channel/http_filter.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/http_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
-  int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
-  int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-/* Called either:
-     - in response to an API call (or similar) from above, to send something
-     - a network event (or similar) from below, to receive something
-   op contains type and call direction information, in addition to the data
-   that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
-  /* grab pointers to our data from the call element */
-  call_data *calld = elem->call_data;
-  channel_data *channeld = elem->channel_data;
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
-  ignore_unused(calld);
-  ignore_unused(channeld);
-
-  switch (op->type) {
-    default:
-      /* pass control up or down the stack depending on op->dir */
-      grpc_call_next_op(elem, op);
-      break;
-  }
-}
-
-/* Called on special channel events, such as disconnection or new incoming
-   calls on the server */
-static void channel_op(grpc_channel_element *elem,
-                       grpc_channel_element *from_elem, grpc_channel_op *op) {
-  /* grab pointers to our data from the channel element */
-  channel_data *channeld = elem->channel_data;
-
-  ignore_unused(channeld);
-
-  switch (op->type) {
-    default:
-      /* pass control up or down the stack depending on op->dir */
-      grpc_channel_next_op(elem, op);
-      break;
-  }
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
-  /* grab pointers to our data from the call element */
-  call_data *calld = elem->call_data;
-  channel_data *channeld = elem->channel_data;
-
-  /* initialize members */
-  calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
-  /* grab pointers to our data from the call element */
-  call_data *calld = elem->call_data;
-  channel_data *channeld = elem->channel_data;
-
-  ignore_unused(calld);
-  ignore_unused(channeld);
-}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
-                              const grpc_channel_args *args, grpc_mdctx *mdctx,
-                              int is_first, int is_last) {
-  /* grab pointers to our data from the channel element */
-  channel_data *channeld = elem->channel_data;
-
-  /* The first and the last filters tend to be implemented differently to
-     handle the case that there's no 'next' filter to call on the up or down
-     path */
-  GPR_ASSERT(!is_first);
-  GPR_ASSERT(!is_last);
-
-  /* initialize members */
-  channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
-  /* grab pointers to our data from the channel element */
-  channel_data *channeld = elem->channel_data;
-
-  ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_http_filter = {
-    call_op,           channel_op,           sizeof(call_data),
-    init_call_elem,    destroy_call_elem,    sizeof(channel_data),
-    init_channel_elem, destroy_channel_elem, "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
deleted file mode 100644
index 1b116ad..0000000
--- a/src/core/channel/http_filter.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Processes metadata that is common to both client and server for HTTP2
-   transports. */
-extern const grpc_channel_filter grpc_http_filter;
-
-#endif  /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 0bfe2f2..1f64df6 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -38,12 +38,6 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
-typedef struct {
-  grpc_mdelem *path;
-  grpc_mdelem *content_type;
-  grpc_byte_buffer *content;
-} gettable;
-
 typedef struct call_data {
   gpr_uint8 got_initial_metadata;
   gpr_uint8 seen_path;
@@ -52,6 +46,10 @@
   gpr_uint8 seen_scheme;
   gpr_uint8 seen_te_trailers;
   grpc_linked_mdelem status;
+
+  grpc_stream_op_buffer *recv_ops;
+  void (*on_done_recv)(void *user_data, int success);
+  void *recv_user_data;
 } call_data;
 
 typedef struct channel_data {
@@ -69,9 +67,6 @@
   grpc_mdstr *host_key;
 
   grpc_mdctx *mdctx;
-
-  size_t gettable_count;
-  gettable *gettables;
 } channel_data;
 
 /* used to silence 'variable not used' warnings */
@@ -143,66 +138,80 @@
   }
 }
 
-/* Called either:
-     - in response to an API call (or similar) from above, to send something
-     - a network event (or similar) from below, to receive something
-   op contains type and call direction information, in addition to the data
-   that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void hs_on_recv(void *user_data, int success) {
+  grpc_call_element *elem = user_data;
+  call_data *calld = elem->call_data;
+  if (success) {
+    size_t i;
+    size_t nops = calld->recv_ops->nops;
+    grpc_stream_op *ops = calld->recv_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *op = &ops[i];
+      if (op->type != GRPC_OP_METADATA) continue;
+      calld->got_initial_metadata = 1;
+      grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
+      /* Have we seen the required http2 transport headers?
+         (:method, :scheme, :path and te: trailers are the ones verified
+         here; a missing one is logged and the call is cancelled) */
+      if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+          calld->seen_path) {
+        /* do nothing */
+      } else {
+        if (!calld->seen_path) {
+          gpr_log(GPR_ERROR, "Missing :path header");
+        }
+        if (!calld->seen_post) {
+          gpr_log(GPR_ERROR, "Missing :method header");
+        }
+        if (!calld->seen_scheme) {
+          gpr_log(GPR_ERROR, "Missing :scheme header");
+        }
+        if (!calld->seen_te_trailers) {
+          gpr_log(GPR_ERROR, "Missing te trailers header");
+        }
+        /* Error this call out */
+        success = 0;
+        grpc_call_element_send_cancel(elem);
+      }
+    }
+  }
+  calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+  size_t i;
 
-  switch (op->type) {
-    case GRPC_RECV_METADATA:
-      grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
-      if (!calld->got_initial_metadata) {
-        calld->got_initial_metadata = 1;
-        /* Have we seen the required http2 transport headers?
-           (:method, :scheme, content-type, with :path and :authority covered
-           at the channel level right now) */
-        if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
-            calld->seen_path) {
-          grpc_call_next_op(elem, op);
-        } else {
-          if (!calld->seen_path) {
-            gpr_log(GPR_ERROR, "Missing :path header");
-          }
-          if (!calld->seen_post) {
-            gpr_log(GPR_ERROR, "Missing :method header");
-          }
-          if (!calld->seen_scheme) {
-            gpr_log(GPR_ERROR, "Missing :scheme header");
-          }
-          if (!calld->seen_te_trailers) {
-            gpr_log(GPR_ERROR, "Missing te trailers header");
-          }
-          /* Error this call out */
-          grpc_metadata_batch_destroy(&op->data.metadata);
-          op->done_cb(op->user_data, GRPC_OP_OK);
-          grpc_call_element_send_cancel(elem);
-        }
-      } else {
-        grpc_call_next_op(elem, op);
-      }
+  if (op->send_ops && !calld->sent_status) {
+    size_t nops = op->send_ops->nops;
+    grpc_stream_op *ops = op->send_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *op = &ops[i];
+      if (op->type != GRPC_OP_METADATA) continue;
+      calld->sent_status = 1;
+      grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+                                   grpc_mdelem_ref(channeld->status_ok));
       break;
-    case GRPC_SEND_METADATA:
-      /* If we haven't sent status 200 yet, we need to so so because it needs to
-         come before any non : prefixed metadata. */
-      if (!calld->sent_status) {
-        calld->sent_status = 1;
-        grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
-                                     grpc_mdelem_ref(channeld->status_ok));
-      }
-      grpc_call_next_op(elem, op);
-      break;
-    default:
-      /* pass control up or down the stack depending on op->dir */
-      grpc_call_next_op(elem, op);
-      break;
+    }
   }
+
+  if (op->recv_ops && !calld->got_initial_metadata) {
+    /* substitute our callback for the higher callback */
+    calld->recv_ops = op->recv_ops;
+    calld->on_done_recv = op->on_done_recv;
+    calld->recv_user_data = op->recv_user_data;
+    op->on_done_recv = hs_on_recv;
+    op->recv_user_data = elem;
+  }
+}
+
+static void hs_start_transport_op(grpc_call_element *elem,
+                                  grpc_transport_op *op) {
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+  hs_mutate_op(elem, op);
+  grpc_call_next_op(elem, op);
 }
 
 /* Called on special channel events, such as disconnection or new incoming
@@ -224,15 +233,13 @@
 
 /* Constructor for call_data */
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
-  channel_data *channeld = elem->channel_data;
-
-  ignore_unused(channeld);
-
   /* initialize members */
   memset(calld, 0, sizeof(*calld));
+  if (initial_op) hs_mutate_op(elem, initial_op);
 }
 
 /* Destructor for call_data */
@@ -242,9 +249,6 @@
 static void init_channel_elem(grpc_channel_element *elem,
                               const grpc_channel_args *args, grpc_mdctx *mdctx,
                               int is_first, int is_last) {
-  size_t i;
-  size_t gettable_capacity = 0;
-
   /* grab pointers to our data from the channel element */
   channel_data *channeld = elem->channel_data;
 
@@ -270,46 +274,13 @@
       grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
 
   channeld->mdctx = mdctx;
-
-  /* initialize http download support */
-  channeld->gettable_count = 0;
-  channeld->gettables = NULL;
-  for (i = 0; i < args->num_args; i++) {
-    if (0 == strcmp(args->args[i].key, GRPC_ARG_SERVE_OVER_HTTP)) {
-      gettable *g;
-      gpr_slice slice;
-      grpc_http_server_page *p = args->args[i].value.pointer.p;
-      if (channeld->gettable_count == gettable_capacity) {
-        gettable_capacity =
-            GPR_MAX(gettable_capacity * 3 / 2, gettable_capacity + 1);
-        channeld->gettables = gpr_realloc(channeld->gettables,
-                                          gettable_capacity * sizeof(gettable));
-      }
-      g = &channeld->gettables[channeld->gettable_count++];
-      g->path = grpc_mdelem_from_strings(mdctx, ":path", p->path);
-      g->content_type =
-          grpc_mdelem_from_strings(mdctx, "content-type", p->content_type);
-      slice = gpr_slice_from_copied_string(p->content);
-      g->content = grpc_byte_buffer_create(&slice, 1);
-      gpr_slice_unref(slice);
-    }
-  }
 }
 
 /* Destructor for channel data */
 static void destroy_channel_elem(grpc_channel_element *elem) {
-  size_t i;
-
   /* grab pointers to our data from the channel element */
   channel_data *channeld = elem->channel_data;
 
-  for (i = 0; i < channeld->gettable_count; i++) {
-    grpc_mdelem_unref(channeld->gettables[i].path);
-    grpc_mdelem_unref(channeld->gettables[i].content_type);
-    grpc_byte_buffer_destroy(channeld->gettables[i].content);
-  }
-  gpr_free(channeld->gettables);
-
   grpc_mdelem_unref(channeld->te_trailers);
   grpc_mdelem_unref(channeld->status_ok);
   grpc_mdelem_unref(channeld->status_not_found);
@@ -324,6 +295,6 @@
 }
 
 const grpc_channel_filter grpc_http_server_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
-    "http-server"};
+    hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d987fa2..1d2be71 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -45,13 +45,7 @@
 /* used to silence 'variable not used' warnings */
 static void ignore_unused(void *ignored) {}
 
-/* Called either:
-     - in response to an API call (or similar) from above, to send something
-     - a network event (or similar) from below, to receive something
-   op contains type and call direction information, in addition to the data
-   that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
@@ -59,12 +53,20 @@
   ignore_unused(calld);
   ignore_unused(channeld);
 
-  switch (op->type) {
-    default:
-      /* pass control up or down the stack depending on op->dir */
-      grpc_call_next_op(elem, op);
-      break;
-  }
+  /* do nothing */
+}
+
+/* Entry point for transport ops on this call element.
+   The op is first handed to noop_mutate_op, which is where a real filter
+   would inspect or alter it (this filter deliberately does nothing);
+   it is then passed to the next element down the stack.
+   init_call_elem applies the same mutation to the initial op, if any. */
+static void noop_start_transport_op(grpc_call_element *elem,
+                                    grpc_transport_op *op) {
+  noop_mutate_op(elem, op);
+
+  /* pass control down the stack */
+  grpc_call_next_op(elem, op);
 }
 
 /* Called on special channel events, such as disconnection or new incoming
@@ -86,13 +88,16 @@
 
 /* Constructor for call_data */
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
 
   /* initialize members */
   calld->unused = channeld->unused;
+
+  if (initial_op) noop_mutate_op(elem, initial_op);
 }
 
 /* Destructor for call_data */
@@ -131,6 +136,6 @@
 }
 
 const grpc_channel_filter grpc_no_op_filter = {
-    call_op,           channel_op,           sizeof(call_data),
-    init_call_elem,    destroy_call_elem,    sizeof(channel_data),
-    init_channel_elem, destroy_channel_elem, "no-op"};
+    noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "no-op"};
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index 4af2c67..2322c12 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -51,7 +51,9 @@
   grpc_credentials *creds;
   grpc_mdstr *host;
   grpc_mdstr *method;
-  grpc_call_op op;
+  grpc_transport_op op;
+  size_t op_md_idx;
+  int sent_initial_metadata;
   grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
 } call_data;
 
@@ -65,24 +67,23 @@
   grpc_mdstr *status_key;
 } channel_data;
 
-static void bubbleup_error(grpc_call_element *elem, const char *error_msg) {
-  grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
-  grpc_call_element_send_cancel(elem);
-}
-
 static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
                                     size_t num_md,
                                     grpc_credentials_status status) {
   grpc_call_element *elem = (grpc_call_element *)user_data;
   call_data *calld = elem->call_data;
-  grpc_call_op op = calld->op;
+  grpc_transport_op *op = &calld->op;
+  grpc_metadata_batch *mdb;
   size_t i;
   GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
+  GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx &&
+             op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
+  mdb = &op->send_ops->ops[calld->op_md_idx].data.metadata;
   for (i = 0; i < num_md; i++) {
-    grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i],
+    grpc_metadata_batch_add_tail(mdb, &calld->md_links[i],
                                  grpc_mdelem_ref(md_elems[i]));
   }
-  grpc_call_next_op(elem, &op);
+  grpc_call_next_op(elem, op);
 }
 
 static char *build_service_url(const char *url_scheme, call_data *calld) {
@@ -105,7 +106,8 @@
   return service_url;
 }
 
-static void send_security_metadata(grpc_call_element *elem, grpc_call_op *op) {
+static void send_security_metadata(grpc_call_element *elem,
+                                   grpc_transport_op *op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
@@ -136,6 +138,7 @@
 static void on_host_checked(void *user_data, grpc_security_status status) {
   grpc_call_element *elem = (grpc_call_element *)user_data;
   call_data *calld = elem->call_data;
+  channel_data *chand = elem->channel_data;
 
   if (status == GRPC_SECURITY_OK) {
     send_security_metadata(elem, &calld->op);
@@ -143,10 +146,11 @@
     char *error_msg;
     gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
                  grpc_mdstr_as_c_string(calld->host));
-    bubbleup_error(elem, error_msg);
-    grpc_metadata_batch_destroy(&calld->op.data.metadata);
+    grpc_transport_op_add_cancellation(
+        &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+        grpc_mdstr_from_string(chand->md_ctx, error_msg));
     gpr_free(error_msg);
-    calld->op.done_cb(calld->op.user_data, GRPC_OP_ERROR);
+    grpc_call_next_op(elem, &calld->op);
   }
 }
 
@@ -155,16 +159,23 @@
      - a network event (or similar) from below, to receive something
    op contains type and call direction information, in addition to the data
    that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void auth_start_transport_op(grpc_call_element *elem,
+                                    grpc_transport_op *op) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
   grpc_linked_mdelem *l;
+  size_t i;
 
-  switch (op->type) {
-    case GRPC_SEND_METADATA:
-      for (l = op->data.metadata.list.head; l != NULL; l = l->next) {
+  if (op->send_ops && !calld->sent_initial_metadata) {
+    size_t nops = op->send_ops->nops;
+    grpc_stream_op *ops = op->send_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *sop = &ops[i];
+      if (sop->type != GRPC_OP_METADATA) continue;
+      calld->op_md_idx = i;
+      calld->sent_initial_metadata = 1;
+      for (l = sop->data.metadata.list.head; l != NULL; l = l->next) {
         grpc_mdelem *md = l->md;
         /* Pointer comparison is OK for md_elems created from the same context.
          */
@@ -188,21 +199,22 @@
             gpr_asprintf(&error_msg,
                          "Invalid host %s set in :authority metadata.",
                          call_host);
-            bubbleup_error(elem, error_msg);
-            grpc_metadata_batch_destroy(&calld->op.data.metadata);
+            grpc_transport_op_add_cancellation(
+                &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+                grpc_mdstr_from_string(channeld->md_ctx, error_msg));
             gpr_free(error_msg);
-            op->done_cb(op->user_data, GRPC_OP_ERROR);
+            grpc_call_next_op(elem, &calld->op);
           }
-          break;
+          return; /* early exit */
         }
       }
       send_security_metadata(elem, op);
-      break;
-    default:
-      /* pass control up or down the stack depending on op->dir */
-      grpc_call_next_op(elem, op);
-      break;
+      return; /* early exit */
+    }
   }
+
+  /* pass control up or down the stack */
+  grpc_call_next_op(elem, op);
 }
 
 /* Called on special channel events, such as disconnection or new incoming
@@ -214,13 +226,17 @@
 
 /* Constructor for call_data */
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   /* TODO(jboeuf):
      Find a way to pass-in the credentials from the caller here.  */
   call_data *calld = elem->call_data;
   calld->creds = NULL;
   calld->host = NULL;
   calld->method = NULL;
+  calld->sent_initial_metadata = 0;
+
+  GPR_ASSERT(!initial_op || !initial_op->send_ops);
 }
 
 /* Destructor for call_data */
@@ -288,5 +304,6 @@
 }
 
 const grpc_channel_filter grpc_client_auth_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "auth"};
+    auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "auth"};
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 5b7d99c..db9d545 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -35,7 +35,6 @@
 
 #include <string.h>
 
-#include "src/core/channel/http_filter.h"
 #include "src/core/channel/http_server_filter.h"
 #include "src/core/iomgr/endpoint.h"
 #include "src/core/iomgr/resolve_address.h"
@@ -73,8 +72,8 @@
 static grpc_transport_setup_result setup_transport(void *server,
                                                    grpc_transport *transport,
                                                    grpc_mdctx *mdctx) {
-  static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
-                                                       &grpc_http_filter};
+  static grpc_channel_filter const *extra_filters[] = {
+      &grpc_http_server_filter};
   return grpc_server_setup_transport(server, transport, extra_filters,
                                      GPR_ARRAY_SIZE(extra_filters), mdctx);
 }
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2949805..a65bfb8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -81,9 +81,9 @@
   grpc_ioreq_completion_func on_complete;
   void *user_data;
   /* a bit mask of which request ops are needed (1u << opid) */
-  gpr_uint32 need_mask;
+  gpr_uint16 need_mask;
   /* a bit mask of which request ops are now completed */
-  gpr_uint32 complete_mask;
+  gpr_uint16 complete_mask;
 } reqinfo_master;
 
 /* Status data for a request can come from several sources; this
@@ -144,12 +144,17 @@
   gpr_uint8 have_alarm;
   /* are we currently performing a send operation */
   gpr_uint8 sending;
+  /* are we currently performing a recv operation */
+  gpr_uint8 receiving;
   /* are we currently completing requests */
   gpr_uint8 completing;
   /* pairs with completed_requests */
   gpr_uint8 num_completed_requests;
-  /* flag that we need to request more data */
-  gpr_uint8 need_more_data;
+  /* are we currently reading a message? */
+  gpr_uint8 reading_message;
+  /* flags with bits corresponding to write states allowing us to determine
+     what was sent */
+  gpr_uint16 last_send_contains;
 
   /* Active ioreqs.
      request_set and request_data contain one element per active ioreq
@@ -214,6 +219,13 @@
   size_t send_initial_metadata_count;
   gpr_timespec send_deadline;
 
+  grpc_stream_op_buffer send_ops;
+  grpc_stream_op_buffer recv_ops;
+  grpc_stream_state recv_state;
+
+  gpr_slice_buffer incoming_message;
+  gpr_uint32 incoming_message_length;
+
   /* Data that the legacy api needs to track. To be deleted at some point
      soon */
   legacy_state *legacy_state;
@@ -234,9 +246,13 @@
   } while (0)
 
 static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
 static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
 
 grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
                             const void *server_transport_data,
@@ -244,6 +260,8 @@
                             size_t add_initial_metadata_count,
                             gpr_timespec send_deadline) {
   size_t i;
+  grpc_transport_op initial_op;
+  grpc_transport_op *initial_op_ptr = NULL;
   grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
   grpc_call *call =
       gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -267,10 +285,24 @@
   call->send_deadline = send_deadline;
   grpc_channel_internal_ref(channel);
   call->metadata_context = grpc_channel_get_metadata_context(channel);
-  /* one ref is dropped in response to destroy, the other in
-     stream_closed */
-  gpr_ref_init(&call->internal_refcount, 2);
-  grpc_call_stack_init(channel_stack, server_transport_data,
+  grpc_sopb_init(&call->send_ops);
+  grpc_sopb_init(&call->recv_ops);
+  gpr_slice_buffer_init(&call->incoming_message);
+  /* dropped in destroy */
+  gpr_ref_init(&call->internal_refcount, 1);
+  /* server hack: start reads immediately so we can get initial metadata.
+     TODO(ctiller): figure out a cleaner solution */
+  if (!call->is_client) {
+    memset(&initial_op, 0, sizeof(initial_op));
+    initial_op.recv_ops = &call->recv_ops;
+    initial_op.recv_state = &call->recv_state;
+    initial_op.on_done_recv = call_on_done_recv;
+    initial_op.recv_user_data = call;
+    call->receiving = 1;
+    grpc_call_internal_ref(call);
+    initial_op_ptr = &initial_op;
+  }
+  grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
                        CALL_STACK_FROM_CALL(call));
   if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
     set_deadline_alarm(call, send_deadline);
@@ -287,7 +319,8 @@
   return call->cq;
 }
 
-void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
+void grpc_call_internal_ref(grpc_call *c) { 
+  gpr_ref(&c->internal_refcount); }
 
 static void destroy_call(void *call, int ignored_success) {
   size_t i;
@@ -314,6 +347,9 @@
     destroy_legacy_state(c->legacy_state);
   }
   grpc_bbq_destroy(&c->incoming_queue);
+  grpc_sopb_destroy(&c->send_ops);
+  grpc_sopb_destroy(&c->recv_ops);
+  gpr_slice_buffer_destroy(&c->incoming_message);
   gpr_free(c);
 }
 
@@ -359,20 +395,6 @@
   return GRPC_CALL_OK;
 }
 
-static void request_more_data(grpc_call *call) {
-  grpc_call_op op;
-
-  /* call down */
-  op.type = GRPC_REQUEST_DATA;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = 0;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-  op.bind_pollset = NULL;
-
-  grpc_call_execute_op(call, &op);
-}
-
 static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
   gpr_uint8 set = call->request_set[op];
   reqinfo_master *master;
@@ -383,17 +405,41 @@
 
 static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
 
+static int need_more_data(grpc_call *call) { /* should a read be started? */
+  if (is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA)) return 1;
+  if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) return 1;
+  if (is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA)) return 1;
+  if (is_op_live(call, GRPC_IOREQ_RECV_STATUS)) return 1;
+  if (is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS)) return 1;
+  if (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) return 1;
+  return call->write_state == WRITE_STATE_INITIAL && !call->is_client && call->read_state != READ_STATE_STREAM_CLOSED;
+}
+
 static void unlock(grpc_call *call) {
-  send_action sa = SEND_NOTHING;
+  grpc_transport_op op;
   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
   int completing_requests = 0;
-  int need_more_data =
-      call->need_more_data &&
-      (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+  int start_op = 0;
   int i;
 
-  if (need_more_data) {
-    call->need_more_data = 0;
+  memset(&op, 0, sizeof(op));
+
+  if (!call->receiving && need_more_data(call)) {
+    op.recv_ops = &call->recv_ops;
+    op.recv_state = &call->recv_state;
+    op.on_done_recv = call_on_done_recv;
+    op.recv_user_data = call;
+    call->receiving = 1;
+    grpc_call_internal_ref(call);
+    start_op = 1;
+  }
+
+  if (!call->sending) {
+    if (fill_send_ops(call, &op)) {
+      call->sending = 1;
+      grpc_call_internal_ref(call);
+      start_op = 1;
+    }
   }
 
   if (!call->completing && call->num_completed_requests != 0) {
@@ -405,22 +451,10 @@
     grpc_call_internal_ref(call);
   }
 
-  if (!call->sending) {
-    sa = choose_send_action(call);
-    if (sa != SEND_NOTHING) {
-      call->sending = 1;
-      grpc_call_internal_ref(call);
-    }
-  }
-
   gpr_mu_unlock(&call->mu);
 
-  if (need_more_data) {
-    request_more_data(call);
-  }
-
-  if (sa != SEND_NOTHING) {
-    enact_send_action(call, sa);
+  if (start_op) {
+    execute_op(call, &op);
   }
 
   if (completing_requests > 0) {
@@ -495,7 +529,6 @@
   master->complete_mask |= 1u << op;
   if (status != GRPC_OP_OK) {
     master->status = status;
-    master->complete_mask = master->need_mask;
   }
   if (master->complete_mask == master->need_mask) {
     for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
@@ -554,64 +587,144 @@
   }
 }
 
-static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws,
-                           grpc_op_error error) {
+static void call_on_done_send(void *pc, int success) { /* on_done_send hook */
+  grpc_call *call = pc; /* the send_user_data installed by fill_send_ops */
+  grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
   lock(call);
-  finish_ioreq_op(call, op, error);
+  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+  }
+  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
+  }
+  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error);
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error);
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); /* NOTE(review): OK even if the send failed - confirm */
+  }
+  call->last_send_contains = 0; /* bits are set again by fill_send_ops */
   call->sending = 0;
-  call->write_state = ws;
   unlock(call);
   grpc_call_internal_unref(call, 0);
 }
 
-static void finish_write_step(void *pc, grpc_op_error error) {
-  finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error);
+/* Queue the slices collected in incoming_message as one received message. */
+static void finish_message(grpc_call *call) {
+  /* TODO(ctiller): this could be a lot faster if coded directly */
+  gpr_slice_buffer *incoming = &call->incoming_message;
+  grpc_byte_buffer *assembled =
+      grpc_byte_buffer_create(incoming->slices, incoming->count);
+  gpr_slice_buffer_reset_and_unref(incoming);
+  grpc_bbq_push(&call->incoming_queue, assembled);
+  GPR_ASSERT(incoming->count == 0);
+  call->reading_message = 0;
 }
 
-static void finish_finish_step(void *pc, grpc_op_error error) {
-  finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error);
-}
-
-static void finish_start_step(void *pc, grpc_op_error error) {
-  finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED,
-                 error);
-}
-
-static send_action choose_send_action(grpc_call *call) {
-  switch (call->write_state) {
-    case WRITE_STATE_INITIAL:
-      if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
-        if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) ||
-            is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-          return SEND_BUFFERED_INITIAL_METADATA;
-        } else {
-          return SEND_INITIAL_METADATA;
-        }
-      }
-      return SEND_NOTHING;
-    case WRITE_STATE_STARTED:
-      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
-        if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-          return SEND_BUFFERED_MESSAGE;
-        } else {
-          return SEND_MESSAGE;
-        }
-      } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-        finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
-        finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
-        if (call->is_client) {
-          return SEND_FINISH;
-        } else {
-          return SEND_TRAILING_METADATA_AND_FINISH;
-        }
-      }
-      return SEND_NOTHING;
-    case WRITE_STATE_WRITE_CLOSED:
-      return SEND_NOTHING;
+/* Begin a new incoming message: returns 1 to keep going, 0 after cancelling.
+   NOTE(review): cancel re-locks call->mu held by call_on_done_recv; verify. */
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+  char *message = NULL;
+  if (call->reading_message) { /* previous message was never completed */
+    gpr_asprintf(
+        &message, "Message terminated early; read %d bytes, expected %d",
+        (int)call->incoming_message.length, (int)call->incoming_message_length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
   }
-  gpr_log(GPR_ERROR, "should never reach here");
-  abort();
-  return SEND_NOTHING;
+  if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+    gpr_asprintf(
+        &message,
+        "Maximum message length of %u exceeded by a message of length %u",
+        (unsigned)grpc_channel_get_max_message_length(call->channel),
+        (unsigned)msg.length); /* both lengths are unsigned 32-bit values */
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  }
+  /* stash away parameters, and prepare for incoming slices */
+  if (msg.length > 0) {
+    call->reading_message = 1;
+    call->incoming_message_length = msg.length;
+  } else {
+    finish_message(call); /* an empty message is complete immediately */
+  }
+  return 1;
+}
+
+/* Consume one payload slice: every path either stores it or releases it.
+   Returns 1 to keep processing ops, 0 if the call was cancelled. */
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+  if (GPR_SLICE_LENGTH(slice) == 0) {
+    gpr_slice_unref(slice);
+    return 1;
+  }
+  /* we have to be reading a message to know what to do here */
+  if (!call->reading_message) {
+    gpr_slice_unref(slice); /* never stored, so release it like the empty case */
+    grpc_call_cancel_with_status(
+        call, GRPC_STATUS_INVALID_ARGUMENT,
+        "Received payload data while not reading a message");
+    return 0;
+  }
+  gpr_slice_buffer_add(&call->incoming_message, slice);
+  if (call->incoming_message.length > call->incoming_message_length) {
+    char *message = NULL;
+    gpr_asprintf(
+        &message, "Receiving message overflow; read %d bytes, expected %d",
+        (int)call->incoming_message.length, (int)call->incoming_message_length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  }
+  if (call->incoming_message.length == call->incoming_message_length) {
+    finish_message(call); /* every expected byte has arrived */
+  }
+  return 1;
+}
+
+static void call_on_done_recv(void *pc, int success) { /* on_done_recv hook */
+  grpc_call *call = pc;
+  size_t i;
+  lock(call);
+  call->receiving = 0; /* lets unlock() start the next read if one is needed */
+  if (success) {
+    for (i = 0; success && i < call->recv_ops.nops; i++) { /* stop on failure */
+      grpc_stream_op *op = &call->recv_ops.ops[i];
+      switch (op->type) {
+        case GRPC_NO_OP:
+          break;
+        case GRPC_OP_METADATA:
+          recv_metadata(call, &op->data.metadata);
+          break;
+        case GRPC_OP_BEGIN_MESSAGE:
+          success = begin_message(call, op->data.begin_message);
+          break;
+        case GRPC_OP_SLICE:
+          success = add_slice_to_message(call, op->data.slice);
+          break;
+      }
+    }
+    if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+      GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+      call->read_state = READ_STATE_READ_CLOSED;
+    }
+    if (call->recv_state == GRPC_STREAM_CLOSED) {
+      GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+      call->read_state = READ_STATE_STREAM_CLOSED;
+    }
+    finish_read_ops(call);
+  } else { /* the receive failed: report an error on all six RECV ioreqs */
+    finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR);
+    finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR);
+    finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR);
+    finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR);
+    finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
+    finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
+  }
+  call->recv_ops.nops = 0; /* empty the buffer; unlock() hands it out again */
+  unlock(call);
+
+  grpc_call_internal_unref(call, 0); /* pairs with the ref taken at read start */
 }
 
 static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
@@ -639,99 +752,104 @@
   return out;
 }
 
-static void enact_send_action(grpc_call *call, send_action sa) {
-  grpc_ioreq_data data;
-  grpc_call_op op;
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+                                           grpc_stream_op_buffer *sopb) {
   size_t i;
-  gpr_uint32 flags = 0;
-  char status_str[GPR_LTOA_MIN_BUFSIZE];
 
-  switch (sa) {
-    case SEND_NOTHING:
-      abort();
-      break;
-    case SEND_BUFFERED_INITIAL_METADATA:
-      flags |= GRPC_WRITE_BUFFER_HINT;
-    /* fallthrough */
-    case SEND_INITIAL_METADATA:
-      data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
-      op.type = GRPC_SEND_METADATA;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = flags;
-      op.data.metadata.list = chain_metadata_from_app(
-          call, data.send_metadata.count, data.send_metadata.metadata);
-      op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
-      op.data.metadata.deadline = call->send_deadline;
-      for (i = 0; i < call->send_initial_metadata_count; i++) {
-        grpc_metadata_batch_link_head(&op.data.metadata,
-                                      &call->send_initial_metadata[i]);
+  switch (byte_buffer->type) {
+    case GRPC_BB_SLICE_BUFFER:
+      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+        gpr_slice_ref(slice);
+        grpc_sopb_add_slice(sopb, slice);
       }
-      call->send_initial_metadata_count = 0;
-      op.done_cb = finish_start_step;
-      op.user_data = call;
-      op.bind_pollset = grpc_cq_pollset(call->cq);
-      grpc_call_execute_op(call, &op);
-      break;
-    case SEND_BUFFERED_MESSAGE:
-      flags |= GRPC_WRITE_BUFFER_HINT;
-    /* fallthrough */
-    case SEND_MESSAGE:
-      data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
-      op.type = GRPC_SEND_MESSAGE;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = flags;
-      op.data.message = data.send_message;
-      op.done_cb = finish_write_step;
-      op.user_data = call;
-      op.bind_pollset = NULL;
-      grpc_call_execute_op(call, &op);
-      break;
-    case SEND_TRAILING_METADATA_AND_FINISH:
-      /* send trailing metadata */
-      data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
-      op.type = GRPC_SEND_METADATA;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = flags;
-      op.data.metadata.list = chain_metadata_from_app(
-          call, data.send_metadata.count, data.send_metadata.metadata);
-      op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
-      op.data.metadata.deadline = call->send_deadline;
-      op.bind_pollset = NULL;
-      /* send status */
-      /* TODO(ctiller): cache common status values */
-      data = call->request_data[GRPC_IOREQ_SEND_STATUS];
-      gpr_ltoa(data.send_status.code, status_str);
-      grpc_metadata_batch_add_tail(
-          &op.data.metadata, &call->status_link,
-          grpc_mdelem_from_metadata_strings(
-              call->metadata_context,
-              grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
-              grpc_mdstr_from_string(call->metadata_context, status_str)));
-      if (data.send_status.details) {
-        grpc_metadata_batch_add_tail(
-            &op.data.metadata, &call->details_link,
-            grpc_mdelem_from_metadata_strings(
-                call->metadata_context,
-                grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
-                grpc_mdstr_from_string(call->metadata_context,
-                                       data.send_status.details)));
-      }
-      op.done_cb = do_nothing;
-      op.user_data = NULL;
-      grpc_call_execute_op(call, &op);
-    /* fallthrough: see choose_send_action for details */
-    case SEND_FINISH:
-      op.type = GRPC_SEND_FINISH;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = 0;
-      op.done_cb = finish_finish_step;
-      op.user_data = call;
-      op.bind_pollset = NULL;
-      grpc_call_execute_op(call, &op);
       break;
   }
 }
 
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
+  grpc_ioreq_data data;
+  grpc_metadata_batch mdb;
+  size_t i;
+  char status_str[GPR_LTOA_MIN_BUFSIZE];
+  GPR_ASSERT(op->send_ops == NULL);
+  /* Append to call->send_ops whatever the current write_state allows. */
+  switch (call->write_state) {
+    case WRITE_STATE_INITIAL:
+      if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+        break; /* nothing is sent before initial metadata is requested */
+      }
+      data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
+      mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+                                         data.send_metadata.metadata);
+      mdb.garbage.head = mdb.garbage.tail = NULL;
+      mdb.deadline = call->send_deadline;
+      for (i = 0; i < call->send_initial_metadata_count; i++) {
+        grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
+      }
+      grpc_sopb_add_metadata(&call->send_ops, mdb);
+      op->send_ops = &call->send_ops;
+      op->bind_pollset = grpc_cq_pollset(call->cq);
+      call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
+      call->write_state = WRITE_STATE_STARTED;
+      call->send_initial_metadata_count = 0;
+    /* fall through intended */
+    case WRITE_STATE_STARTED:
+      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+        data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+        grpc_sopb_add_begin_message(
+            &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+        copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
+        op->send_ops = &call->send_ops;
+        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
+      }
+      if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
+        op->is_last_send = 1;
+        op->send_ops = &call->send_ops;
+        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
+        call->write_state = WRITE_STATE_WRITE_CLOSED;
+        if (!call->is_client) { /* only servers add trailing metadata/status */
+          /* send trailing metadata */
+          data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
+          mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+                                             data.send_metadata.metadata);
+          mdb.garbage.head = mdb.garbage.tail = NULL;
+          mdb.deadline = gpr_inf_future;
+          /* send status */
+          /* TODO(ctiller): cache common status values */
+          data = call->request_data[GRPC_IOREQ_SEND_STATUS];
+          gpr_ltoa(data.send_status.code, status_str);
+          grpc_metadata_batch_add_tail(
+              &mdb, &call->status_link,
+              grpc_mdelem_from_metadata_strings(
+                  call->metadata_context,
+                  grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+                  grpc_mdstr_from_string(call->metadata_context, status_str)));
+          if (data.send_status.details) {
+            grpc_metadata_batch_add_tail(
+                &mdb, &call->details_link,
+                grpc_mdelem_from_metadata_strings(
+                    call->metadata_context,
+                    grpc_mdstr_ref(
+                        grpc_channel_get_message_string(call->channel)),
+                    grpc_mdstr_from_string(call->metadata_context,
+                                           data.send_status.details)));
+          }
+          grpc_sopb_add_metadata(&call->send_ops, mdb);
+        }
+      }
+      break;
+    case WRITE_STATE_WRITE_CLOSED:
+      break;
+  }
+  if (op->send_ops) {
+    op->on_done_send = call_on_done_send;
+    op->send_user_data = call;
+  }
+  return op->send_ops != NULL; /* 1 iff something was queued to send */
+}
+
 static grpc_call_error start_ioreq_error(grpc_call *call,
                                          gpr_uint32 mutated_ops,
                                          grpc_call_error ret) {
@@ -838,10 +956,6 @@
   master->on_complete = completion;
   master->user_data = user_data;
 
-  if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
-    call->need_more_data = 1;
-  }
-
   finish_read_ops(call);
   early_out_write_ops(call);
 
@@ -871,41 +985,34 @@
   grpc_call_internal_unref(c, 1);
 }
 
-grpc_call_error grpc_call_cancel(grpc_call *c) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  op.type = GRPC_CANCEL_OP;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = 0;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-  op.bind_pollset = NULL;
-
-  elem = CALL_ELEM_FROM_CALL(c, 0);
-  elem->filter->call_op(elem, NULL, &op);
-
-  return GRPC_CALL_OK;
+grpc_call_error grpc_call_cancel(grpc_call *call) {
+  return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
 }
 
 grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
                                              grpc_status_code status,
                                              const char *description) {
+  grpc_transport_op op;
   grpc_mdstr *details =
       description ? grpc_mdstr_from_string(c->metadata_context, description)
                   : NULL;
+  memset(&op, 0, sizeof(op));
+  op.cancel_with_status = status;
+
   lock(c);
   set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
   set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
   unlock(c);
-  return grpc_call_cancel(c);
+
+  execute_op(c, &op);
+
+  return GRPC_CALL_OK;
 }
 
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_op *op) {
   grpc_call_element *elem;
-  GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
   elem = CALL_ELEM_FROM_CALL(call, 0);
-  elem->filter->call_op(elem, NULL, op);
+  elem->filter->start_transport_op(elem, op);
 }
 
 grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -928,34 +1035,14 @@
 static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
   if (call->have_alarm) {
     gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
+    assert(0);
+    return;
   }
   grpc_call_internal_ref(call);
   call->have_alarm = 1;
   grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
 }
 
-static void set_read_state_locked(grpc_call *call, read_state state) {
-  GPR_ASSERT(call->read_state < state);
-  call->read_state = state;
-  finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
-  lock(call);
-  set_read_state_locked(call, state);
-  unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
-  set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  set_read_state(call, READ_STATE_STREAM_CLOSED);
-  grpc_call_internal_unref(call, 0);
-}
-
 /* we offset status by a small amount when storing it into transport metadata
    as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
    */
@@ -979,35 +1066,13 @@
   return status;
 }
 
-void grpc_call_recv_message(grpc_call_element *elem,
-                            grpc_byte_buffer *byte_buffer) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  lock(call);
-  grpc_bbq_push(&call->incoming_queue, byte_buffer);
-  finish_read_ops(call);
-  unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
-                                     grpc_status_code status,
-                                     const char *message) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  lock(call);
-  set_status_code(call, STATUS_FROM_CORE, status);
-  set_status_details(call, STATUS_FROM_CORE,
-                     grpc_mdstr_from_string(call->metadata_context, message));
-  unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
   grpc_linked_mdelem *l;
   grpc_metadata_array *dest;
   grpc_metadata *mdusr;
   int is_trailing;
   grpc_mdctx *mdctx = call->metadata_context;
 
-  lock(call);
   is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
   for (l = md->list.head; l != NULL; l = l->next) {
     grpc_mdelem *md = l->md;
@@ -1043,9 +1108,8 @@
     set_deadline_alarm(call, md->deadline);
   }
   if (!is_trailing) {
-    set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+    call->read_state = READ_STATE_GOT_INITIAL_METADATA;
   }
-  unlock(call);
 
   grpc_mdctx_lock(mdctx);
   for (l = md->list.head; l; l = l->next) {
@@ -1055,8 +1119,6 @@
     grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
   }
   grpc_mdctx_unlock(mdctx);
-
-  return !is_trailing;
 }
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index f8d0915..199beb1 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -96,27 +96,12 @@
 void grpc_call_internal_ref(grpc_call *call);
 void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
 
-/* Helpers for grpc_client, grpc_server filters to publish received data to
-   the completion queue/surface layer */
-/* receive metadata - returns 1 if this was initial metadata */
-int grpc_call_recv_metadata(grpc_call_element *surface_element,
-                            grpc_metadata_batch *md);
-void grpc_call_recv_message(grpc_call_element *surface_element,
-                            grpc_byte_buffer *message);
-void grpc_call_read_closed(grpc_call_element *surface_element);
-void grpc_call_stream_closed(grpc_call_element *surface_element);
-
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
 grpc_call_error grpc_call_start_ioreq_and_call_back(
     grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
     grpc_ioreq_completion_func on_complete, void *user_data);
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
 
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
-                                     grpc_status_code status,
-                                     const char *message);
-
 /* Given the top call_element, get the call object. */
 grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
 
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 29b042e..78f9144 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,7 @@
 struct grpc_channel {
   int is_client;
   gpr_refcount refs;
+  gpr_uint32 max_message_length;
   grpc_mdctx *metadata_context;
   grpc_mdstr *grpc_status_string;
   grpc_mdstr *grpc_message_string;
@@ -68,9 +69,13 @@
 #define CHANNEL_FROM_TOP_ELEM(top_elem) \
   CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
 
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
 grpc_channel *grpc_channel_create_from_filters(
     const grpc_channel_filter **filters, size_t num_filters,
     const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+  size_t i;
   size_t size =
       sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
   grpc_channel *channel = gpr_malloc(size);
@@ -88,6 +93,24 @@
                           CHANNEL_STACK_FROM_CHANNEL(channel));
   gpr_mu_init(&channel->registered_call_mu);
   channel->registered_calls = NULL;
+
+  channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+  if (args) {
+    for (i = 0; i < args->num_args; i++) {
+      if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+        if (args->args[i].type != GRPC_ARG_INTEGER) {
+          gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+                  GRPC_ARG_MAX_MESSAGE_LENGTH);
+        } else if (args->args[i].value.integer < 0) {
+          gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+                  GRPC_ARG_MAX_MESSAGE_LENGTH);
+        } else {
+          channel->max_message_length = args->args[i].value.integer;
+        }
+      }
+    }
+  }
+
   return channel;
 }
 
@@ -219,3 +242,7 @@
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
   return channel->grpc_message_string;
 }
+
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
+  return channel->max_message_length; /* set in grpc_channel_create_from_filters */
+}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d3e5118..388be35 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -44,10 +44,11 @@
 grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
 
 void grpc_client_channel_closed(grpc_channel_element *elem);
 
 void grpc_channel_internal_ref(grpc_channel *channel);
 void grpc_channel_internal_unref(grpc_channel *channel);
 
-#endif  /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3104b1d..daa8d3a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -44,7 +44,6 @@
 #include "src/core/channel/client_setup.h"
 #include "src/core/channel/connected_channel.h"
 #include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
 #include "src/core/iomgr/endpoint.h"
 #include "src/core/iomgr/resolve_address.h"
 #include "src/core/iomgr/tcp_client.h"
@@ -176,8 +175,8 @@
 static grpc_transport_setup_result complete_setup(void *channel_stack,
                                                   grpc_transport *transport,
                                                   grpc_mdctx *mdctx) {
-  static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
-                                                       &grpc_http_filter};
+  static grpc_channel_filter const *extra_filters[] = {
+      &grpc_http_client_filter};
   return grpc_client_channel_transport_setup_complete(
       channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
       mdctx);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 2f898ff..8ac4dd1 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -43,32 +43,10 @@
 
 typedef struct { void *unused; } channel_data;
 
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void client_start_transport_op(grpc_call_element *elem,
+                                      grpc_transport_op *op) {
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
-  switch (op->type) {
-    case GRPC_RECV_METADATA:
-      grpc_call_recv_metadata(elem, &op->data.metadata);
-      break;
-    case GRPC_RECV_MESSAGE:
-      grpc_call_recv_message(elem, op->data.message);
-      op->done_cb(op->user_data, GRPC_OP_OK);
-      break;
-    case GRPC_RECV_HALF_CLOSE:
-      grpc_call_read_closed(elem);
-      break;
-    case GRPC_RECV_FINISH:
-      grpc_call_stream_closed(elem);
-      break;
-    case GRPC_RECV_SYNTHETIC_STATUS:
-      grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
-                                      op->data.synthetic_status.message);
-      break;
-    default:
-      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
-      grpc_call_next_op(elem, op);
-  }
+  grpc_call_next_op(elem, op);
 }
 
 static void channel_op(grpc_channel_element *elem,
@@ -90,7 +68,8 @@
 }
 
 static void init_call_elem(grpc_call_element *elem,
-                           const void *transport_server_data) {}
+                           const void *transport_server_data,
+                           grpc_transport_op *initial_op) {}
 
 static void destroy_call_elem(grpc_call_element *elem) {}
 
@@ -104,6 +83,7 @@
 static void destroy_channel_elem(grpc_channel_element *elem) {}
 
 const grpc_channel_filter grpc_client_surface_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
+    client_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "client",
 };
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 7817080..c932223 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -46,22 +46,10 @@
 
 typedef struct { void *unused; } channel_data;
 
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
-                    grpc_call_op *op) {
+static void lame_start_transport_op(grpc_call_element *elem,
+                                    grpc_transport_op *op) {
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
-  switch (op->type) {
-    case GRPC_SEND_METADATA:
-      grpc_metadata_batch_destroy(&op->data.metadata);
-      grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
-                                      "Rpc sent on a lame channel.");
-      grpc_call_stream_closed(elem);
-      break;
-    default:
-      break;
-  }
-
-  op->done_cb(op->user_data, GRPC_OP_ERROR);
+  grpc_transport_op_finish_with_failure(op);
 }
 
 static void channel_op(grpc_channel_element *elem,
@@ -79,7 +67,12 @@
 }
 
 static void init_call_elem(grpc_call_element *elem,
-                           const void *transport_server_data) {}
+                           const void *transport_server_data,
+                           grpc_transport_op *initial_op) {
+  if (initial_op) {
+    grpc_transport_op_finish_with_failure(initial_op);
+  }
+}
 
 static void destroy_call_elem(grpc_call_element *elem) {}
 
@@ -93,9 +86,9 @@
 static void destroy_channel_elem(grpc_channel_element *elem) {}
 
 static const grpc_channel_filter lame_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem,
-    "lame-client",
+    lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "lame-client",
 };
 
 grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index e7d223b..3e33129 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -44,7 +44,6 @@
 #include "src/core/channel/client_setup.h"
 #include "src/core/channel/connected_channel.h"
 #include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
 #include "src/core/iomgr/resolve_address.h"
 #include "src/core/iomgr/tcp_client.h"
 #include "src/core/security/auth.h"
@@ -193,7 +192,7 @@
                                                   grpc_transport *transport,
                                                   grpc_mdctx *mdctx) {
   static grpc_channel_filter const *extra_filters[] = {
-      &grpc_client_auth_filter, &grpc_http_client_filter, &grpc_http_filter};
+      &grpc_client_auth_filter, &grpc_http_client_filter};
   return grpc_client_channel_transport_setup_complete(
       channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
       mdctx);
@@ -211,7 +210,7 @@
   grpc_arg connector_arg;
   grpc_channel_args *args_copy;
   grpc_channel_args *new_args_from_connector;
-  grpc_channel_security_connector* connector;
+  grpc_channel_security_connector *connector;
   grpc_mdctx *mdctx;
 #define MAX_FILTERS 3
   const grpc_channel_filter *filters[MAX_FILTERS];
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929..2f00ad0 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@
   grpc_call *call;
 
   call_state state;
-  gpr_timespec deadline;
   grpc_mdstr *path;
   grpc_mdstr *host;
+  gpr_timespec deadline;
+  int got_initial_metadata;
 
   legacy_data *legacy;
   grpc_completion_queue *cq_new;
 
+  grpc_stream_op_buffer *recv_ops;
+  grpc_stream_state *recv_state;
+  void (*on_done_recv)(void *user_data, int success);
+  void *recv_user_data;
+
   call_data **root[CALL_LIST_COUNT];
   call_link links[CALL_LIST_COUNT];
 };
@@ -371,46 +377,6 @@
   grpc_call_destroy(grpc_call_from_top_element(elem));
 }
 
-static void stream_closed(grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  gpr_mu_lock(&chand->server->mu);
-  switch (calld->state) {
-    case ACTIVATED:
-      break;
-    case PENDING:
-      call_list_remove(calld, PENDING_START);
-    /* fallthrough intended */
-    case NOT_STARTED:
-      calld->state = ZOMBIED;
-      grpc_iomgr_add_callback(kill_zombie, elem);
-      break;
-    case ZOMBIED:
-      break;
-  }
-  gpr_mu_unlock(&chand->server->mu);
-  grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  gpr_mu_lock(&chand->server->mu);
-  switch (calld->state) {
-    case ACTIVATED:
-    case PENDING:
-      grpc_call_read_closed(elem);
-      break;
-    case NOT_STARTED:
-      calld->state = ZOMBIED;
-      grpc_iomgr_add_callback(kill_zombie, elem);
-      break;
-    case ZOMBIED:
-      break;
-  }
-  gpr_mu_unlock(&chand->server->mu);
-}
-
 static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
   grpc_call_element *elem = user_data;
   channel_data *chand = elem->channel_data;
@@ -425,33 +391,75 @@
   return md;
 }
 
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
-                    grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+  grpc_call_element *elem = ptr;
   call_data *calld = elem->call_data;
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-  switch (op->type) {
-    case GRPC_RECV_METADATA:
+  channel_data *chand = elem->channel_data;
+
+  if (success && !calld->got_initial_metadata) {
+    size_t i;
+    size_t nops = calld->recv_ops->nops;
+    grpc_stream_op *ops = calld->recv_ops->ops;
+    for (i = 0; i < nops; i++) {
+      grpc_stream_op *op = &ops[i];
+      if (op->type != GRPC_OP_METADATA) continue;
       grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
-      if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+      if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
         calld->deadline = op->data.metadata.deadline;
-        start_new_rpc(elem);
       }
+      calld->got_initial_metadata = 1; /* only the first batch starts the rpc */
+      start_new_rpc(elem);
       break;
-    case GRPC_RECV_MESSAGE:
-      grpc_call_recv_message(elem, op->data.message);
-      op->done_cb(op->user_data, GRPC_OP_OK);
+    }
+  }
+
+  switch (*calld->recv_state) {
+    case GRPC_STREAM_OPEN:
       break;
-    case GRPC_RECV_HALF_CLOSE:
-      read_closed(elem);
+    case GRPC_STREAM_SEND_CLOSED:
       break;
-    case GRPC_RECV_FINISH:
-      stream_closed(elem);
+    case GRPC_STREAM_RECV_CLOSED:
+      gpr_mu_lock(&chand->server->mu);
+      if (calld->state == NOT_STARTED) {
+        calld->state = ZOMBIED;
+        grpc_iomgr_add_callback(kill_zombie, elem);
+      }
+      gpr_mu_unlock(&chand->server->mu);
       break;
-    default:
-      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
-      grpc_call_next_op(elem, op);
+    case GRPC_STREAM_CLOSED:
+      gpr_mu_lock(&chand->server->mu);
+      if (calld->state == NOT_STARTED) {
+        calld->state = ZOMBIED;
+        grpc_iomgr_add_callback(kill_zombie, elem);
+      } else if (calld->state == PENDING) {
+        call_list_remove(calld, PENDING_START);
+      } /* NOTE(review): old code also zombied PENDING; confirm */
+      gpr_mu_unlock(&chand->server->mu);
       break;
   }
+
+  calld->on_done_recv(calld->recv_user_data, success); /* chain to saved cb */
+}
+
+static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+  call_data *calld = elem->call_data;
+
+  if (op->recv_ops) { /* only ops that receive are intercepted */
+    /* save the caller's recv callback; server_on_recv will chain to it */
+    calld->recv_ops = op->recv_ops;
+    calld->recv_state = op->recv_state;
+    calld->on_done_recv = op->on_done_recv;
+    calld->recv_user_data = op->recv_user_data;
+    op->on_done_recv = server_on_recv;
+    op->recv_user_data = elem;
+  }
+}
+
+static void server_start_transport_op(grpc_call_element *elem,
+                                      grpc_transport_op *op) {
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+  server_mutate_op(elem, op); /* hook recv completion before forwarding */
+  grpc_call_next_op(elem, op);
 }
 
 static void channel_op(grpc_channel_element *elem,
@@ -502,7 +510,8 @@
 }
 
 static void init_call_elem(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   memset(calld, 0, sizeof(call_data));
@@ -514,6 +523,8 @@
   gpr_mu_unlock(&chand->server->mu);
 
   server_ref(chand->server);
+
+  if (initial_op) server_mutate_op(elem, initial_op);
 }
 
 static void destroy_call_elem(grpc_call_element *elem) {
@@ -592,8 +603,9 @@
 }
 
 static const grpc_channel_filter server_surface_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
+    server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+    destroy_call_elem, sizeof(channel_data), init_channel_elem,
+    destroy_channel_elem, "server",
 };
 
 static void addcq(grpc_server *server, grpc_completion_queue *cq) {
@@ -913,6 +925,8 @@
   channel_data *c;
   listener *l;
   size_t i;
+  call_data *calld;
+
   gpr_mu_lock(&server->mu);
   if (!server->shutdown) {
     gpr_mu_unlock(&server->mu);
@@ -937,6 +951,15 @@
     gpr_free(l);
   }
 
+  while ((calld = call_list_remove_head(&server->lists[PENDING_START],
+                                        PENDING_START)) != NULL) {
+    gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
+    calld->state = ZOMBIED;
+    grpc_iomgr_add_callback(
+        kill_zombie,
+        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+  }
+
   for (c = server->root_channel_data.next; c != &server->root_channel_data;
        c = c->next) {
     shutdown_channel(c);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index f3b9219..7b5c2f2 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -33,7 +33,6 @@
 
 #include <grpc/grpc.h>
 
-#include "src/core/channel/http_filter.h"
 #include "src/core/channel/http_server_filter.h"
 #include "src/core/iomgr/resolve_address.h"
 #include "src/core/iomgr/tcp_server.h"
@@ -46,8 +45,8 @@
 static grpc_transport_setup_result setup_transport(void *server,
                                                    grpc_transport *transport,
                                                    grpc_mdctx *mdctx) {
-  static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
-                                                       &grpc_http_filter};
+  static grpc_channel_filter const *extra_filters[] = {
+      &grpc_http_server_filter};
   return grpc_server_setup_transport(server, transport, extra_filters,
                                      GPR_ARRAY_SIZE(extra_filters), mdctx);
 }
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 5ca31d6..cf1e66b 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -481,7 +481,6 @@
         break;
       case GRPC_OP_METADATA:
         grpc_metadata_batch_assert_ok(&op->data.metadata);
-      case GRPC_OP_FLOW_CTL_CB:
         /* these just get copied as they don't impact the number of flow
            controlled bytes */
         grpc_sopb_append(outops, op, 1);
@@ -567,10 +566,6 @@
             GPR_ERROR,
             "These stream ops should be filtered out by grpc_chttp2_preencode");
         abort();
-      case GRPC_OP_FLOW_CTL_CB:
-        op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
-        curop++;
-        break;
       case GRPC_OP_METADATA:
         /* Encode a metadata batch; store the returned values, representing
            a metadata element that needs to be unreffed back into the metadata
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index e32ee28..9f90056 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,9 @@
   /* streams that are waiting to start because there are too many concurrent
      streams on the connection */
   WAITING_FOR_CONCURRENCY,
-  /* streams that want to callback the application */
-  PENDING_CALLBACKS,
-  /* streams that *ARE* calling back to the application */
-  EXECUTING_CALLBACKS,
+  /* streams that have finished reading: we wait until unlock to coalesce
+     all changes into one callback */
+  FINISHED_READ_OP,
   STREAM_LIST_COUNT /* must be last */
 } stream_list_id;
 
@@ -141,6 +140,12 @@
   DTS_FRAME
 } deframe_transport_state;
 
+typedef enum {
+  WRITE_STATE_OPEN,         /* zero value: no close requested yet */
+  WRITE_STATE_QUEUED_CLOSE, /* last send op accepted; close not written */
+  WRITE_STATE_SENT_CLOSE    /* close written, or stream cancelled */
+} WRITE_STATE;
+
 typedef struct {
   stream *head;
   stream *tail;
@@ -182,6 +187,18 @@
   gpr_slice debug;
 } pending_goaway;
 
+typedef struct {
+  void (*cb)(void *user_data, int success); /* op completion callback */
+  void *user_data;                          /* opaque argument for cb */
+  int success; /* presumably filled in by schedule_cb() - TODO confirm */
+} op_closure;
+
+typedef struct {
+  op_closure *callbacks; /* heap buffer; released with gpr_free */
+  size_t count;          /* presumably entries in use - TODO confirm */
+  size_t capacity;       /* presumably entries allocated - TODO confirm */
+} op_closure_array;
+
 struct transport {
   grpc_transport base; /* must be first */
   const grpc_transport_callbacks *cb;
@@ -202,6 +219,10 @@
   gpr_uint8 closed;
   error_state error_state;
 
+  /* queued callbacks */
+  op_closure_array pending_callbacks;
+  op_closure_array executing_callbacks;
+
   /* stream indexing */
   gpr_uint32 next_stream_id;
   gpr_uint32 last_incoming_stream_id;
@@ -281,13 +302,13 @@
   /* when the application requests writes be closed, the write_closed is
      'queued'; when the close is flow controlled into the send path, we are
      'sending' it; when the write has been performed it is 'sent' */
-  gpr_uint8 queued_write_closed;
-  gpr_uint8 sending_write_closed;
-  gpr_uint8 sent_write_closed;
+  WRITE_STATE write_state;
+  gpr_uint8 send_closed;
   gpr_uint8 read_closed;
   gpr_uint8 cancelled;
-  gpr_uint8 allow_window_updates;
-  gpr_uint8 published_close;
+
+  op_closure send_done_closure;
+  op_closure recv_done_closure;
 
   stream_link links[STREAM_LIST_COUNT];
   gpr_uint8 included[STREAM_LIST_COUNT];
@@ -296,10 +317,14 @@
   grpc_linked_mdelem *incoming_metadata;
   size_t incoming_metadata_count;
   size_t incoming_metadata_capacity;
+  grpc_linked_mdelem *old_incoming_metadata;
   gpr_timespec incoming_deadline;
 
   /* sops from application */
-  grpc_stream_op_buffer outgoing_sopb;
+  grpc_stream_op_buffer *outgoing_sopb;
+  grpc_stream_op_buffer *incoming_sopb;
+  grpc_stream_state *publish_state;
+  grpc_stream_state published_state;
   /* sops that have passed flow control to be written */
   grpc_stream_op_buffer writing_sopb;
 
@@ -337,7 +362,8 @@
                              grpc_chttp2_error_code error_code, int send_rst);
 static void cancel_stream(transport *t, stream *s,
                           grpc_status_code local_status,
-                          grpc_chttp2_error_code error_code, int send_rst);
+                          grpc_chttp2_error_code error_code,
+                          grpc_mdstr *optional_message, int send_rst);
 static void finalize_cancellations(transport *t);
 static stream *lookup_stream(transport *t, gpr_uint32 id);
 static void remove_from_stream_map(transport *t, stream *s);
@@ -348,6 +374,14 @@
 static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
                       grpc_endpoint_cb_status error);
 
+static void schedule_cb(transport *t, op_closure closure, int success);
+static void maybe_finish_read(transport *t, stream *s);
+static void maybe_join_window_updates(transport *t, stream *s);
+static void finish_reads(transport *t);
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
+static void add_metadata_batch(transport *t, stream *s);
+
 /*
  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
  */
@@ -387,6 +421,9 @@
   }
   gpr_free(t->pings);
 
+  gpr_free(t->pending_callbacks.callbacks);
+  gpr_free(t->executing_callbacks.callbacks);
+
   for (i = 0; i < t->num_pending_goaways; i++) {
     gpr_slice_unref(t->pending_goaways[i].debug);
   }
@@ -416,6 +453,8 @@
 
   GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
 
+  memset(t, 0, sizeof(*t));
+
   t->base.vtable = &vtable;
   t->ep = ep;
   /* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +466,16 @@
   t->str_grpc_timeout =
       grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
   t->reading = 1;
-  t->writing = 0;
   t->error_state = ERROR_STATE_NONE;
   t->next_stream_id = is_client ? 1 : 2;
-  t->last_incoming_stream_id = 0;
-  t->destroying = 0;
-  t->closed = 0;
   t->is_client = is_client;
   t->outgoing_window = DEFAULT_WINDOW;
   t->incoming_window = DEFAULT_WINDOW;
   t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
   t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
-  t->expect_continuation_stream_id = 0;
-  t->pings = NULL;
-  t->ping_count = 0;
-  t->ping_capacity = 0;
   t->ping_counter = gpr_now().tv_nsec;
   grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
   grpc_chttp2_goaway_parser_init(&t->goaway_parser);
-  t->pending_goaways = NULL;
-  t->num_pending_goaways = 0;
-  t->cap_pending_goaways = 0;
   gpr_slice_buffer_init(&t->outbuf);
   gpr_slice_buffer_init(&t->qbuf);
   grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +490,6 @@
      needed.
      TODO(ctiller): tune this */
   grpc_chttp2_stream_map_init(&t->stream_map, 8);
-  memset(&t->lists, 0, sizeof(t->lists));
 
   /* copy in initial settings to all setting sets */
   for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -503,7 +530,7 @@
 
   gpr_mu_lock(&t->mu);
   t->calling_back = 1;
-  ref_transport(t);
+  ref_transport(t); /* matches unref at end of this function */
   gpr_mu_unlock(&t->mu);
 
   sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +542,7 @@
   if (t->destroying) gpr_cv_signal(&t->cv);
   unlock(t);
 
-  ref_transport(t);
+  ref_transport(t); /* matches unref inside recv_data */
   recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
 
   unref_transport(t);
@@ -573,16 +600,19 @@
 }
 
 static int init_stream(grpc_transport *gt, grpc_stream *gs,
-                       const void *server_data) {
+                       const void *server_data, grpc_transport_op *initial_op) {
   transport *t = (transport *)gt;
   stream *s = (stream *)gs;
 
+  memset(s, 0, sizeof(*s));
+
   ref_transport(t);
 
   if (!server_data) {
     lock(t);
     s->id = 0;
   } else {
+    /* already locked */
     s->id = (gpr_uint32)(gpr_uintptr)server_data;
     t->incoming_stream = s;
     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
@@ -592,24 +622,13 @@
       t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
   s->incoming_window =
       t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
-  s->queued_write_closed = 0;
-  s->sending_write_closed = 0;
-  s->sent_write_closed = 0;
-  s->read_closed = 0;
-  s->cancelled = 0;
-  s->allow_window_updates = 0;
-  s->published_close = 0;
-  s->incoming_metadata_count = 0;
-  s->incoming_metadata_capacity = 0;
-  s->incoming_metadata = NULL;
   s->incoming_deadline = gpr_inf_future;
-  memset(&s->links, 0, sizeof(s->links));
-  memset(&s->included, 0, sizeof(s->included));
-  grpc_sopb_init(&s->outgoing_sopb);
   grpc_sopb_init(&s->writing_sopb);
   grpc_sopb_init(&s->callback_sopb);
   grpc_chttp2_data_parser_init(&s->parser);
 
+  if (initial_op) perform_op_locked(t, s, initial_op);
+
   if (!server_data) {
     unlock(t);
   }
@@ -642,10 +661,16 @@
 
   gpr_mu_unlock(&t->mu);
 
-  grpc_sopb_destroy(&s->outgoing_sopb);
+  GPR_ASSERT(s->outgoing_sopb == NULL);
+  GPR_ASSERT(s->incoming_sopb == NULL);
   grpc_sopb_destroy(&s->writing_sopb);
   grpc_sopb_destroy(&s->callback_sopb);
   grpc_chttp2_data_parser_destroy(&s->parser);
+  for (i = 0; i < s->incoming_metadata_count; i++) {
+    grpc_mdelem_unref(s->incoming_metadata[i].md);
+  }
+  gpr_free(s->incoming_metadata);
+  gpr_free(s->old_incoming_metadata);
 
   unref_transport(t);
 }
@@ -708,8 +733,6 @@
 }
 
 static void stream_list_join(transport *t, stream *s, stream_list_id id) {
-  if (id == PENDING_CALLBACKS)
-    GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
   if (s->included[id]) {
     return;
   }
@@ -718,6 +741,8 @@
 
 static void remove_from_stream_map(transport *t, stream *s) {
   if (s->id == 0) return;
+  IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
+                     t->is_client ? "CLI" : "SVR", s->id));
   if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
     maybe_start_some_streams(t);
   }
@@ -762,6 +787,8 @@
     finalize_cancellations(t);
   }
 
+  finish_reads(t);
+
   /* gather any callbacks that need to be made */
   if (!t->calling_back && cb) {
     perform_callbacks = prepare_callbacks(t);
@@ -865,21 +892,24 @@
   while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
          s->outgoing_window > 0) {
     window_delta = grpc_chttp2_preencode(
-        s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
+        s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
         GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
     t->outgoing_window -= window_delta;
     s->outgoing_window -= window_delta;
 
-    s->sending_write_closed =
-        s->queued_write_closed && s->outgoing_sopb.nops == 0;
-    if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
+    if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
+        s->outgoing_sopb->nops == 0) {
+      s->send_closed = 1;
+    }
+    if (s->writing_sopb.nops > 0 || s->send_closed) {
       stream_list_join(t, s, WRITING);
     }
 
-    /* if there are still writes to do and the stream still has window
-       available, then schedule a further write */
-    if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
-      GPR_ASSERT(!t->outgoing_window);
+    /* we should either exhaust window or have no ops left, but not both */
+    if (s->outgoing_sopb->nops == 0) {
+      s->outgoing_sopb = NULL;
+      schedule_cb(t, s->send_done_closure, 1);
+    } else if (s->outgoing_window) {
       stream_list_add_tail(t, s, WRITABLE);
     }
   }
@@ -912,10 +942,9 @@
 
   while ((s = stream_list_remove_head(t, WRITING))) {
     grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
-                       s->sending_write_closed, s->id, &t->hpack_compressor,
-                       &t->outbuf);
+                       s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
     s->writing_sopb.nops = 0;
-    if (s->sending_write_closed) {
+    if (s->send_closed) {
       stream_list_join(t, s, WRITTEN_CLOSED);
     }
   }
@@ -929,8 +958,10 @@
     drop_connection(t);
   }
   while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
-    s->sent_write_closed = 1;
-    if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
+    s->write_state = WRITE_STATE_SENT_CLOSE;
+    if (1||!s->cancelled) {
+      maybe_finish_read(t, s);
+    }
   }
   t->outbuf.count = 0;
   t->outbuf.length = 0;
@@ -980,6 +1011,9 @@
     stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
     if (!s) break;
 
+    IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
+                       t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
+
     GPR_ASSERT(s->id == 0);
     s->id = t->next_stream_id;
     t->next_stream_id += 2;
@@ -988,43 +1022,63 @@
   }
 }
 
-static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
-                       size_t ops_count, int is_last) {
-  transport *t = (transport *)gt;
-  stream *s = (stream *)gs;
-
-  lock(t);
-
-  if (is_last) {
-    s->queued_write_closed = 1;
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
+  if (op->cancel_with_status != GRPC_STATUS_OK) { /* OK: no cancel */
+    cancel_stream(
+        t, s, op->cancel_with_status,
+        grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
+        op->cancel_message, 1);
   }
-  if (!s->cancelled) {
-    grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
-    if (s->id == 0) {
-      stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
-      maybe_start_some_streams(t);
+
+  if (op->send_ops) {
+    GPR_ASSERT(s->outgoing_sopb == NULL); /* one send in flight per stream */
+    s->send_done_closure.cb = op->on_done_send;
+    s->send_done_closure.user_data = op->send_user_data;
+    if (!s->cancelled) {
+      s->outgoing_sopb = op->send_ops; /* borrowed pointer, not a copy */
+      if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
+        s->write_state = WRITE_STATE_QUEUED_CLOSE;
+      }
+      if (s->id == 0) { /* no stream id assigned yet */
+        IF_TRACING(gpr_log(GPR_DEBUG,
+                           "HTTP:%s: New stream %p waiting for concurrency",
+                           t->is_client ? "CLI" : "SVR", s));
+        stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
+        maybe_start_some_streams(t);
+      } else if (s->outgoing_window > 0) {
+        stream_list_join(t, s, WRITABLE);
+      }
     } else {
-      stream_list_join(t, s, WRITABLE);
+      schedule_nuke_sopb(t, op->send_ops);
+      schedule_cb(t, s->send_done_closure, 0); /* cancelled: report failure */
     }
-  } else {
-    grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
-  }
-  if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
-      !s->published_close) {
-    stream_list_join(t, s, PENDING_CALLBACKS);
   }
 
-  unlock(t);
+  if (op->recv_ops) {
+    GPR_ASSERT(s->incoming_sopb == NULL); /* one recv in flight per stream */
+    s->recv_done_closure.cb = op->on_done_recv;
+    s->recv_done_closure.user_data = op->recv_user_data;
+    s->incoming_sopb = op->recv_ops;
+    s->incoming_sopb->nops = 0; /* caller's buffer starts out empty */
+    s->publish_state = op->recv_state;
+    gpr_free(s->old_incoming_metadata);
+    s->old_incoming_metadata = NULL;
+    maybe_finish_read(t, s);
+    maybe_join_window_updates(t, s);
+  }
+
+  if (op->bind_pollset) {
+    add_to_pollset_locked(t, op->bind_pollset);
+  }
 }
 
-static void abort_stream(grpc_transport *gt, grpc_stream *gs,
-                         grpc_status_code status) {
+static void perform_op(grpc_transport *gt, grpc_stream *gs,
+                       grpc_transport_op *op) {
   transport *t = (transport *)gt;
   stream *s = (stream *)gs;
 
   lock(t);
-  cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
-                1);
+  perform_op_locked(t, s, op); /* real work; runs between lock and unlock */
   unlock(t);
 }
 
@@ -1063,8 +1117,8 @@
 
   while ((s = stream_list_remove_head(t, CANCELLED))) {
     s->read_closed = 1;
-    s->sent_write_closed = 1;
-    stream_list_join(t, s, PENDING_CALLBACKS);
+    s->write_state = WRITE_STATE_SENT_CLOSE;
+    maybe_finish_read(t, s);
   }
 }
 
@@ -1082,18 +1136,24 @@
 static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
                                 grpc_status_code local_status,
                                 grpc_chttp2_error_code error_code,
-                                int send_rst) {
+                                grpc_mdstr *optional_message, int send_rst) {
   int had_outgoing;
   char buffer[GPR_LTOA_MIN_BUFSIZE];
 
   if (s) {
     /* clear out any unreported input & output: nobody cares anymore */
-    had_outgoing = s->outgoing_sopb.nops != 0;
+    had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
     schedule_nuke_sopb(t, &s->parser.incoming_sopb);
-    schedule_nuke_sopb(t, &s->outgoing_sopb);
+    if (s->outgoing_sopb) {
+      schedule_nuke_sopb(t, s->outgoing_sopb);
+      s->outgoing_sopb = NULL;
+      stream_list_remove(t, s, WRITABLE);
+      schedule_cb(t, s->send_done_closure, 0);
+    }
     if (s->cancelled) {
       send_rst = 0;
-    } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
+    } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
+               had_outgoing) {
       s->cancelled = 1;
       stream_list_join(t, s, CANCELLED);
 
@@ -1101,17 +1161,26 @@
       add_incoming_metadata(
           t, s,
           grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
-      switch (local_status) {
-        case GRPC_STATUS_CANCELLED:
-          add_incoming_metadata(
-              t, s, grpc_mdelem_from_strings(t->metadata_context,
-                                             "grpc-message", "Cancelled"));
-          break;
-        default:
-          break;
+      if (!optional_message) {
+        switch (local_status) {
+          case GRPC_STATUS_CANCELLED:
+            add_incoming_metadata(
+                t, s, grpc_mdelem_from_strings(t->metadata_context,
+                                               "grpc-message", "Cancelled"));
+            break;
+          default:
+            break;
+        }
+      } else {
+        add_incoming_metadata(
+            t, s,
+            grpc_mdelem_from_metadata_strings(
+                t->metadata_context,
+                grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+                grpc_mdstr_ref(optional_message)));
       }
-
-      stream_list_join(t, s, PENDING_CALLBACKS);
+      add_metadata_batch(t, s);
+      maybe_finish_read(t, s);
     }
   }
   if (!id) send_rst = 0;
@@ -1119,24 +1188,29 @@
     gpr_slice_buffer_add(&t->qbuf,
                          grpc_chttp2_rst_stream_create(id, error_code));
   }
+  if (optional_message) {
+    grpc_mdstr_unref(optional_message);
+  }
 }
 
 static void cancel_stream_id(transport *t, gpr_uint32 id,
                              grpc_status_code local_status,
                              grpc_chttp2_error_code error_code, int send_rst) {
   cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
-                      send_rst);
+                      NULL, send_rst); /* NULL: no custom grpc-message */
 }
 
 static void cancel_stream(transport *t, stream *s,
                           grpc_status_code local_status,
-                          grpc_chttp2_error_code error_code, int send_rst) {
-  cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+                          grpc_chttp2_error_code error_code,
+                          grpc_mdstr *optional_message, int send_rst) {
+  cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
+                      send_rst); /* inner call unrefs optional_message */
 }
 
 static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
   cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
-                GRPC_CHTTP2_INTERNAL_ERROR, 0);
+                GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
 }
 
 static void end_all_the_calls(transport *t) {
@@ -1150,8 +1224,14 @@
   end_all_the_calls(t);
 }
 
+static void maybe_finish_read(transport *t, stream *s) {
+  if (s->incoming_sopb) { /* no-op unless a recv buffer is attached */
+    stream_list_join(t, s, FINISHED_READ_OP);
+  }
+}
+
 static void maybe_join_window_updates(transport *t, stream *s) {
-  if (s->allow_window_updates &&
+  if (s->incoming_sopb != NULL &&
       s->incoming_window <
           t->settings[LOCAL_SETTINGS]
                      [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
@@ -1160,21 +1240,6 @@
   }
 }
 
-static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
-                                     int allow) {
-  transport *t = (transport *)tp;
-  stream *s = (stream *)sp;
-
-  lock(t);
-  s->allow_window_updates = allow;
-  if (allow) {
-    maybe_join_window_updates(t, s);
-  } else {
-    stream_list_remove(t, s, WINDOW_UPDATE);
-  }
-  unlock(t);
-}
-
 static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
   if (t->incoming_frame_size > t->incoming_window) {
     gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
@@ -1248,7 +1313,7 @@
     case GRPC_CHTTP2_STREAM_ERROR:
       cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
                               GRPC_CHTTP2_INTERNAL_ERROR),
-                    GRPC_CHTTP2_INTERNAL_ERROR, 1);
+                    GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
       return init_skip_frame(t, 0);
     case GRPC_CHTTP2_CONNECTION_ERROR:
       drop_connection(t);
@@ -1267,11 +1332,10 @@
 
   GPR_ASSERT(s);
 
-  IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
-                     grpc_mdstr_as_c_string(md->key),
-                     grpc_mdstr_as_c_string(md->value)));
+  IF_TRACING(gpr_log(
+      GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
+      grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
 
-  stream_list_join(t, s, PENDING_CALLBACKS);
   if (md->key == t->str_grpc_timeout) {
     gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
     if (!cached_timeout) {
@@ -1290,6 +1354,7 @@
   } else {
     add_incoming_metadata(t, s, md);
   }
+  maybe_finish_read(t, s);
 }
 
 static int init_header_frame_parser(transport *t, int is_continuation) {
@@ -1464,33 +1529,20 @@
   return window + window_update < MAX_WINDOW;
 }
 
-static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
-
 static void add_metadata_batch(transport *t, stream *s) {
   grpc_metadata_batch b;
-  size_t i;
 
-  b.list.head = &s->incoming_metadata[0];
-  b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
+  b.list.head = NULL;
+  /* Stash the current element count in 'tail' (an integer smuggled through
+     a pointer, not a real node) so patch_metadata_ops can rebuild the list.
+     We can't do list building here as later incoming metadata may reallocate
+     the underlying array. */
+  b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count;
   b.garbage.head = b.garbage.tail = NULL;
   b.deadline = s->incoming_deadline;
-
-  for (i = 1; i < s->incoming_metadata_count; i++) {
-    s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
-    s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
-  }
-  s->incoming_metadata[0].prev = NULL;
-  s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
+  s->incoming_deadline = gpr_inf_future; /* deadline now belongs to batch b */
 
   grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
-  grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
-                            s->incoming_metadata);
-
-  /* reset */
-  s->incoming_deadline = gpr_inf_future;
-  s->incoming_metadata = NULL;
-  s->incoming_metadata_count = 0;
-  s->incoming_metadata_capacity = 0;
 }
 
 static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
@@ -1501,14 +1553,14 @@
     case GRPC_CHTTP2_PARSE_OK:
       if (st.end_of_stream) {
         t->incoming_stream->read_closed = 1;
-        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+        maybe_finish_read(t, t->incoming_stream);
       }
       if (st.need_flush_reads) {
-        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+        maybe_finish_read(t, t->incoming_stream);
       }
       if (st.metadata_boundary) {
         add_metadata_batch(t, t->incoming_stream);
-        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+        maybe_finish_read(t, t->incoming_stream);
       }
       if (st.ack_settings) {
         gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
@@ -1545,11 +1597,11 @@
       }
       if (st.initial_window_update) {
         for (i = 0; i < t->stream_map.count; i++) {
-          stream *s = (stream*)(t->stream_map.values[i]);
+          stream *s = (stream *)(t->stream_map.values[i]);
           int was_window_empty = s->outgoing_window <= 0;
           s->outgoing_window += st.initial_window_update;
-          if (was_window_empty && s->outgoing_window > 0 &&
-              s->outgoing_sopb.nops > 0) {
+          if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
+              s->outgoing_sopb->nops > 0) {
             stream_list_join(t, s, WRITABLE);
           }
         }
@@ -1563,12 +1615,13 @@
             if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
               cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
                                       GRPC_CHTTP2_FLOW_CONTROL_ERROR),
-                            GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+                            GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
             } else {
               s->outgoing_window += st.window_update;
               /* if this window update makes outgoing ops writable again,
                  flag that */
-              if (was_window_empty && s->outgoing_sopb.nops) {
+              if (was_window_empty && s->outgoing_sopb &&
+                  s->outgoing_sopb->nops > 0) {
                 stream_list_join(t, s, WRITABLE);
               }
             }
@@ -1830,53 +1883,114 @@
   return GRPC_STREAM_OPEN;
 }
 
-static int prepare_callbacks(transport *t) {
-  stream *s;
-  int n = 0;
-  while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
-    int execute = 1;
+static void patch_metadata_ops(stream *s) {
+  grpc_stream_op *ops = s->incoming_sopb->ops;
+  size_t nops = s->incoming_sopb->nops;
+  size_t i;
+  size_t j;
+  size_t mdidx = 0;
+  size_t last_mdidx;
 
-    s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
-    if (s->callback_state == GRPC_STREAM_CLOSED) {
-      remove_from_stream_map(t, s);
-      if (s->published_close) {
-        execute = 0;
-      } else if (s->incoming_metadata_count) {
-        add_metadata_batch(t, s);
-      }
-      s->published_close = 1;
+  for (i = 0; i < nops; i++) {
+    grpc_stream_op *op = &ops[i];
+    if (op->type != GRPC_OP_METADATA) continue;
+    last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
+    GPR_ASSERT(last_mdidx > mdidx);
+    GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
+    op->data.metadata.list.head = &s->incoming_metadata[mdidx];
+    op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
+    for (j = mdidx + 1; j < last_mdidx; j++) {
+      s->incoming_metadata[j].prev = &s->incoming_metadata[j-1];
+      s->incoming_metadata[j-1].next = &s->incoming_metadata[j];
     }
+    s->incoming_metadata[mdidx].prev = NULL;
+    s->incoming_metadata[last_mdidx-1].next = NULL;
+    mdidx = last_mdidx;
+  }
+  GPR_ASSERT(mdidx == s->incoming_metadata_count);
+  s->old_incoming_metadata = s->incoming_metadata;
+  s->incoming_metadata = NULL;
+  s->incoming_metadata_count = 0;
+  s->incoming_metadata_capacity = 0;
+}
 
-    grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
+static void finish_reads(transport *t) {
+  stream *s;
 
-    if (execute) {
-      stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
-      n = 1;
+  while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
+    int publish = 0;
+    GPR_ASSERT(s->incoming_sopb);
+    *s->publish_state =
+        compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
+    if (*s->publish_state != s->published_state) {
+      s->published_state = *s->publish_state;
+      publish = 1;
+      if (s->published_state == GRPC_STREAM_CLOSED) {
+        remove_from_stream_map(t, s);
+      }
+    }
+    if (s->parser.incoming_sopb.nops > 0) {
+      grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
+      publish = 1;
+    }
+    if (publish) {
+      if (s->incoming_metadata_count > 0) {
+        patch_metadata_ops(s);
+      }
+      s->incoming_sopb = NULL;
+      schedule_cb(t, s->recv_done_closure, 1);
     }
   }
-  return n;
+
+}
+
+static void schedule_cb(transport *t, op_closure closure, int success) {
+  if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
+    t->pending_callbacks.capacity =
+        GPR_MAX(t->pending_callbacks.capacity * 2, 8);
+    t->pending_callbacks.callbacks =
+        gpr_realloc(t->pending_callbacks.callbacks,
+                    t->pending_callbacks.capacity *
+                        sizeof(*t->pending_callbacks.callbacks));
+  }
+  closure.success = success;
+  t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
+}
+
+static int prepare_callbacks(transport *t) {
+  op_closure_array temp = t->pending_callbacks;
+  t->pending_callbacks = t->executing_callbacks;
+  t->executing_callbacks = temp;
+  return t->executing_callbacks.count > 0;
 }
 
 static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
-  stream *s;
-  while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
-    size_t nops = s->callback_sopb.nops;
-    s->callback_sopb.nops = 0;
-    cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
-                   s->callback_sopb.ops, nops, s->callback_state);
+  size_t i;
+  for (i = 0; i < t->executing_callbacks.count; i++) {
+    op_closure c = t->executing_callbacks.callbacks[i];
+    c.cb(c.user_data, c.success);
   }
+  t->executing_callbacks.count = 0;
 }
 
 static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
   cb->closed(t->cb_user_data, &t->base);
 }
 
-static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
-  transport *t = (transport *)gt;
-  lock(t);
+/*
+ * POLLSET STUFF
+ */
+
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
   if (t->ep) {
     grpc_endpoint_add_to_pollset(t->ep, pollset);
   }
+}
+
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+  transport *t = (transport *)gt;
+  lock(t);
+  add_to_pollset_locked(t, pollset);
   unlock(t);
 }
 
@@ -1885,9 +1999,9 @@
  */
 
 static const grpc_transport_vtable vtable = {
-    sizeof(stream), init_stream, send_batch, set_allow_window_updates,
-    add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
-    send_ping, destroy_transport};
+    sizeof(stream),  init_stream,    perform_op,
+    add_to_pollset,  destroy_stream, goaway,
+    close_transport, send_ping,      destroy_transport};
 
 void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
                                   void *arg,
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 882c078..e1a75ad 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -81,9 +81,6 @@
       case GRPC_OP_METADATA:
         grpc_metadata_batch_destroy(&ops[i].data.metadata);
         break;
-      case GRPC_OP_FLOW_CTL_CB:
-        ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
-        break;
       case GRPC_NO_OP:
       case GRPC_OP_BEGIN_MESSAGE:
         break;
@@ -91,34 +88,20 @@
   }
 }
 
-static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) {
-#ifndef NDEBUG
-  size_t i;
-  for (i = 0; i < nops; i++) {
-    if (ops[i].type == GRPC_OP_METADATA) {
-      grpc_metadata_batch_assert_ok(&ops[i].data.metadata);
-    }
-  }
-#endif /* NDEBUG */
-}
-
 static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) {
   sopb->capacity = new_capacity;
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
   if (sopb->ops == sopb->inlined_ops) {
     sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity);
     memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op));
   } else {
     sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity);
   }
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
 }
 
 static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
   grpc_stream_op *out;
 
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
-
+  GPR_ASSERT(sopb->nops <= sopb->capacity);
   if (sopb->nops == sopb->capacity) {
     expandto(sopb, GROW(sopb->capacity));
   }
@@ -129,7 +112,6 @@
 
 void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) {
   add(sopb)->type = GRPC_NO_OP;
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
 }
 
 void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
@@ -138,34 +120,19 @@
   op->type = GRPC_OP_BEGIN_MESSAGE;
   op->data.begin_message.length = length;
   op->data.begin_message.flags = flags;
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
 }
 
 void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
                             grpc_metadata_batch b) {
   grpc_stream_op *op = add(sopb);
-  grpc_metadata_batch_assert_ok(&b);
   op->type = GRPC_OP_METADATA;
   op->data.metadata = b;
-  grpc_metadata_batch_assert_ok(&op->data.metadata);
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
 }
 
 void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
   grpc_stream_op *op = add(sopb);
   op->type = GRPC_OP_SLICE;
   op->data.slice = slice;
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
-}
-
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
-                               void (*cb)(void *arg, grpc_op_error error),
-                               void *arg) {
-  grpc_stream_op *op = add(sopb);
-  op->type = GRPC_OP_FLOW_CTL_CB;
-  op->data.flow_ctl_cb.cb = cb;
-  op->data.flow_ctl_cb.arg = arg;
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
 }
 
 void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
@@ -173,15 +140,12 @@
   size_t orig_nops = sopb->nops;
   size_t new_nops = orig_nops + nops;
 
-  assert_contained_metadata_ok(ops, nops);
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
   if (new_nops > sopb->capacity) {
     expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops));
   }
 
   memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
   sopb->nops = new_nops;
-  assert_contained_metadata_ok(sopb->ops, sopb->nops);
 }
 
 static void assert_valid_list(grpc_mdelem_list *list) {
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 20146b9..95497a3 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -55,9 +55,7 @@
   GRPC_OP_BEGIN_MESSAGE,
   /* Add a slice of data to the current message/metadata element/status.
      Must not overflow the forward declared length. */
-  GRPC_OP_SLICE,
-  /* Call some function once this operation has passed flow control. */
-  GRPC_OP_FLOW_CTL_CB
+  GRPC_OP_SLICE
 } grpc_stream_op_code;
 
 /* Arguments for GRPC_OP_BEGIN */
@@ -68,12 +66,6 @@
   gpr_uint32 flags;
 } grpc_begin_message;
 
-/* Arguments for GRPC_OP_FLOW_CTL_CB */
-typedef struct grpc_flow_ctl_cb {
-  void (*cb)(void *arg, grpc_op_error error);
-  void *arg;
-} grpc_flow_ctl_cb;
-
 typedef struct grpc_linked_mdelem {
   grpc_mdelem *md;
   struct grpc_linked_mdelem *next;
@@ -94,29 +86,31 @@
 void grpc_metadata_batch_init(grpc_metadata_batch *comd);
 void grpc_metadata_batch_destroy(grpc_metadata_batch *comd);
 void grpc_metadata_batch_merge(grpc_metadata_batch *target,
-                                 grpc_metadata_batch *add);
+                               grpc_metadata_batch *add);
 
 void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
-                                     grpc_linked_mdelem *storage);
+                                   grpc_linked_mdelem *storage);
 void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
-                                     grpc_linked_mdelem *storage);
+                                   grpc_linked_mdelem *storage);
 
 void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
-                                    grpc_linked_mdelem *storage,
-                                    grpc_mdelem *elem_to_add);
+                                  grpc_linked_mdelem *storage,
+                                  grpc_mdelem *elem_to_add);
 void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
-                                    grpc_linked_mdelem *storage,
-                                    grpc_mdelem *elem_to_add);
+                                  grpc_linked_mdelem *storage,
+                                  grpc_mdelem *elem_to_add);
 
 void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
-                                  grpc_mdelem *(*filter)(void *user_data,
-                                                         grpc_mdelem *elem),
-                                  void *user_data);
+                                grpc_mdelem *(*filter)(void *user_data,
+                                                       grpc_mdelem *elem),
+                                void *user_data);
 
 #ifndef NDEBUG
 void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd);
 #else
-#define grpc_metadata_batch_assert_ok(comd) do {} while (0)
+#define grpc_metadata_batch_assert_ok(comd) \
+  do {                                      \
+  } while (0)
 #endif
 
 /* Represents a single operation performed on a stream/transport */
@@ -129,7 +123,6 @@
     grpc_begin_message begin_message;
     grpc_metadata_batch metadata;
     gpr_slice slice;
-    grpc_flow_ctl_cb flow_ctl_cb;
   } data;
 } grpc_stream_op;
 
@@ -160,15 +153,14 @@
 /* Append a GRPC_OP_BEGIN to a buffer */
 void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
                                  gpr_uint32 flags);
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
+void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
+                            grpc_metadata_batch metadata);
 /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
 void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
-                               void (*cb)(void *arg, grpc_op_error error),
-                               void *arg);
 /* Append a buffer to a buffer - does not ref/unref any internal objects */
 void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
                       size_t nops);
 
-#endif  /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
+
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index ef0020d..d9a1319 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -52,18 +52,15 @@
 }
 
 int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
-                               const void *server_data) {
-  return transport->vtable->init_stream(transport, stream, server_data);
+                               const void *server_data,
+                               grpc_transport_op *initial_op) {
+  return transport->vtable->init_stream(transport, stream, server_data,
+                                        initial_op);
 }
 
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
-                               grpc_stream_op *ops, size_t nops, int is_last) {
-  transport->vtable->send_batch(transport, stream, ops, nops, is_last);
-}
-
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
-                                             grpc_stream *stream, int allow) {
-  transport->vtable->set_allow_window_updates(transport, stream, allow);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+                               grpc_transport_op *op) {
+  transport->vtable->perform_op(transport, stream, op);
 }
 
 void grpc_transport_add_to_pollset(grpc_transport *transport,
@@ -76,11 +73,6 @@
   transport->vtable->destroy_stream(transport, stream);
 }
 
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
-                                 grpc_status_code status) {
-  transport->vtable->abort_stream(transport, stream, status);
-}
-
 void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
                          void *user_data) {
   transport->vtable->ping(transport, cb, user_data);
@@ -93,3 +85,31 @@
 void grpc_transport_setup_initiate(grpc_transport_setup *setup) {
   setup->vtable->initiate(setup);
 }
+
+/* Reports failure for op: for each of send_ops/recv_ops that is set, invokes
+   the matching on_done callback with its user data and success == 0.
+   bind_pollset and the cancellation fields are not consulted. */
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
+  if (op->send_ops) {
+    op->on_done_send(op->send_user_data, 0);
+  }
+  if (op->recv_ops) {
+    op->on_done_recv(op->recv_user_data, 0);
+  }
+}
+
+/* Attaches a cancellation to op. Only the first cancellation sticks: if op
+   already carries a status other than GRPC_STATUS_OK it is left unchanged
+   and message (when non-NULL) is unreffed instead of being stored. A message
+   that is stored is not reffed here.
+   NOTE(review): this reads as "takes ownership of message" - confirm. */
+void grpc_transport_op_add_cancellation(grpc_transport_op *op,
+                                        grpc_status_code status,
+                                        grpc_mdstr *message) {
+  if (op->cancel_with_status == GRPC_STATUS_OK) {
+    op->cancel_with_status = status;
+    op->cancel_message = message;
+  } else if (message) {
+    grpc_mdstr_unref(message);
+  }
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index ce8c17c..cdea0b9 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -60,26 +60,26 @@
   GRPC_STREAM_CLOSED
 } grpc_stream_state;
 
+/* Transport op: a set of operations to perform on a transport */
+typedef struct grpc_transport_op {
+  grpc_stream_op_buffer *send_ops;
+  int is_last_send;
+  void (*on_done_send)(void *user_data, int success);
+  void *send_user_data;
+
+  grpc_stream_op_buffer *recv_ops;
+  grpc_stream_state *recv_state;
+  void (*on_done_recv)(void *user_data, int success);
+  void *recv_user_data;
+
+  grpc_pollset *bind_pollset;
+
+  grpc_status_code cancel_with_status;
+  grpc_mdstr *cancel_message;
+} grpc_transport_op;
+
 /* Callbacks made from the transport to the upper layers of grpc. */
 struct grpc_transport_callbacks {
-  /* Allocate a buffer to receive data into.
-     It's safe to call grpc_slice_new() to do this, but performance minded
-     proxies may want to carefully place data into optimal locations for
-     transports.
-     This function must return a valid, non-empty slice.
-
-     Arguments:
-       user_data - the transport user data set at transport creation time
-       transport - the grpc_transport instance making this call
-       stream    - the grpc_stream instance the buffer will be used for, or
-                   NULL if this is not known
-       size_hint - how big of a buffer would the transport optimally like?
-                   the actual returned buffer can be smaller or larger than
-                   size_hint as the implementation finds convenient */
-  struct gpr_slice (*alloc_recv_buffer)(void *user_data,
-                                        grpc_transport *transport,
-                                        grpc_stream *stream, size_t size_hint);
-
   /* Initialize a new stream on behalf of the transport.
      Must result in a call to
      grpc_transport_init_stream(transport, ..., request) in the same call
@@ -96,28 +96,6 @@
   void (*accept_stream)(void *user_data, grpc_transport *transport,
                         const void *server_data);
 
-  /* Process a set of stream ops that have been received by the transport.
-     Called by network threads, so must be careful not to block on network
-     activity.
-
-     If final_state == GRPC_STREAM_CLOSED, the upper layers should arrange to
-     call grpc_transport_destroy_stream.
-
-     Ownership of any objects contained in ops is transferred to the callee.
-
-     Arguments:
-       user_data   - the transport user data set at transport creation time
-       transport   - the grpc_transport instance making this call
-       stream      - the stream this data was received for
-       ops         - stream operations that are part of this batch
-       ops_count   - the number of stream operations in this batch
-       final_state - the state of the stream as of the final operation in this
-                     batch */
-  void (*recv_batch)(void *user_data, grpc_transport *transport,
-                     grpc_stream *stream, grpc_stream_op *ops, size_t ops_count,
-                     grpc_stream_state final_state);
-
-  /* The transport received a goaway */
   void (*goaway)(void *user_data, grpc_transport *transport,
                  grpc_status_code status, gpr_slice debug);
 
@@ -139,7 +117,8 @@
      server_data - either NULL for a client initiated stream, or a pointer
                    supplied from the accept_stream callback function */
 int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
-                               const void *server_data);
+                               const void *server_data,
+                               grpc_transport_op *initial_op);
 
 /* Destroy transport data for a stream.
 
@@ -154,20 +133,17 @@
 void grpc_transport_destroy_stream(grpc_transport *transport,
                                    grpc_stream *stream);
 
-/* Enable/disable incoming data for a stream.
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
 
-   This effectively disables new window becoming available for a given stream,
-   but does not prevent existing window from being consumed by a sender: the
-   caller must still be prepared to receive some additional data after this
-   call.
+void grpc_transport_op_add_cancellation(grpc_transport_op *op,
+                                        grpc_status_code status,
+                                        grpc_mdstr *message);
 
-   Arguments:
-     transport - the transport on which to create this stream
-     stream    - the grpc_stream to destroy (memory is still owned by the
-                 caller, but any child memory must be cleaned up)
-     allow     - is it allowed that new window be opened up? */
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
-                                             grpc_stream *stream, int allow);
+/* TODO(ctiller): remove this */
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+                                   grpc_pollset *pollset);
+
+char *grpc_transport_op_string(grpc_transport_op *op);
 
 /* Send a batch of operations on a transport
 
@@ -177,13 +153,9 @@
      transport - the transport on which to initiate the stream
      stream    - the stream on which to send the operations. This must be
                  non-NULL and previously initialized by the same transport.
-     ops       - an array of operations to apply to the stream - can be NULL
-                 if ops_count == 0.
-     ops_count - the number of elements in ops
-     is_last   - is this the last batch of operations to be sent out */
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
-                               grpc_stream_op *ops, size_t ops_count,
-                               int is_last);
+     op        - a grpc_transport_op specifying the op to perform */
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+                               grpc_transport_op *op);
 
 /* Send a ping on a transport
 
@@ -193,19 +165,6 @@
 void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
                          void *user_data);
 
-/* Abort a stream
-
-   Terminate reading and writing for a stream. A final recv_batch with no
-   operations and final_state == GRPC_STREAM_CLOSED will be received locally,
-   and no more data will be presented to the up-layer.
-
-   TODO(ctiller): consider adding a HTTP/2 reason to this function. */
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
-                                 grpc_status_code status);
-
-void grpc_transport_add_to_pollset(grpc_transport *transport,
-                                   grpc_pollset *pollset);
-
 /* Advise peer of pending connection termination. */
 void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
                            gpr_slice debug_data);
@@ -254,4 +213,4 @@
    used as a destruction call by setup). */
 void grpc_transport_setup_cancel(grpc_transport_setup *setup);
 
-#endif  /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index ac275c7..479e153 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -43,15 +43,11 @@
 
   /* implementation of grpc_transport_init_stream */
   int (*init_stream)(grpc_transport *self, grpc_stream *stream,
-                     const void *server_data);
+                     const void *server_data, grpc_transport_op *initial_op);
 
   /* implementation of grpc_transport_send_batch */
-  void (*send_batch)(grpc_transport *self, grpc_stream *stream,
-                     grpc_stream_op *ops, size_t ops_count, int is_last);
-
-  /* implementation of grpc_transport_set_allow_window_updates */
-  void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
-                                   int allow);
+  void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+                     grpc_transport_op *op);
 
   /* implementation of grpc_transport_add_to_pollset */
   void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
@@ -59,10 +55,6 @@
   /* implementation of grpc_transport_destroy_stream */
   void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
 
-  /* implementation of grpc_transport_abort_stream */
-  void (*abort_stream)(grpc_transport *self, grpc_stream *stream,
-                       grpc_status_code status);
-
   /* implementation of grpc_transport_goaway */
   void (*goaway)(grpc_transport *self, grpc_status_code status,
                  gpr_slice debug_data);
@@ -84,4 +76,4 @@
   const grpc_transport_vtable *vtable;
 };
 
-#endif  /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
new file mode 100644
index 0000000..04c1d79
--- /dev/null
+++ b/src/core/transport/transport_op_string.c
@@ -0,0 +1,179 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+/* Appends "key=<k> value=<v>" for md to b; the key and value slices are both
+   rendered with gpr_hexdump(..., GPR_HEXDUMP_PLAINTEXT). */
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+  gpr_strvec_add(b, gpr_strdup("key="));
+  gpr_strvec_add(
+      b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+                     GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+
+  gpr_strvec_add(b, gpr_strdup(" value="));
+  gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+                                GPR_SLICE_LENGTH(md->value->slice),
+                                GPR_HEXDUMP_PLAINTEXT));
+}
+
+/* Appends every element of md.list to b, separated by ", ", followed by
+   " deadline=<sec>.<nsec>" when md.deadline differs from gpr_inf_future. */
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+  grpc_linked_mdelem *m;
+  for (m = md.list.head; m != NULL; m = m->next) {
+    if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
+    put_metadata(b, m->md);
+  }
+  if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+    char *tmp;
+    /* NOTE(review): gpr_timespec's field types are declared elsewhere; the
+       casts keep both arguments matched to %ld whatever those types are (a
+       vararg that mismatches its conversion is undefined behavior). */
+    gpr_asprintf(&tmp, " deadline=%ld.%09ld", (long)md.deadline.tv_sec,
+                 (long)md.deadline.tv_nsec);
+    gpr_strvec_add(b, tmp);
+  }
+}
+
+/* Returns a human-readable rendering of the ops in sopb, separated by ", ".
+   The returned string is the one produced by gpr_strvec_flatten. */
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
+  char *out;
+  char *tmp;
+  size_t i;
+  gpr_strvec b;
+  gpr_strvec_init(&b);
+
+  for (i = 0; i < sopb->nops; i++) {
+    grpc_stream_op *op = &sopb->ops[i];
+    if (i > 0) gpr_strvec_add(&b, gpr_strdup(", "));
+    switch (op->type) {
+      case GRPC_NO_OP:
+        gpr_strvec_add(&b, gpr_strdup("NO_OP"));
+        break;
+      case GRPC_OP_BEGIN_MESSAGE:
+        /* length is filled from a gpr_uint32, so print it as unsigned */
+        gpr_asprintf(&tmp, "BEGIN_MESSAGE:%u",
+                     (unsigned)op->data.begin_message.length);
+        gpr_strvec_add(&b, tmp);
+        break;
+      case GRPC_OP_SLICE:
+        /* slice length is presumably a size_t: cast to match %lu exactly */
+        gpr_asprintf(&tmp, "SLICE:%lu",
+                     (unsigned long)GPR_SLICE_LENGTH(op->data.slice));
+        gpr_strvec_add(&b, tmp);
+        break;
+      case GRPC_OP_METADATA:
+        gpr_strvec_add(&b, gpr_strdup("METADATA{"));
+        put_metadata_list(&b, op->data.metadata);
+        gpr_strvec_add(&b, gpr_strdup("}"));
+        break;
+    }
+  }
+
+  out = gpr_strvec_flatten(&b, NULL);
+  gpr_strvec_destroy(&b);
+
+  return out;
+}
+
+/* Returns a human-readable summary of op: the space-separated subset of
+   SEND[_LAST][<ops>], RECV, BIND and CANCEL:<status>[;msg='<message>'] that
+   op carries. grpc_call_log_op below releases the result with gpr_free. */
+char *grpc_transport_op_string(grpc_transport_op *op) {
+  char *tmp;
+  char *out;
+  int first = 1;
+
+  gpr_strvec b;
+  gpr_strvec_init(&b);
+
+  if (op->send_ops) {
+    if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+    first = 0;
+    gpr_strvec_add(&b, gpr_strdup("SEND"));
+    if (op->is_last_send) {
+      gpr_strvec_add(&b, gpr_strdup("_LAST"));
+    }
+    gpr_strvec_add(&b, gpr_strdup("["));
+    gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
+    gpr_strvec_add(&b, gpr_strdup("]"));
+  }
+
+  if (op->recv_ops) {
+    if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+    first = 0;
+    gpr_strvec_add(&b, gpr_strdup("RECV"));
+  }
+
+  if (op->bind_pollset) {
+    if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+    first = 0;
+    gpr_strvec_add(&b, gpr_strdup("BIND"));
+  }
+
+  if (op->cancel_with_status != GRPC_STATUS_OK) {
+    if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+    first = 0;
+    gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
+    gpr_strvec_add(&b, tmp);
+    if (op->cancel_message) {
+      gpr_asprintf(&tmp, ";msg='%s'",
+                   grpc_mdstr_as_c_string(op->cancel_message));
+      gpr_strvec_add(&b, tmp);
+    }
+  }
+
+  out = gpr_strvec_flatten(&b, NULL);
+  gpr_strvec_destroy(&b);
+
+  return out;
+}
+
+/* Logs op (rendered by grpc_transport_op_string) at the given severity,
+   tagged with elem's filter name and address, then frees the rendering. */
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+                      grpc_call_element *elem, grpc_transport_op *op) {
+  char *str = grpc_transport_op_string(op);
+  gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+  gpr_free(str);
+}
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index f4ae6fa..e76bb93 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -118,35 +118,36 @@
 }
 
 static size_t md_ary_datasize(const void *p) {
-    const grpc_metadata_array* const ary = (grpc_metadata_array*)p;
-    size_t i, datasize = sizeof(grpc_metadata_array);
-    for (i = 0; i < ary->count; ++i) {
-        const grpc_metadata* const md = &ary->metadata[i];
-        datasize += strlen(md->key);
-        datasize += md->value_length;
-    }
-    datasize += ary->capacity * sizeof(grpc_metadata);
-    return datasize;
+  const grpc_metadata_array *const ary = (grpc_metadata_array *)p;
+  size_t i, datasize = sizeof(grpc_metadata_array);
+  for (i = 0; i < ary->count; ++i) {
+    const grpc_metadata *const md = &ary->metadata[i];
+    datasize += strlen(md->key);
+    datasize += md->value_length;
+  }
+  datasize += ary->capacity * sizeof(grpc_metadata);
+  return datasize;
 }
 
 static const rb_data_type_t grpc_rb_md_ary_data_type = {
     "grpc_metadata_array",
     {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize},
-    NULL, NULL,
-    0
-};
+    NULL,
+    NULL,
+    0};
 
 /* Describes grpc_call struct for RTypedData */
 static const rb_data_type_t grpc_call_data_type = {
     "grpc_call",
     {GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE},
-    NULL, NULL,
-    /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because grpc_rb_call_destroy
+    NULL,
+    NULL,
+    /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
+     * grpc_rb_call_destroy
      * touches a hash object.
      * TODO(yugui) Directly use st_table and call the free function earlier?
      */
-    0
-};
+    0};
 
 /* Error code details is a hash containing text strings describing errors */
 VALUE rb_error_code_details;
@@ -250,7 +251,7 @@
       }
       md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i));
       md_ary->metadata[md_ary->count].value_length =
-        RSTRING_LEN(rb_ary_entry(val, i));
+          RSTRING_LEN(rb_ary_entry(val, i));
       md_ary->count += 1;
     }
   } else {
@@ -290,10 +291,11 @@
 /* grpc_rb_md_ary_convert converts a ruby metadata hash into
    a grpc_metadata_array.
 */
-static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
+static void grpc_rb_md_ary_convert(VALUE md_ary_hash,
+                                   grpc_metadata_array *md_ary) {
   VALUE md_ary_obj = Qnil;
   if (md_ary_hash == Qnil) {
-    return;  /* Do nothing if the expected has value is nil */
+    return; /* Do nothing if the expected has value is nil */
   }
   if (TYPE(md_ary_hash) != T_HASH) {
     rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
@@ -303,8 +305,8 @@
 
   /* Initialize the array, compute it's capacity, then fill it. */
   grpc_metadata_array_init(md_ary);
-  md_ary_obj = TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type,
-                                     md_ary);
+  md_ary_obj =
+      TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
   md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
@@ -327,16 +329,14 @@
       rb_hash_aset(result, key, value);
     } else if (TYPE(value) == T_ARRAY) {
       /* Add the string to the returned array */
-      rb_ary_push(value,
-                  rb_str_new(md_ary->metadata[i].value,
-                             md_ary->metadata[i].value_length));
+      rb_ary_push(value, rb_str_new(md_ary->metadata[i].value,
+                                    md_ary->metadata[i].value_length));
     } else {
       /* Add the current value with this key and the new one to an array */
       new_ary = rb_ary_new();
       rb_ary_push(new_ary, value);
-      rb_ary_push(new_ary,
-                  rb_str_new(md_ary->metadata[i].value,
-                             md_ary->metadata[i].value_length));
+      rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value,
+                                      md_ary->metadata[i].value_length));
       rb_hash_aset(result, key, new_ary);
     }
   }
@@ -355,7 +355,7 @@
              rb_obj_classname(key));
     return ST_STOP;
   }
-  switch(NUM2INT(key)) {
+  switch (NUM2INT(key)) {
     case GRPC_OP_SEND_INITIAL_METADATA:
     case GRPC_OP_SEND_MESSAGE:
     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@@ -367,8 +367,7 @@
       rb_ary_push(ops_ary, key);
       return ST_CONTINUE;
     default:
-      rb_raise(rb_eTypeError, "invalid operation : bad value %d",
-               NUM2INT(key));
+      rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
   };
   return ST_STOP;
 }
@@ -377,8 +376,8 @@
    struct to the 'send_status_from_server' portion of an op.
 */
 static void grpc_rb_op_update_status_from_server(grpc_op *op,
-                                          grpc_metadata_array* md_ary,
-                                          VALUE status) {
+                                                 grpc_metadata_array *md_ary,
+                                                 VALUE status) {
   VALUE code = rb_struct_aref(status, sym_code);
   VALUE details = rb_struct_aref(status, sym_details);
   VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
@@ -405,8 +404,8 @@
  * grpc_rb_call_run_batch function */
 typedef struct run_batch_stack {
   /* The batch ops */
-  grpc_op ops[8];  /* 8 is the maximum number of operations */
-  size_t op_num;   /* tracks the last added operation */
+  grpc_op ops[8]; /* 8 is the maximum number of operations */
+  size_t op_num;  /* tracks the last added operation */
 
   /* Data being sent */
   grpc_metadata_array send_metadata;
@@ -424,7 +423,7 @@
 
 /* grpc_run_batch_stack_init ensures the run_batch_stack is properly
  * initialized */
-static void grpc_run_batch_stack_init(run_batch_stack* st) {
+static void grpc_run_batch_stack_init(run_batch_stack *st) {
   MEMZERO(st, run_batch_stack, 1);
   grpc_metadata_array_init(&st->send_metadata);
   grpc_metadata_array_init(&st->send_trailing_metadata);
@@ -435,7 +434,7 @@
 
 /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
  * cleaned up */
-static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
+static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
   grpc_metadata_array_destroy(&st->send_metadata);
   grpc_metadata_array_destroy(&st->send_trailing_metadata);
   grpc_metadata_array_destroy(&st->recv_metadata);
@@ -447,7 +446,7 @@
 
 /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
  * ops_hash */
-static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
+static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
   VALUE this_op = Qnil;
   VALUE this_value = Qnil;
   VALUE ops_ary = rb_ary_new();
@@ -460,7 +459,7 @@
   for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
     this_op = rb_ary_entry(ops_ary, i);
     this_value = rb_hash_aref(ops_hash, this_op);
-    switch(NUM2INT(this_op)) {
+    switch (NUM2INT(this_op)) {
       case GRPC_OP_SEND_INITIAL_METADATA:
         /* N.B. later there is no need to explicitly delete the metadata keys
          * and values, they are references to data in ruby objects. */
@@ -471,18 +470,16 @@
             st->send_metadata.metadata;
         break;
       case GRPC_OP_SEND_MESSAGE:
-        st->ops[st->op_num].data.send_message =
-            grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
-                                     RSTRING_LEN(this_value));
+        st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
+            RSTRING_PTR(this_value), RSTRING_LEN(this_value));
         break;
       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
         break;
       case GRPC_OP_SEND_STATUS_FROM_SERVER:
         /* N.B. later there is no need to explicitly delete the metadata keys
          * and values, they are references to data in ruby objects. */
-        grpc_rb_op_update_status_from_server(&st->ops[st->op_num],
-                                             &st->send_trailing_metadata,
-                                             this_value);
+        grpc_rb_op_update_status_from_server(
+            &st->ops[st->op_num], &st->send_trailing_metadata, this_value);
         break;
       case GRPC_OP_RECV_INITIAL_METADATA:
         st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
@@ -516,12 +513,12 @@
 
 /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
    after the results have run */
-static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
+static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
   size_t i = 0;
   VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
                                Qnil, Qnil, Qnil, Qnil, NULL);
   for (i = 0; i < st->op_num; i++) {
-    switch(st->ops[i].op) {
+    switch (st->ops[i].op) {
       case GRPC_OP_SEND_INITIAL_METADATA:
         rb_struct_aset(result, sym_send_metadata, Qtrue);
         break;
@@ -544,13 +541,11 @@
         break;
       case GRPC_OP_RECV_STATUS_ON_CLIENT:
         rb_struct_aset(
-            result,
-            sym_status,
-            rb_struct_new(grpc_rb_sStatus,
-                          UINT2NUM(st->recv_status),
+            result, sym_status,
+            rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status),
                           (st->recv_status_details == NULL
-                           ? Qnil
-                           : rb_str_new2(st->recv_status_details)),
+                               ? Qnil
+                               : rb_str_new2(st->recv_status_details)),
                           grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
                           NULL));
         break;
@@ -682,8 +677,7 @@
 
 static void Init_grpc_op_codes() {
   /* Constants representing operation type codes in grpc.h */
-  VALUE grpc_rb_mCallOps =
-      rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
+  VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
   rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
                   UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
   rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
@@ -709,14 +703,14 @@
   grpc_rb_eOutOfTime =
       rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
   grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
-  grpc_rb_cMdAry = rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray",
-                                         rb_cObject);
+  grpc_rb_cMdAry =
+      rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
 
   /* Prevent allocation or inialization of the Call class */
   rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
   rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
-  rb_define_method(grpc_rb_cCall, "initialize_copy",
-                   grpc_rb_cannot_init_copy, 1);
+  rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
+                   1);
 
   /* Add ruby analogues of the Call methods. */
   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
@@ -746,16 +740,8 @@
 
   /* The Struct used to return the run_batch result. */
   grpc_rb_sBatchResult = rb_struct_define(
-      "BatchResult",
-      "send_message",
-      "send_metadata",
-      "send_close",
-      "send_status",
-      "message",
-      "metadata",
-      "status",
-      "cancelled",
-      NULL);
+      "BatchResult", "send_message", "send_metadata", "send_close",
+      "send_status", "message", "metadata", "status", "cancelled", NULL);
 
   /* The hash for reference counting calls, to ensure they can't be destroyed
    * more than once */
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index e92db59..957dee1 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -55,7 +55,8 @@
 }
 
 static void call_init_func(grpc_call_element *elem,
-                           const void *server_transport_data) {
+                           const void *server_transport_data,
+                           grpc_transport_op *initial_op) {
   ++*(int *)(elem->channel_data);
   *(int *)(elem->call_data) = 0;
 }
@@ -66,8 +67,7 @@
   ++*(int *)(elem->channel_data);
 }
 
-static void call_func(grpc_call_element *elem, grpc_call_element *from_elem,
-                      grpc_call_op *op) {
+static void call_func(grpc_call_element *elem, grpc_transport_op *op) {
   ++*(int *)(elem->call_data);
 }
 
@@ -78,9 +78,8 @@
 
 static void test_create_channel_stack(void) {
   const grpc_channel_filter filter = {
-      call_func,         channel_func,         sizeof(int),
-      call_init_func,    call_destroy_func,    sizeof(int),
-      channel_init_func, channel_destroy_func, "some_test_filter"};
+      call_func, channel_func, sizeof(int), call_init_func, call_destroy_func,
+      sizeof(int), channel_init_func, channel_destroy_func, "some_test_filter"};
   const grpc_channel_filter *filters = &filter;
   grpc_channel_stack *channel_stack;
   grpc_call_stack *call_stack;
@@ -112,7 +111,7 @@
   GPR_ASSERT(*channel_data == 0);
 
   call_stack = gpr_malloc(channel_stack->call_stack_size);
-  grpc_call_stack_init(channel_stack, NULL, call_stack);
+  grpc_call_stack_init(channel_stack, NULL, NULL, call_stack);
   GPR_ASSERT(call_stack->count == 1);
   call_elem = grpc_call_stack_element(call_stack, 0);
   GPR_ASSERT(call_elem->filter == channel_elem->filter);
diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c
index ab7c7f4..d7de5e5 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack.c
@@ -37,7 +37,6 @@
 
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
 #include "src/core/channel/http_server_filter.h"
 #include "src/core/surface/channel.h"
 #include "src/core/surface/client.h"
diff --git a/test/core/end2end/fixtures/chttp2_fullstack_uds.c b/test/core/end2end/fixtures/chttp2_fullstack_uds.c
index 27e4baf..53803b0 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack_uds.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack_uds.c
@@ -39,7 +39,6 @@
 
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
 #include "src/core/channel/http_server_filter.h"
 #include "src/core/support/string.h"
 #include "src/core/surface/channel.h"
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c
index 1225f7d..d19ceb1 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair.c
@@ -37,7 +37,6 @@
 
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
 #include "src/core/channel/http_client_filter.h"
 #include "src/core/channel/http_server_filter.h"
 #include "src/core/iomgr/endpoint_pair.h"
@@ -60,8 +59,8 @@
 static grpc_transport_setup_result server_setup_transport(
     void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
   grpc_end2end_test_fixture *f = ts;
-  static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
-                                                       &grpc_http_filter};
+  static grpc_channel_filter const *extra_filters[] = {
+      &grpc_http_server_filter};
   return grpc_server_setup_transport(f->server, transport, extra_filters,
                                      GPR_ARRAY_SIZE(extra_filters), mdctx);
 }
@@ -75,9 +74,9 @@
     void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
   sp_client_setup *cs = ts;
 
-  const grpc_channel_filter *filters[] = {
-      &grpc_client_surface_filter, &grpc_http_client_filter, &grpc_http_filter,
-      &grpc_connected_channel_filter};
+  const grpc_channel_filter *filters[] = {&grpc_client_surface_filter,
+                                          &grpc_http_client_filter,
+                                          &grpc_connected_channel_filter};
   size_t nfilters = sizeof(filters) / sizeof(*filters);
   grpc_channel *channel = grpc_channel_create_from_filters(
       filters, nfilters, cs->client_args, mdctx, 1);
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
index 9f6ad98..ddde585 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
@@ -37,7 +37,6 @@
 
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
 #include "src/core/channel/http_client_filter.h"
 #include "src/core/channel/http_server_filter.h"
 #include "src/core/iomgr/endpoint_pair.h"
@@ -60,8 +59,8 @@
 static grpc_transport_setup_result server_setup_transport(
     void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
   grpc_end2end_test_fixture *f = ts;
-  static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
-                                                       &grpc_http_filter};
+  static grpc_channel_filter const *extra_filters[] = {
+      &grpc_http_server_filter};
   return grpc_server_setup_transport(f->server, transport, extra_filters,
                                      GPR_ARRAY_SIZE(extra_filters), mdctx);
 }
@@ -75,9 +74,9 @@
     void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
   sp_client_setup *cs = ts;
 
-  const grpc_channel_filter *filters[] = {
-      &grpc_client_surface_filter, &grpc_http_client_filter, &grpc_http_filter,
-      &grpc_connected_channel_filter};
+  const grpc_channel_filter *filters[] = {&grpc_client_surface_filter,
+                                          &grpc_http_client_filter,
+                                          &grpc_connected_channel_filter};
   size_t nfilters = sizeof(filters) / sizeof(*filters);
   grpc_channel *channel = grpc_channel_create_from_filters(
       filters, nfilters, cs->client_args, mdctx, 1);
diff --git a/test/core/transport/chttp2/stream_encoder_test.c b/test/core/transport/chttp2/stream_encoder_test.c
index bda8298..9183344 100644
--- a/test/core/transport/chttp2/stream_encoder_test.c
+++ b/test/core/transport/chttp2/stream_encoder_test.c
@@ -102,15 +102,10 @@
   gpr_slice_unref(expect);
 }
 
-static void assert_result_ok(void *user_data, grpc_op_error error) {
-  GPR_ASSERT(error == GRPC_OP_OK);
-}
-
 static void test_small_data_framing(void) {
   grpc_sopb_add_no_op(&g_sopb);
   verify_sopb(10, 0, 0, "");
 
-  grpc_sopb_add_flow_ctl_cb(&g_sopb, assert_result_ok, NULL);
   grpc_sopb_add_slice(&g_sopb, create_test_slice(3));
   verify_sopb(10, 0, 3, "000003 0000 deadbeef 000102");
 
diff --git a/test/core/transport/stream_op_test.c b/test/core/transport/stream_op_test.c
index 5885223..546080d 100644
--- a/test/core/transport/stream_op_test.c
+++ b/test/core/transport/stream_op_test.c
@@ -38,10 +38,6 @@
 #include <grpc/support/log.h>
 #include "test/core/util/test_config.h"
 
-static void flow_ctl_cb_fails(void *ignored, grpc_op_error error) {
-  GPR_ASSERT(error == GRPC_OP_ERROR);
-}
-
 static void assert_slices_equal(gpr_slice a, gpr_slice b) {
   GPR_ASSERT(a.refcount == b.refcount);
   if (a.refcount) {
@@ -60,7 +56,6 @@
   gpr_slice test_slice_2 = gpr_slice_malloc(2);
   gpr_slice test_slice_3 = gpr_slice_malloc(3);
   gpr_slice test_slice_4 = gpr_slice_malloc(4);
-  char x;
   unsigned i;
 
   grpc_stream_op_buffer buf;
@@ -78,11 +73,10 @@
   grpc_sopb_add_slice(&buf, test_slice_2);
   grpc_sopb_add_slice(&buf, test_slice_3);
   grpc_sopb_add_slice(&buf, test_slice_4);
-  grpc_sopb_add_flow_ctl_cb(&buf, flow_ctl_cb_fails, &x);
   grpc_sopb_add_no_op(&buf);
 
   /* verify that the data went in ok */
-  GPR_ASSERT(buf.nops == 7);
+  GPR_ASSERT(buf.nops == 6);
   GPR_ASSERT(buf.ops[0].type == GRPC_OP_BEGIN_MESSAGE);
   GPR_ASSERT(buf.ops[0].data.begin_message.length == 1);
   GPR_ASSERT(buf.ops[0].data.begin_message.flags == 2);
@@ -94,10 +88,7 @@
   assert_slices_equal(buf.ops[3].data.slice, test_slice_3);
   GPR_ASSERT(buf.ops[4].type == GRPC_OP_SLICE);
   assert_slices_equal(buf.ops[4].data.slice, test_slice_4);
-  GPR_ASSERT(buf.ops[5].type == GRPC_OP_FLOW_CTL_CB);
-  GPR_ASSERT(buf.ops[5].data.flow_ctl_cb.cb == flow_ctl_cb_fails);
-  GPR_ASSERT(buf.ops[5].data.flow_ctl_cb.arg == &x);
-  GPR_ASSERT(buf.ops[6].type == GRPC_NO_OP);
+  GPR_ASSERT(buf.ops[5].type == GRPC_NO_OP);
 
   /* initialize the second buffer */
   grpc_sopb_init(&buf2);
diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj
index fc7744f..0bc5943 100644
--- a/vsprojects/grpc/grpc.vcxproj
+++ b/vsprojects/grpc/grpc.vcxproj
@@ -115,7 +115,6 @@
     <ClInclude Include="..\..\src\core\channel\client_setup.h" />
     <ClInclude Include="..\..\src\core\channel\connected_channel.h" />
     <ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
-    <ClInclude Include="..\..\src\core\channel\http_filter.h" />
     <ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
     <ClInclude Include="..\..\src\core\channel\noop_filter.h" />
     <ClInclude Include="..\..\src\core\compression\algorithm.h" />
@@ -237,8 +236,6 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\tsi\transport_security.c">
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\call_op_string.c">
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\census_filter.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_args.c">
@@ -255,8 +252,6 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\http_client_filter.c">
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\http_filter.c">
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\http_server_filter.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\noop_filter.c">
@@ -431,6 +426,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\transport\transport.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="..\gpr\gpr.vcxproj">
diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters
index 1dfca58..edce8d9 100644
--- a/vsprojects/grpc/grpc.vcxproj.filters
+++ b/vsprojects/grpc/grpc.vcxproj.filters
@@ -61,9 +61,6 @@
     <ClCompile Include="..\..\src\core\tsi\transport_security.c">
       <Filter>src\core\tsi</Filter>
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\call_op_string.c">
-      <Filter>src\core\channel</Filter>
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\census_filter.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
@@ -88,9 +85,6 @@
     <ClCompile Include="..\..\src\core\channel\http_client_filter.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\http_filter.c">
-      <Filter>src\core\channel</Filter>
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\http_server_filter.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
@@ -352,6 +346,9 @@
     <ClCompile Include="..\..\src\core\transport\transport.c">
       <Filter>src\core\transport</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+      <Filter>src\core\transport</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\grpc\grpc_security.h">
@@ -443,9 +440,6 @@
     <ClInclude Include="..\..\src\core\channel\http_client_filter.h">
       <Filter>src\core\channel</Filter>
     </ClInclude>
-    <ClInclude Include="..\..\src\core\channel\http_filter.h">
-      <Filter>src\core\channel</Filter>
-    </ClInclude>
     <ClInclude Include="..\..\src\core\channel\http_server_filter.h">
       <Filter>src\core\channel</Filter>
     </ClInclude>
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
index 670b109..fac17e6 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
@@ -98,7 +98,6 @@
     <ClInclude Include="..\..\src\core\channel\client_setup.h" />
     <ClInclude Include="..\..\src\core\channel\connected_channel.h" />
     <ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
-    <ClInclude Include="..\..\src\core\channel\http_filter.h" />
     <ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
     <ClInclude Include="..\..\src\core\channel\noop_filter.h" />
     <ClInclude Include="..\..\src\core\compression\algorithm.h" />
@@ -182,8 +181,6 @@
   <ItemGroup>
     <ClCompile Include="..\..\src\core\surface\init_unsecure.c">
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\call_op_string.c">
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\census_filter.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\channel_args.c">
@@ -200,8 +197,6 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\http_client_filter.c">
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\http_filter.c">
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\http_server_filter.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\channel\noop_filter.c">
@@ -376,6 +371,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\transport\transport.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="..\gpr\gpr.vcxproj">
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 7c94d4d..daca2c0 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -4,9 +4,6 @@
     <ClCompile Include="..\..\src\core\surface\init_unsecure.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\call_op_string.c">
-      <Filter>src\core\channel</Filter>
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\census_filter.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
@@ -31,9 +28,6 @@
     <ClCompile Include="..\..\src\core\channel\http_client_filter.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
-    <ClCompile Include="..\..\src\core\channel\http_filter.c">
-      <Filter>src\core\channel</Filter>
-    </ClCompile>
     <ClCompile Include="..\..\src\core\channel\http_server_filter.c">
       <Filter>src\core\channel</Filter>
     </ClCompile>
@@ -295,6 +289,9 @@
     <ClCompile Include="..\..\src\core\transport\transport.c">
       <Filter>src\core\transport</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+      <Filter>src\core\transport</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\grpc\byte_buffer.h">
@@ -338,9 +335,6 @@
     <ClInclude Include="..\..\src\core\channel\http_client_filter.h">
       <Filter>src\core\channel</Filter>
     </ClInclude>
-    <ClInclude Include="..\..\src\core\channel\http_filter.h">
-      <Filter>src\core\channel</Filter>
-    </ClInclude>
     <ClInclude Include="..\..\src\core\channel\http_server_filter.h">
       <Filter>src\core\channel</Filter>
     </ClInclude>