Merge github.com:grpc/grpc into endpoints
diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h
index ec048e8..04db003 100644
--- a/include/grpc/support/slice_buffer.h
+++ b/include/grpc/support/slice_buffer.h
@@ -86,6 +86,8 @@
 void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b);
 /* move all of the elements of src into dst */
 void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
+/* remove n bytes from the end of a slice buffer */
+void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n);
 
 #ifdef __cplusplus
 }
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 9012070..1e38479 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -61,6 +61,10 @@
   grpc_httpcli_context *context;
   grpc_pollset *pollset;
   grpc_iomgr_object iomgr_obj;
+  gpr_slice_buffer incoming;
+  gpr_slice_buffer outgoing;
+  grpc_iomgr_closure on_read;
+  grpc_iomgr_closure done_write;
 } internal_request;
 
 static grpc_httpcli_get_override g_get_override = NULL;
@@ -99,73 +103,70 @@
   gpr_slice_unref(req->request_text);
   gpr_free(req->host);
   grpc_iomgr_unregister_object(&req->iomgr_obj);
+  gpr_slice_buffer_destroy(&req->incoming);
+  gpr_slice_buffer_destroy(&req->outgoing);
   gpr_free(req);
 }
 
-static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
-                    grpc_endpoint_cb_status status) {
+static void on_read(void *user_data, int success);
+
+static void do_read(internal_request *req) {
+  switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) {
+    case GRPC_ENDPOINT_DONE:
+      on_read(req, 1);
+      break;
+    case GRPC_ENDPOINT_PENDING:
+      break;
+    case GRPC_ENDPOINT_ERROR:
+      on_read(req, 0);
+      break;
+  }
+}
+
+static void on_read(void *user_data, int success) {
   internal_request *req = user_data;
   size_t i;
 
-  for (i = 0; i < nslices; i++) {
-    if (GPR_SLICE_LENGTH(slices[i])) {
+  for (i = 0; i < req->incoming.count; i++) {
+    if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
       req->have_read_byte = 1;
-      if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) {
+      if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) {
         finish(req, 0);
-        goto done;
+        return;
       }
     }
   }
 
-  switch (status) {
-    case GRPC_ENDPOINT_CB_OK:
-      grpc_endpoint_notify_on_read(req->ep, on_read, req);
-      break;
-    case GRPC_ENDPOINT_CB_EOF:
-    case GRPC_ENDPOINT_CB_ERROR:
-    case GRPC_ENDPOINT_CB_SHUTDOWN:
-      if (!req->have_read_byte) {
-        next_address(req);
-      } else {
-        finish(req, grpc_httpcli_parser_eof(&req->parser));
-      }
-      break;
-  }
-
-done:
-  for (i = 0; i < nslices; i++) {
-    gpr_slice_unref(slices[i]);
+  if (success) {
+    do_read(req);
+  } else if (!req->have_read_byte) {
+    next_address(req);
+  } else {
+    finish(req, grpc_httpcli_parser_eof(&req->parser));
   }
 }
 
-static void on_written(internal_request *req) {
-  grpc_endpoint_notify_on_read(req->ep, on_read, req);
-}
+static void on_written(internal_request *req) { do_read(req); }
 
-static void done_write(void *arg, grpc_endpoint_cb_status status) {
+static void done_write(void *arg, int success) {
   internal_request *req = arg;
-  switch (status) {
-    case GRPC_ENDPOINT_CB_OK:
-      on_written(req);
-      break;
-    case GRPC_ENDPOINT_CB_EOF:
-    case GRPC_ENDPOINT_CB_SHUTDOWN:
-    case GRPC_ENDPOINT_CB_ERROR:
-      next_address(req);
-      break;
+  if (success) {
+    on_written(req);
+  } else {
+    next_address(req);
   }
 }
 
 static void start_write(internal_request *req) {
   gpr_slice_ref(req->request_text);
-  switch (
-      grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
-    case GRPC_ENDPOINT_WRITE_DONE:
+  gpr_slice_buffer_add(&req->outgoing, req->request_text);
+  switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) {
+    case GRPC_ENDPOINT_DONE:
       on_written(req);
       break;
-    case GRPC_ENDPOINT_WRITE_PENDING:
+    case GRPC_ENDPOINT_PENDING:
       break;
-    case GRPC_ENDPOINT_WRITE_ERROR:
+    case GRPC_ENDPOINT_ERROR:
       finish(req, 0);
       break;
   }
@@ -237,6 +238,10 @@
       request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
   req->context = context;
   req->pollset = pollset;
+  grpc_iomgr_closure_init(&req->on_read, on_read, req);
+  grpc_iomgr_closure_init(&req->done_write, done_write, req);
+  gpr_slice_buffer_init(&req->incoming);
+  gpr_slice_buffer_init(&req->outgoing);
   gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
   grpc_iomgr_register_object(&req->iomgr_obj, name);
   gpr_free(name);
@@ -270,7 +275,11 @@
       request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
   req->context = context;
   req->pollset = pollset;
-  gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
+  grpc_iomgr_closure_init(&req->on_read, on_read, req);
+  grpc_iomgr_closure_init(&req->done_write, done_write, req);
+  gpr_slice_buffer_init(&req->incoming);
+  gpr_slice_buffer_init(&req->outgoing);
+  gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
   grpc_iomgr_register_object(&req->iomgr_obj, name);
   gpr_free(name);
   req->host = gpr_strdup(request->host);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 8ee14bc..a7878e3 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -33,17 +33,16 @@
 
 #include "src/core/iomgr/endpoint.h"
 
-void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
-                                  void *user_data) {
-  ep->vtable->notify_on_read(ep, cb, user_data);
+grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
+                                           gpr_slice_buffer *slices,
+                                           grpc_iomgr_closure *cb) {
+  return ep->vtable->read(ep, slices, cb);
 }
 
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
-                                               gpr_slice *slices,
-                                               size_t nslices,
-                                               grpc_endpoint_write_cb cb,
-                                               void *user_data) {
-  return ep->vtable->write(ep, slices, nslices, cb, user_data);
+grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
+                                            gpr_slice_buffer *slices,
+                                            grpc_iomgr_closure *cb) {
+  return ep->vtable->write(ep, slices, cb);
 }
 
 void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index ea92a50..d14d52d 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -37,6 +37,7 @@
 #include "src/core/iomgr/pollset.h"
 #include "src/core/iomgr/pollset_set.h"
 #include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
 #include <grpc/support/time.h>
 
 /* An endpoint caps a streaming channel between two communicating processes.
@@ -45,31 +46,17 @@
 typedef struct grpc_endpoint grpc_endpoint;
 typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
 
-typedef enum grpc_endpoint_cb_status {
-  GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
-  GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
-  GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
-  GRPC_ENDPOINT_CB_ERROR     /* Call interrupted by socket error */
-} grpc_endpoint_cb_status;
-
-typedef enum grpc_endpoint_write_status {
-  GRPC_ENDPOINT_WRITE_DONE,    /* completed immediately, cb won't be called */
-  GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */
-  GRPC_ENDPOINT_WRITE_ERROR    /* write errored out, cb won't be called */
-} grpc_endpoint_write_status;
-
-typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
-                                      size_t nslices,
-                                      grpc_endpoint_cb_status error);
-typedef void (*grpc_endpoint_write_cb)(void *user_data,
-                                       grpc_endpoint_cb_status error);
+typedef enum grpc_endpoint_op_status {
+  GRPC_ENDPOINT_DONE,    /* completed immediately, cb won't be called */
+  GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
+  GRPC_ENDPOINT_ERROR    /* write errored out, cb won't be called */
+} grpc_endpoint_op_status;
 
 struct grpc_endpoint_vtable {
-  void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
-                         void *user_data);
-  grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
-                                      size_t nslices, grpc_endpoint_write_cb cb,
-                                      void *user_data);
+  grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
+                                  grpc_iomgr_closure *cb);
+  grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
+                                   grpc_iomgr_closure *cb);
   void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
   void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
   void (*shutdown)(grpc_endpoint *ep);
@@ -77,26 +64,32 @@
   char *(*get_peer)(grpc_endpoint *ep);
 };
 
-/* When data is available on the connection, calls the callback with slices. */
-void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
-                                  void *user_data);
+/* When data is available on the connection, calls the callback with slices.
+   Callback success indicates that the endpoint can accept more reads, failure
+   indicates the endpoint is closed.
+   Valid slices may be placed into \a slices even on callback success == 0. */
+grpc_endpoint_op_status grpc_endpoint_read(
+    grpc_endpoint *ep, gpr_slice_buffer *slices,
+    grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
 
 char *grpc_endpoint_get_peer(grpc_endpoint *ep);
 
 /* Write slices out to the socket.
 
    If the connection is ready for more data after the end of the call, it
-   returns GRPC_ENDPOINT_WRITE_DONE.
-   Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
-   connection is ready for more data. */
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
-                                               gpr_slice *slices,
-                                               size_t nslices,
-                                               grpc_endpoint_write_cb cb,
-                                               void *user_data);
+   returns GRPC_ENDPOINT_DONE.
+   Otherwise it returns GRPC_ENDPOINT_PENDING and calls cb when the
+   connection is ready for more data.
+   \a slices may be mutated at will by the endpoint until cb is called.
+   No guarantee is made to the content of slices after a write EXCEPT that
+   it is a valid slice buffer.
+   */
+grpc_endpoint_op_status grpc_endpoint_write(
+    grpc_endpoint *ep, gpr_slice_buffer *slices,
+    grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
 
 /* Causes any pending read/write callbacks to run immediately with
-   GRPC_ENDPOINT_CB_SHUTDOWN status */
+   success==0 */
 void grpc_endpoint_shutdown(grpc_endpoint *ep);
 void grpc_endpoint_destroy(grpc_endpoint *ep);
 
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 360e6eb..03be462 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -61,209 +61,8 @@
 #define SENDMSG_FLAGS 0
 #endif
 
-/* Holds a slice array and associated state. */
-typedef struct grpc_tcp_slice_state {
-  gpr_slice *slices;       /* Array of slices */
-  size_t nslices;          /* Size of slices array. */
-  ssize_t first_slice;     /* First valid slice in array */
-  ssize_t last_slice;      /* Last valid slice in array */
-  gpr_slice working_slice; /* pointer to original final slice */
-  int working_slice_valid; /* True if there is a working slice */
-  int memory_owned;        /* True if slices array is owned */
-} grpc_tcp_slice_state;
-
 int grpc_tcp_trace = 0;
 
-static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
-                             size_t nslices, size_t valid_slices) {
-  state->slices = slices;
-  state->nslices = nslices;
-  if (valid_slices == 0) {
-    state->first_slice = -1;
-  } else {
-    state->first_slice = 0;
-  }
-  state->last_slice = valid_slices - 1;
-  state->working_slice_valid = 0;
-  state->memory_owned = 0;
-}
-
-/* Returns true if there is still available data */
-static int slice_state_has_available(grpc_tcp_slice_state *state) {
-  return state->first_slice != -1 && state->last_slice >= state->first_slice;
-}
-
-static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
-  if (state->first_slice == -1) {
-    return 0;
-  } else {
-    return state->last_slice - state->first_slice + 1;
-  }
-}
-
-static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
-  /* TODO(klempner): use realloc instead when first_slice is 0 */
-  /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
-  gpr_slice *slices = state->slices;
-  size_t original_size = slice_state_slices_allocated(state);
-  size_t i;
-  gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
-
-  for (i = 0; i < original_size; ++i) {
-    new_slices[i] = slices[i + state->first_slice];
-  }
-
-  state->slices = new_slices;
-  state->last_slice = original_size - 1;
-  if (original_size > 0) {
-    state->first_slice = 0;
-  } else {
-    state->first_slice = -1;
-  }
-  state->nslices = new_size;
-
-  if (state->memory_owned) {
-    gpr_free(slices);
-  }
-  state->memory_owned = 1;
-}
-
-static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
-                                      size_t prefix_bytes) {
-  gpr_slice *current_slice = &state->slices[state->first_slice];
-  size_t current_slice_size;
-
-  while (slice_state_has_available(state)) {
-    current_slice_size = GPR_SLICE_LENGTH(*current_slice);
-    if (current_slice_size > prefix_bytes) {
-      /* TODO(klempner): Get rid of the extra refcount created here by adding a
-         native "trim the first N bytes" operation to splice */
-      /* TODO(klempner): This really shouldn't be modifying the current slice
-         unless we own the slices array. */
-      gpr_slice tail;
-      tail = gpr_slice_split_tail(current_slice, prefix_bytes);
-      gpr_slice_unref(*current_slice);
-      *current_slice = tail;
-      return;
-    } else {
-      gpr_slice_unref(*current_slice);
-      ++state->first_slice;
-      ++current_slice;
-      prefix_bytes -= current_slice_size;
-    }
-  }
-}
-
-static void slice_state_destroy(grpc_tcp_slice_state *state) {
-  while (slice_state_has_available(state)) {
-    gpr_slice_unref(state->slices[state->first_slice]);
-    ++state->first_slice;
-  }
-
-  if (state->memory_owned) {
-    gpr_free(state->slices);
-    state->memory_owned = 0;
-  }
-}
-
-void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
-                                    gpr_slice **slices, size_t *nslices) {
-  *slices = state->slices + state->first_slice;
-  *nslices = state->last_slice - state->first_slice + 1;
-
-  state->first_slice = -1;
-  state->last_slice = -1;
-}
-
-/* Fills iov with the first min(iov_size, available) slices, returns number
-   filled */
-static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
-                                   struct iovec *iov, size_t iov_size) {
-  size_t nslices = state->last_slice - state->first_slice + 1;
-  gpr_slice *slices = state->slices + state->first_slice;
-  size_t i;
-  if (nslices < iov_size) {
-    iov_size = nslices;
-  }
-
-  for (i = 0; i < iov_size; ++i) {
-    iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
-    iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
-  }
-  return iov_size;
-}
-
-/* Makes n blocks available at the end of state, writes them into iov, and
-   returns the number of bytes allocated */
-static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
-                                                   struct iovec *iov, size_t n,
-                                                   size_t slice_size) {
-  size_t target_size;
-  size_t i;
-  size_t allocated_bytes;
-  ssize_t allocated_slices = slice_state_slices_allocated(state);
-
-  if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
-    /* Need to grow the slice array */
-    target_size = state->nslices;
-    do {
-      target_size = target_size * 2;
-    } while (target_size < allocated_slices + n - state->working_slice_valid);
-    /* TODO(klempner): If this ever needs to support both prefix removal and
-       append, we should be smarter about the growth logic here */
-    slice_state_realloc(state, target_size);
-  }
-
-  i = 0;
-  allocated_bytes = 0;
-
-  if (state->working_slice_valid) {
-    iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
-    iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
-                     GPR_SLICE_LENGTH(state->slices[state->last_slice]);
-    allocated_bytes += iov[0].iov_len;
-    ++i;
-    state->slices[state->last_slice] = state->working_slice;
-    state->working_slice_valid = 0;
-  }
-
-  for (; i < n; ++i) {
-    ++state->last_slice;
-    state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
-    iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
-    iov[i].iov_len = slice_size;
-    allocated_bytes += slice_size;
-  }
-  if (state->first_slice == -1) {
-    state->first_slice = 0;
-  }
-  return allocated_bytes;
-}
-
-/* Remove the last n bytes from state */
-/* TODO(klempner): Consider having this defer actual deletion until later */
-static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
-  while (bytes > 0 && slice_state_has_available(state)) {
-    if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
-      state->working_slice = state->slices[state->last_slice];
-      state->working_slice_valid = 1;
-      /* TODO(klempner): Combine these into a single operation that doesn't need
-         to refcount */
-      gpr_slice_unref(gpr_slice_split_tail(
-          &state->slices[state->last_slice],
-          GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
-      bytes = 0;
-    } else {
-      bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
-      gpr_slice_unref(state->slices[state->last_slice]);
-      --state->last_slice;
-      if (state->last_slice == -1) {
-        state->first_slice = -1;
-      }
-    }
-  }
-}
-
 typedef struct {
   grpc_endpoint base;
   grpc_fd *em_fd;
@@ -273,12 +72,15 @@
   size_t slice_size;
   gpr_refcount refcount;
 
-  grpc_endpoint_read_cb read_cb;
-  void *read_user_data;
-  grpc_endpoint_write_cb write_cb;
-  void *write_user_data;
+  gpr_slice_buffer *incoming_buffer;
+  gpr_slice_buffer *outgoing_buffer;
+  /** slice within outgoing_buffer to write next */
+  size_t outgoing_slice_idx;
+  /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
+  size_t outgoing_byte_idx;
 
-  grpc_tcp_slice_state write_state;
+  grpc_iomgr_closure *read_cb;
+  grpc_iomgr_closure *write_cb;
 
   grpc_iomgr_closure read_closure;
   grpc_iomgr_closure write_closure;
@@ -288,65 +90,95 @@
   char *peer_string;
 } grpc_tcp;
 
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
 
-static void grpc_tcp_shutdown(grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_fd_shutdown(tcp->em_fd);
 }
 
-static void grpc_tcp_unref(grpc_tcp *tcp) {
-  int refcount_zero = gpr_unref(&tcp->refcount);
-  if (refcount_zero) {
-    grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
-    gpr_free(tcp->peer_string);
-    gpr_free(tcp);
+static void tcp_free(grpc_tcp *tcp) {
+  grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+  gpr_free(tcp->peer_string);
+  gpr_free(tcp);
+}
+
+/*#define GRPC_TCP_REFCOUNT_DEBUG*/
+#ifdef GRPC_TCP_REFCOUNT_DEBUG
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+                      int line) {
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
+          reason, tcp->refcount.count, tcp->refcount.count - 1);
+  if (gpr_unref(&tcp->refcount)) {
+    tcp_free(tcp);
   }
 }
 
-static void grpc_tcp_destroy(grpc_endpoint *ep) {
-  grpc_tcp *tcp = (grpc_tcp *)ep;
-  grpc_tcp_unref(tcp);
+static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
+                    int line) {
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP   ref %p : %s %d -> %d", tcp,
+          reason, tcp->refcount.count, tcp->refcount.count + 1);
+  gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+static void tcp_unref(grpc_tcp *tcp) {
+  if (gpr_unref(&tcp->refcount)) {
+    tcp_free(tcp);
+  }
 }
 
-static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
-                         grpc_endpoint_cb_status status) {
-  grpc_endpoint_read_cb cb = tcp->read_cb;
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static void tcp_destroy(grpc_endpoint *ep) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  TCP_UNREF(tcp, "destroy");
+}
+
+static void call_read_cb(grpc_tcp *tcp, int success) {
+  grpc_iomgr_closure *cb = tcp->read_cb;
 
   if (grpc_tcp_trace) {
     size_t i;
-    gpr_log(GPR_DEBUG, "read: status=%d", status);
-    for (i = 0; i < nslices; i++) {
-      char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    gpr_log(GPR_DEBUG, "read: success=%d", success);
+    for (i = 0; i < tcp->incoming_buffer->count; i++) {
+      char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
+                                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
       gpr_free(dump);
     }
   }
 
   tcp->read_cb = NULL;
-  cb(tcp->read_user_data, slices, nslices, status);
+  tcp->incoming_buffer = NULL;
+  cb->cb(cb->cb_arg, success);
 }
 
-#define INLINE_SLICE_BUFFER_SIZE 8
 #define MAX_READ_IOVEC 4
-static void grpc_tcp_continue_read(grpc_tcp *tcp) {
-  gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
+static void tcp_continue_read(grpc_tcp *tcp) {
   struct msghdr msg;
   struct iovec iov[MAX_READ_IOVEC];
   ssize_t read_bytes;
-  ssize_t allocated_bytes;
-  struct grpc_tcp_slice_state read_state;
-  gpr_slice *final_slices;
-  size_t final_nslices;
+  size_t i;
 
   GPR_ASSERT(!tcp->finished_edge);
+  GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
+  GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
   GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
-  slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
-                   0);
 
-  allocated_bytes = slice_state_append_blocks_into_iovec(
-      &read_state, iov, tcp->iov_size, tcp->slice_size);
+  while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
+    gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
+                                 gpr_slice_malloc(tcp->slice_size));
+  }
+  for (i = 0; i < tcp->incoming_buffer->count; i++) {
+    iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
+    iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
+  }
 
   msg.msg_name = NULL;
   msg.msg_namelen = 0;
@@ -362,87 +194,63 @@
   } while (read_bytes < 0 && errno == EINTR);
   GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
 
-  if (read_bytes < allocated_bytes) {
-    /* TODO(klempner): Consider a second read first, in hopes of getting a
-     * quick EAGAIN and saving a bunch of allocations. */
-    slice_state_remove_last(&read_state, read_bytes < 0
-                                             ? allocated_bytes
-                                             : allocated_bytes - read_bytes);
-  }
-
   if (read_bytes < 0) {
-    /* NB: After calling the user_cb a parallel call of the read handler may
+    /* NB: After calling call_read_cb a parallel call of the read handler may
      * be running. */
     if (errno == EAGAIN) {
       if (tcp->iov_size > 1) {
         tcp->iov_size /= 2;
       }
-      if (slice_state_has_available(&read_state)) {
-        /* TODO(klempner): We should probably do the call into the application
-           without all this junk on the stack */
-        /* FIXME(klempner): Refcount properly */
-        slice_state_transfer_ownership(&read_state, &final_slices,
-                                       &final_nslices);
-        tcp->finished_edge = 1;
-        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
-        slice_state_destroy(&read_state);
-        grpc_tcp_unref(tcp);
-      } else {
-        /* We've consumed the edge, request a new one */
-        slice_state_destroy(&read_state);
-        grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
-      }
+      /* We've consumed the edge, request a new one */
+      grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
     } else {
       /* TODO(klempner): Log interesting errors */
-      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
-      slice_state_destroy(&read_state);
-      grpc_tcp_unref(tcp);
+      gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+      call_read_cb(tcp, 0);
+      TCP_UNREF(tcp, "read");
     }
   } else if (read_bytes == 0) {
     /* 0 read size ==> end of stream */
-    if (slice_state_has_available(&read_state)) {
-      /* there were bytes already read: pass them up to the application */
-      slice_state_transfer_ownership(&read_state, &final_slices,
-                                     &final_nslices);
-      call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
-    } else {
-      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
-    }
-    slice_state_destroy(&read_state);
-    grpc_tcp_unref(tcp);
+    gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+    call_read_cb(tcp, 0);
+    TCP_UNREF(tcp, "read");
   } else {
-    if (tcp->iov_size < MAX_READ_IOVEC) {
+    GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
+    if ((size_t)read_bytes < tcp->incoming_buffer->length) {
+      gpr_slice_buffer_trim_end(tcp->incoming_buffer,
+                                tcp->incoming_buffer->length - read_bytes);
+    } else if (tcp->iov_size < MAX_READ_IOVEC) {
       ++tcp->iov_size;
     }
-    GPR_ASSERT(slice_state_has_available(&read_state));
-    slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices);
-    call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
-    slice_state_destroy(&read_state);
-    grpc_tcp_unref(tcp);
+    GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
+    call_read_cb(tcp, 1);
+    TCP_UNREF(tcp, "read");
   }
 
   GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
 }
 
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
   grpc_tcp *tcp = (grpc_tcp *)arg;
   GPR_ASSERT(!tcp->finished_edge);
 
   if (!success) {
-    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    grpc_tcp_unref(tcp);
+    call_read_cb(tcp, 0);
+    TCP_UNREF(tcp, "read");
   } else {
-    grpc_tcp_continue_read(tcp);
+    tcp_continue_read(tcp);
   }
 }
 
-static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
-                                    void *user_data) {
+static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
+                                        gpr_slice_buffer *incoming_buffer,
+                                        grpc_iomgr_closure *cb) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   GPR_ASSERT(tcp->read_cb == NULL);
   tcp->read_cb = cb;
-  tcp->read_user_data = user_data;
-  gpr_ref(&tcp->refcount);
+  tcp->incoming_buffer = incoming_buffer;
+  gpr_slice_buffer_reset_and_unref(incoming_buffer);
+  TCP_REF(tcp, "read");
   if (tcp->finished_edge) {
     tcp->finished_edge = 0;
     grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
@@ -450,18 +258,41 @@
     tcp->handle_read_closure.cb_arg = tcp;
     grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
   }
+  /* TODO(ctiller): immediate return */
+  return GRPC_ENDPOINT_PENDING;
 }
 
 #define MAX_WRITE_IOVEC 16
-static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
+static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
   struct msghdr msg;
   struct iovec iov[MAX_WRITE_IOVEC];
   int iov_size;
   ssize_t sent_length;
-  grpc_tcp_slice_state *state = &tcp->write_state;
+  ssize_t sending_length;
+  ssize_t trailing;
+  ssize_t unwind_slice_idx;
+  ssize_t unwind_byte_idx;
 
   for (;;) {
-    iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC);
+    sending_length = 0;
+    unwind_slice_idx = tcp->outgoing_slice_idx;
+    unwind_byte_idx = tcp->outgoing_byte_idx;
+    for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
+                       iov_size != MAX_WRITE_IOVEC;
+         iov_size++) {
+      iov[iov_size].iov_base =
+          GPR_SLICE_START_PTR(
+              tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
+          tcp->outgoing_byte_idx;
+      iov[iov_size].iov_len =
+          GPR_SLICE_LENGTH(
+              tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
+          tcp->outgoing_byte_idx;
+      sending_length += iov[iov_size].iov_len;
+      tcp->outgoing_slice_idx++;
+      tcp->outgoing_byte_idx = 0;
+    }
+    GPR_ASSERT(iov_size > 0);
 
     msg.msg_name = NULL;
     msg.msg_namelen = 0;
@@ -480,70 +311,75 @@
 
     if (sent_length < 0) {
       if (errno == EAGAIN) {
-        return GRPC_ENDPOINT_WRITE_PENDING;
+        tcp->outgoing_slice_idx = unwind_slice_idx;
+        tcp->outgoing_byte_idx = unwind_byte_idx;
+        return GRPC_ENDPOINT_PENDING;
       } else {
         /* TODO(klempner): Log some of these */
-        slice_state_destroy(state);
-        return GRPC_ENDPOINT_WRITE_ERROR;
+        return GRPC_ENDPOINT_ERROR;
       }
     }
 
-    /* TODO(klempner): Probably better to batch this after we finish flushing */
-    slice_state_remove_prefix(state, sent_length);
+    GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+    trailing = sending_length - sent_length;
+    while (trailing > 0) {
+      ssize_t slice_length;
 
-    if (!slice_state_has_available(state)) {
-      return GRPC_ENDPOINT_WRITE_DONE;
+      tcp->outgoing_slice_idx--;
+      slice_length = GPR_SLICE_LENGTH(
+          tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
+      if (slice_length > trailing) {
+        tcp->outgoing_byte_idx = slice_length - trailing;
+        break;
+      } else {
+        trailing -= slice_length;
+      }
+    }
+
+    if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
+      return GRPC_ENDPOINT_DONE;
     }
   };
 }
 
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
   grpc_tcp *tcp = (grpc_tcp *)arg;
-  grpc_endpoint_write_status write_status;
-  grpc_endpoint_cb_status cb_status;
-  grpc_endpoint_write_cb cb;
+  grpc_endpoint_op_status status;
+  grpc_iomgr_closure *cb;
 
   if (!success) {
-    slice_state_destroy(&tcp->write_state);
     cb = tcp->write_cb;
     tcp->write_cb = NULL;
-    cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
-    grpc_tcp_unref(tcp);
+    cb->cb(cb->cb_arg, 0);
+    TCP_UNREF(tcp, "write");
     return;
   }
 
   GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
-  write_status = grpc_tcp_flush(tcp);
-  if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
+  status = tcp_flush(tcp);
+  if (status == GRPC_ENDPOINT_PENDING) {
     grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
   } else {
-    slice_state_destroy(&tcp->write_state);
-    if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
-      cb_status = GRPC_ENDPOINT_CB_OK;
-    } else {
-      cb_status = GRPC_ENDPOINT_CB_ERROR;
-    }
     cb = tcp->write_cb;
     tcp->write_cb = NULL;
-    cb(tcp->write_user_data, cb_status);
-    grpc_tcp_unref(tcp);
+    cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
+    TCP_UNREF(tcp, "write");
   }
   GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
 }
 
-static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
-                                                 gpr_slice *slices,
-                                                 size_t nslices,
-                                                 grpc_endpoint_write_cb cb,
-                                                 void *user_data) {
+static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
+                                         gpr_slice_buffer *buf,
+                                         grpc_iomgr_closure *cb) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
-  grpc_endpoint_write_status status;
+  grpc_endpoint_op_status status;
 
   if (grpc_tcp_trace) {
     size_t i;
 
-    for (i = 0; i < nslices; i++) {
-      char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    for (i = 0; i < buf->count; i++) {
+      char *data =
+          gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
       gpr_free(data);
     }
@@ -551,15 +387,19 @@
 
   GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
   GPR_ASSERT(tcp->write_cb == NULL);
-  slice_state_init(&tcp->write_state, slices, nslices, nslices);
 
-  status = grpc_tcp_flush(tcp);
-  if (status == GRPC_ENDPOINT_WRITE_PENDING) {
-    /* TODO(klempner): Consider inlining rather than malloc for small nslices */
-    slice_state_realloc(&tcp->write_state, nslices);
-    gpr_ref(&tcp->refcount);
+  if (buf->length == 0) {
+    GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
+    return GRPC_ENDPOINT_DONE;
+  }
+  tcp->outgoing_buffer = buf;
+  tcp->outgoing_slice_idx = 0;
+  tcp->outgoing_byte_idx = 0;
+
+  status = tcp_flush(tcp);
+  if (status == GRPC_ENDPOINT_PENDING) {
+    TCP_REF(tcp, "write");
     tcp->write_cb = cb;
-    tcp->write_user_data = user_data;
     grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
   }
 
@@ -567,27 +407,25 @@
   return status;
 }
 
-static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_pollset_add_fd(pollset, tcp->em_fd);
 }
 
-static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep,
-                                        grpc_pollset_set *pollset_set) {
+static void tcp_add_to_pollset_set(grpc_endpoint *ep,
+                                   grpc_pollset_set *pollset_set) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
 }
 
-static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
+static char *tcp_get_peer(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   return gpr_strdup(tcp->peer_string);
 }
 
 static const grpc_endpoint_vtable vtable = {
-    grpc_tcp_notify_on_read, grpc_tcp_write,
-    grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
-    grpc_tcp_shutdown,       grpc_tcp_destroy,
-    grpc_tcp_get_peer};
+    tcp_read,     tcp_write,   tcp_add_to_pollset, tcp_add_to_pollset_set,
+    tcp_shutdown, tcp_destroy, tcp_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
                                const char *peer_string) {
@@ -597,21 +435,19 @@
   tcp->fd = em_fd->fd;
   tcp->read_cb = NULL;
   tcp->write_cb = NULL;
-  tcp->read_user_data = NULL;
-  tcp->write_user_data = NULL;
+  tcp->incoming_buffer = NULL;
   tcp->slice_size = slice_size;
   tcp->iov_size = 1;
   tcp->finished_edge = 1;
-  slice_state_init(&tcp->write_state, NULL, 0, 0);
   /* paired with unref in grpc_tcp_destroy */
   gpr_ref_init(&tcp->refcount, 1);
   tcp->em_fd = em_fd;
-  tcp->read_closure.cb = grpc_tcp_handle_read;
+  tcp->read_closure.cb = tcp_handle_read;
   tcp->read_closure.cb_arg = tcp;
-  tcp->write_closure.cb = grpc_tcp_handle_write;
+  tcp->write_closure.cb = tcp_handle_write;
   tcp->write_closure.cb_arg = tcp;
 
-  tcp->handle_read_closure.cb = grpc_tcp_handle_read;
+  tcp->handle_read_closure.cb = tcp_handle_read;
   return &tcp->base;
 }
 
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 123f46d..1cf2ca2 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,13 +82,11 @@
   /* Refcounting how many operations are in progress. */
   gpr_refcount refcount;
 
-  grpc_endpoint_read_cb read_cb;
-  void *read_user_data;
+  grpc_iomgr_closure *read_cb;
+  grpc_iomgr_closure *write_cb;
   gpr_slice read_slice;
-
-  grpc_endpoint_write_cb write_cb;
-  void *write_user_data;
-  gpr_slice_buffer write_slices;
+  gpr_slice_buffer *write_slices;
+  gpr_slice_buffer *read_slices;
 
   /* The IO Completion Port runs from another thread. We need some mechanism
      to protect ourselves when requesting a shutdown. */
@@ -102,7 +100,6 @@
 
 static void tcp_unref(grpc_tcp *tcp) {
   if (gpr_unref(&tcp->refcount)) {
-    gpr_slice_buffer_destroy(&tcp->write_slices);
     grpc_winsocket_orphan(tcp->socket);
     gpr_mu_destroy(&tcp->mu);
     gpr_free(tcp->peer_string);
@@ -111,21 +108,16 @@
 }
 
 /* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(void *tcpp, int from_iocp) {
-  grpc_tcp *tcp = (grpc_tcp *)tcpp;
+static int on_read(grpc_tcp *tcp, int from_iocp) {
   grpc_winsocket *socket = tcp->socket;
   gpr_slice sub;
   gpr_slice *slice = NULL;
   size_t nslices = 0;
-  grpc_endpoint_cb_status status;
-  grpc_endpoint_read_cb cb;
+  int success;
   grpc_winsocket_callback_info *info = &socket->read_info;
-  void *opaque = tcp->read_user_data;
   int do_abort = 0;
 
   gpr_mu_lock(&tcp->mu);
-  cb = tcp->read_cb;
-  tcp->read_cb = NULL;
   if (!from_iocp || tcp->shutting_down) {
     /* If we are here with from_iocp set to true, it means we got raced to
     shutting down the endpoint. No actual abort callback will happen
@@ -140,8 +132,7 @@
       gpr_slice_unref(tcp->read_slice);
     }
     tcp_unref(tcp);
-    if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    return;
+    return 0;
   }
 
   GPR_ASSERT(tcp->socket->read_info.outstanding);
@@ -152,27 +143,35 @@
       gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
       gpr_free(utf8_message);
     }
-    status = GRPC_ENDPOINT_CB_ERROR;
+    success = 0;
   } else {
     if (info->bytes_transfered != 0) {
       sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
-      status = GRPC_ENDPOINT_CB_OK;
-      slice = &sub;
-      nslices = 1;
+      gpr_slice_buffer_add(tcp->read_slices, sub);
+      success = 1;
     } else {
       gpr_slice_unref(tcp->read_slice);
-      status = GRPC_ENDPOINT_CB_EOF;
+      success = 0;
     }
   }
 
   tcp->socket->read_info.outstanding = 0;
 
-  tcp_unref(tcp);
-  cb(opaque, slice, nslices, status);
+  return success;
 }
 
-static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
-                               void *arg) {
+static void on_read_cb(void *tcpp, int from_iocp) {
+  grpc_tcp *tcp = tcpp;
+  grpc_iomgr_closure *cb = tcp->read_cb;
+  int success = on_read(tcp, from_iocp);
+  tcp->read_cb = NULL;
+  tcp_unref(tcp);
+  cb->cb(cb->cb_arg, success);
+}
+
+static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
+                                        gpr_slice_buffer *read_slices,
+                                        grpc_iomgr_closure *cb) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->read_info;
@@ -183,13 +182,11 @@
 
   GPR_ASSERT(!tcp->socket->read_info.outstanding);
   if (tcp->shutting_down) {
-    cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    return;
+    return GRPC_ENDPOINT_ERROR;
   }
-  tcp_ref(tcp);
   tcp->socket->read_info.outstanding = 1;
   tcp->read_cb = cb;
-  tcp->read_user_data = arg;
+  tcp->read_slices = read_slices;
 
   tcp->read_slice = gpr_slice_malloc(8192);
 
@@ -204,9 +201,8 @@
   /* Did we get data immediately ? Yay. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
     info->bytes_transfered = bytes_read;
-    /* This might heavily recurse. */
-    on_read(tcp, 1);
-    return;
+    gpr_log(GPR_DEBUG, "immread: %d bytes", bytes_read);
+    return on_read(tcp, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
   }
 
   /* Otherwise, let's retry, by queuing a read. */
@@ -218,12 +214,14 @@
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
       info->wsa_error = wsa_error;
-      on_read(tcp, 1);
-      return;
+      gpr_log(GPR_DEBUG, "immread: err=%d", wsa_error);
+      return on_read(tcp, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
     }
   }
 
-  grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
+  tcp_ref(tcp);
+  grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
+  return GRPC_ENDPOINT_PENDING;
 }
 
 /* Asynchronous callback from the IOCP, or the background thread. */
@@ -231,9 +229,8 @@
   grpc_tcp *tcp = (grpc_tcp *)tcpp;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->write_info;
-  grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
-  grpc_endpoint_write_cb cb;
-  void *opaque = tcp->write_user_data;
+  grpc_iomgr_closure *cb;
+  int success;
   int do_abort = 0;
 
   gpr_mu_lock(&tcp->mu);
@@ -250,10 +247,11 @@
   if (do_abort) {
     if (from_iocp) {
       tcp->socket->write_info.outstanding = 0;
-      gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     }
     tcp_unref(tcp);
-    if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
+    if (cb) {
+      cb->cb(cb->cb_arg, 0);
+    }
     return;
   }
 
@@ -265,23 +263,22 @@
       gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
       gpr_free(utf8_message);
     }
-    status = GRPC_ENDPOINT_CB_ERROR;
+    success = 0;
   } else {
-    GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
+    GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
+    success = 1;
   }
 
-  gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
   tcp->socket->write_info.outstanding = 0;
 
   tcp_unref(tcp);
-  cb(opaque, status);
+  cb->cb(cb->cb_arg, success);
 }
 
 /* Initiates a write. */
-static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
-                                            gpr_slice *slices, size_t nslices,
-                                            grpc_endpoint_write_cb cb,
-                                            void *arg) {
+static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
+                                         gpr_slice_buffer *slices,
+                                         grpc_iomgr_closure *cb) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_winsocket *socket = tcp->socket;
   grpc_winsocket_callback_info *info = &socket->write_info;
@@ -294,28 +291,26 @@
 
   GPR_ASSERT(!tcp->socket->write_info.outstanding);
   if (tcp->shutting_down) {
-    return GRPC_ENDPOINT_WRITE_ERROR;
+    return GRPC_ENDPOINT_ERROR;
   }
   tcp_ref(tcp);
 
   tcp->socket->write_info.outstanding = 1;
   tcp->write_cb = cb;
-  tcp->write_user_data = arg;
+  tcp->write_slices = slices;
 
-  gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
-
-  if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
-    buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
+  if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
+    buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
     allocated = buffers;
   }
 
-  for (i = 0; i < tcp->write_slices.count; i++) {
-    buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]);
-    buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
+  for (i = 0; i < tcp->write_slices->count; i++) {
+    buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]);
+    buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]);
   }
 
   /* First, let's try a synchronous, non-blocking write. */
-  status = WSASend(socket->socket, buffers, tcp->write_slices.count,
+  status = WSASend(socket->socket, buffers, tcp->write_slices->count,
                    &bytes_sent, 0, NULL, NULL);
   info->wsa_error = status == 0 ? 0 : WSAGetLastError();
 
@@ -323,10 +318,10 @@
      connection that has its send queue filled up. But if we don't, then we can
      avoid doing an async write operation at all. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
-    grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
+    grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR;
     if (status == 0) {
-      ret = GRPC_ENDPOINT_WRITE_DONE;
-      GPR_ASSERT(bytes_sent == tcp->write_slices.length);
+      ret = GRPC_ENDPOINT_DONE;
+      GPR_ASSERT(bytes_sent == tcp->write_slices->length);
     } else {
       if (socket->read_info.wsa_error != WSAECONNRESET) {
         char *utf8_message = gpr_format_message(info->wsa_error);
@@ -335,7 +330,6 @@
       }
     }
     if (allocated) gpr_free(allocated);
-    gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     tcp->socket->write_info.outstanding = 0;
     tcp_unref(tcp);
     return ret;
@@ -344,24 +338,23 @@
   /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
      operation, this time asynchronously. */
   memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
-  status = WSASend(socket->socket, buffers, tcp->write_slices.count,
+  status = WSASend(socket->socket, buffers, tcp->write_slices->count,
                    &bytes_sent, 0, &socket->write_info.overlapped, NULL);
   if (allocated) gpr_free(allocated);
 
   if (status != 0) {
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
-      gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
       tcp->socket->write_info.outstanding = 0;
       tcp_unref(tcp);
-      return GRPC_ENDPOINT_WRITE_ERROR;
+      return GRPC_ENDPOINT_ERROR;
     }
   }
 
   /* As all is now setup, we can now ask for the IOCP notification. It may
      trigger the callback immediately however, but no matter. */
   grpc_socket_notify_on_write(socket, on_write, tcp);
-  return GRPC_ENDPOINT_WRITE_PENDING;
+  return GRPC_ENDPOINT_PENDING;
 }
 
 static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
@@ -407,8 +400,8 @@
 }
 
 static grpc_endpoint_vtable vtable = {
-    win_notify_on_read, win_write,   win_add_to_pollset, win_add_to_pollset_set,
-    win_shutdown,       win_destroy, win_get_peer};
+    win_read,     win_write,   win_add_to_pollset, win_add_to_pollset_set,
+    win_shutdown, win_destroy, win_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
@@ -416,7 +409,6 @@
   tcp->base.vtable = &vtable;
   tcp->socket = socket;
   gpr_mu_init(&tcp->mu);
-  gpr_slice_buffer_init(&tcp->write_slices);
   gpr_ref_init(&tcp->refcount, 1);
   tcp->peer_string = gpr_strdup(peer_string);
   return &tcp->base;
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 81b3e33..b696e38 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -49,15 +49,15 @@
   struct tsi_frame_protector *protector;
   gpr_mu protector_mu;
   /* saved upper level callbacks and user_data. */
-  grpc_endpoint_read_cb read_cb;
-  void *read_user_data;
-  grpc_endpoint_write_cb write_cb;
-  void *write_user_data;
+  grpc_iomgr_closure *read_cb;
+  grpc_iomgr_closure *write_cb;
+  grpc_iomgr_closure on_read;
+  gpr_slice_buffer *read_buffer;
+  gpr_slice_buffer source_buffer;
   /* saved handshaker leftover data to unprotect. */
   gpr_slice_buffer leftover_bytes;
   /* buffers for read and write */
   gpr_slice read_staging_buffer;
-  gpr_slice_buffer input_buffer;
 
   gpr_slice write_staging_buffer;
   gpr_slice_buffer output_buffer;
@@ -67,62 +67,91 @@
 
 int grpc_trace_secure_endpoint = 0;
 
-static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
-
 static void destroy(secure_endpoint *secure_ep) {
   secure_endpoint *ep = secure_ep;
   grpc_endpoint_destroy(ep->wrapped_ep);
   tsi_frame_protector_destroy(ep->protector);
   gpr_slice_buffer_destroy(&ep->leftover_bytes);
   gpr_slice_unref(ep->read_staging_buffer);
-  gpr_slice_buffer_destroy(&ep->input_buffer);
   gpr_slice_unref(ep->write_staging_buffer);
   gpr_slice_buffer_destroy(&ep->output_buffer);
+  gpr_slice_buffer_destroy(&ep->source_buffer);
   gpr_mu_destroy(&ep->protector_mu);
   gpr_free(ep);
 }
 
+/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/
+#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
+#define SECURE_ENDPOINT_UNREF(ep, reason) \
+  secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
+#define SECURE_ENDPOINT_REF(ep, reason) \
+  secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
+static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
+                                  const char *file, int line) {
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
+          ep, reason, ep->ref.count, ep->ref.count - 1);
+  if (gpr_unref(&ep->ref)) {
+    destroy(ep);
+  }
+}
+
+static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
+                                const char *file, int line) {
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP   ref %p : %s %d -> %d",
+          ep, reason, ep->ref.count, ep->ref.count + 1);
+  gpr_ref(&ep->ref);
+}
+#else
+#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
+#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
 static void secure_endpoint_unref(secure_endpoint *ep) {
   if (gpr_unref(&ep->ref)) {
     destroy(ep);
   }
 }
 
+static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
+#endif
+
 static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
                                       gpr_uint8 **end) {
-  gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer);
+  gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
   ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
   *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
   *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
 }
 
-static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices,
-                         grpc_endpoint_cb_status error) {
+static void call_read_cb(secure_endpoint *ep, int success) {
   if (grpc_trace_secure_endpoint) {
     size_t i;
-    for (i = 0; i < nslices; i++) {
-      char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    for (i = 0; i < ep->read_buffer->count; i++) {
+      char *data = gpr_dump_slice(ep->read_buffer->slices[i],
+                                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
       gpr_free(data);
     }
   }
-  ep->read_cb(ep->read_user_data, slices, nslices, error);
-  secure_endpoint_unref(ep);
+  ep->read_buffer = NULL;
+  ep->read_cb->cb(ep->read_cb->cb_arg, success);
+  SECURE_ENDPOINT_UNREF(ep, "read");
 }
 
-static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
-                    grpc_endpoint_cb_status error) {
+static int on_read(void *user_data, int success) {
   unsigned i;
   gpr_uint8 keep_looping = 0;
-  size_t input_buffer_count = 0;
   tsi_result result = TSI_OK;
   secure_endpoint *ep = (secure_endpoint *)user_data;
   gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
   gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
 
+  if (!success) {
+    gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+    return 0;
+  }
+
   /* TODO(yangg) check error, maybe bail out early */
-  for (i = 0; i < nslices; i++) {
-    gpr_slice encrypted = slices[i];
+  for (i = 0; i < ep->source_buffer.count; i++) {
+    gpr_slice encrypted = ep->source_buffer.slices[i];
     gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
     size_t message_size = GPR_SLICE_LENGTH(encrypted);
 
@@ -161,7 +190,7 @@
 
   if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
     gpr_slice_buffer_add(
-        &ep->input_buffer,
+        ep->read_buffer,
         gpr_slice_split_head(
             &ep->read_staging_buffer,
             (size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
@@ -169,38 +198,53 @@
 
   /* TODO(yangg) experiment with moving this block after read_cb to see if it
      helps latency */
-  for (i = 0; i < nslices; i++) {
-    gpr_slice_unref(slices[i]);
-  }
+  gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
 
   if (result != TSI_OK) {
-    gpr_slice_buffer_reset_and_unref(&ep->input_buffer);
-    call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
-    return;
+    gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+    return 0;
   }
-  /* The upper level will unref the slices. */
-  input_buffer_count = ep->input_buffer.count;
-  ep->input_buffer.count = 0;
-  call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error);
+
+  return 1;
 }
 
-static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
-                                    grpc_endpoint_read_cb cb, void *user_data) {
-  secure_endpoint *ep = (secure_endpoint *)secure_ep;
-  ep->read_cb = cb;
-  ep->read_user_data = user_data;
+static void on_read_cb(void *user_data, int success) {
+  call_read_cb(user_data, on_read(user_data, success));
+}
 
-  secure_endpoint_ref(ep);
+static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
+                                             gpr_slice_buffer *slices,
+                                             grpc_iomgr_closure *cb) {
+  secure_endpoint *ep = (secure_endpoint *)secure_ep;
+  int immediate_read_success = -1;
+  ep->read_cb = cb;
+  ep->read_buffer = slices;
+  gpr_slice_buffer_reset_and_unref(ep->read_buffer);
 
   if (ep->leftover_bytes.count) {
-    size_t leftover_nslices = ep->leftover_bytes.count;
-    ep->leftover_bytes.count = 0;
-    on_read(ep, ep->leftover_bytes.slices, leftover_nslices,
-            GRPC_ENDPOINT_CB_OK);
-    return;
+    gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
+    GPR_ASSERT(ep->leftover_bytes.count == 0);
+    return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
   }
 
-  grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
+  SECURE_ENDPOINT_REF(ep, "read");
+
+  switch (
+      grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
+    case GRPC_ENDPOINT_DONE:
+      immediate_read_success = on_read(ep, 1);
+      break;
+    case GRPC_ENDPOINT_PENDING:
+      return GRPC_ENDPOINT_PENDING;
+    case GRPC_ENDPOINT_ERROR:
+      immediate_read_success = on_read(ep, 0);
+      break;
+  }
+
+  GPR_ASSERT(immediate_read_success != -1);
+  SECURE_ENDPOINT_UNREF(ep, "read");
+
+  return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
 }
 
 static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -211,36 +255,28 @@
   *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
 }
 
-static void on_write(void *data, grpc_endpoint_cb_status error) {
-  secure_endpoint *ep = data;
-  ep->write_cb(ep->write_user_data, error);
-  secure_endpoint_unref(ep);
-}
-
-static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
-                                                 gpr_slice *slices,
-                                                 size_t nslices,
-                                                 grpc_endpoint_write_cb cb,
-                                                 void *user_data) {
+static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
+                                              gpr_slice_buffer *slices,
+                                              grpc_iomgr_closure *cb) {
   unsigned i;
-  size_t output_buffer_count = 0;
   tsi_result result = TSI_OK;
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
   gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
   gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
-  grpc_endpoint_write_status status;
-  GPR_ASSERT(ep->output_buffer.count == 0);
+
+  gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
 
   if (grpc_trace_secure_endpoint) {
-    for (i = 0; i < nslices; i++) {
-      char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    for (i = 0; i < slices->count; i++) {
+      char *data =
+          gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
       gpr_free(data);
     }
   }
 
-  for (i = 0; i < nslices; i++) {
-    gpr_slice plain = slices[i];
+  for (i = 0; i < slices->count; i++) {
+    gpr_slice plain = slices->slices[i];
     gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
     size_t message_size = GPR_SLICE_LENGTH(plain);
     while (message_size > 0) {
@@ -290,29 +326,13 @@
     }
   }
 
-  for (i = 0; i < nslices; i++) {
-    gpr_slice_unref(slices[i]);
-  }
-
   if (result != TSI_OK) {
     /* TODO(yangg) do different things according to the error type? */
     gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
-    return GRPC_ENDPOINT_WRITE_ERROR;
+    return GRPC_ENDPOINT_ERROR;
   }
 
-  /* clear output_buffer and let the lower level handle its slices. */
-  output_buffer_count = ep->output_buffer.count;
-  ep->output_buffer.count = 0;
-  ep->write_cb = cb;
-  ep->write_user_data = user_data;
-  /* Need to keep the endpoint alive across a transport */
-  secure_endpoint_ref(ep);
-  status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
-                               output_buffer_count, on_write, ep);
-  if (status != GRPC_ENDPOINT_WRITE_PENDING) {
-    secure_endpoint_unref(ep);
-  }
-  return status;
+  return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
 }
 
 static void endpoint_shutdown(grpc_endpoint *secure_ep) {
@@ -320,9 +340,9 @@
   grpc_endpoint_shutdown(ep->wrapped_ep);
 }
 
-static void endpoint_unref(grpc_endpoint *secure_ep) {
+static void endpoint_destroy(grpc_endpoint *secure_ep) {
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
-  secure_endpoint_unref(ep);
+  SECURE_ENDPOINT_UNREF(ep, "destroy");
 }
 
 static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
@@ -343,9 +363,9 @@
 }
 
 static const grpc_endpoint_vtable vtable = {
-    endpoint_notify_on_read, endpoint_write,
+    endpoint_read,           endpoint_write,
     endpoint_add_to_pollset, endpoint_add_to_pollset_set,
-    endpoint_shutdown,       endpoint_unref,
+    endpoint_shutdown,       endpoint_destroy,
     endpoint_get_peer};
 
 grpc_endpoint *grpc_secure_endpoint_create(
@@ -363,8 +383,10 @@
   }
   ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
   ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
-  gpr_slice_buffer_init(&ep->input_buffer);
   gpr_slice_buffer_init(&ep->output_buffer);
+  gpr_slice_buffer_init(&ep->source_buffer);
+  ep->read_buffer = NULL;
+  grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
   gpr_mu_init(&ep->protector_mu);
   gpr_ref_init(&ep->ref, 1);
   return &ep->base;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 0c3572b..bf00795 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -50,16 +50,17 @@
   grpc_endpoint *wrapped_endpoint;
   grpc_endpoint *secure_endpoint;
   gpr_slice_buffer left_overs;
+  gpr_slice_buffer incoming;
+  gpr_slice_buffer outgoing;
   grpc_secure_transport_setup_done_cb cb;
   void *user_data;
+  grpc_iomgr_closure on_handshake_data_sent_to_peer;
+  grpc_iomgr_closure on_handshake_data_received_from_peer;
 } grpc_secure_transport_setup;
 
-static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices,
-                                                 size_t nslices,
-                                                 grpc_endpoint_cb_status error);
+static void on_handshake_data_received_from_peer(void *setup, int success);
 
-static void on_handshake_data_sent_to_peer(void *setup,
-                                           grpc_endpoint_cb_status error);
+static void on_handshake_data_sent_to_peer(void *setup, int success);
 
 static void secure_transport_setup_done(grpc_secure_transport_setup *s,
                                         int is_success) {
@@ -78,6 +79,8 @@
   if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
   if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
   gpr_slice_buffer_destroy(&s->left_overs);
+  gpr_slice_buffer_destroy(&s->outgoing);
+  gpr_slice_buffer_destroy(&s->incoming);
   GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
   gpr_free(s);
 }
@@ -102,6 +105,8 @@
   s->secure_endpoint =
       grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
                                   s->left_overs.slices, s->left_overs.count);
+  s->left_overs.count = 0;
+  s->left_overs.length = 0;
   secure_transport_setup_done(s, 1);
   return;
 }
@@ -132,7 +137,6 @@
   size_t offset = 0;
   tsi_result result = TSI_OK;
   gpr_slice to_send;
-  grpc_endpoint_write_status write_status;
 
   do {
     size_t to_send_size = s->handshake_buffer_size - offset;
@@ -155,28 +159,25 @@
 
   to_send =
       gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
+  gpr_slice_buffer_reset_and_unref(&s->outgoing);
+  gpr_slice_buffer_add(&s->outgoing, to_send);
   /* TODO(klempner,jboeuf): This should probably use the client setup
          deadline */
-  write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
-                                     on_handshake_data_sent_to_peer, s);
-  if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
-    gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
-    secure_transport_setup_done(s, 0);
-  } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
-    on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK);
+  switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
+                              &s->on_handshake_data_sent_to_peer)) {
+    case GRPC_ENDPOINT_ERROR:
+      gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
+      secure_transport_setup_done(s, 0);
+      break;
+    case GRPC_ENDPOINT_DONE:
+      on_handshake_data_sent_to_peer(s, 1);
+      break;
+    case GRPC_ENDPOINT_PENDING:
+      break;
   }
 }
 
-static void cleanup_slices(gpr_slice *slices, size_t num_slices) {
-  size_t i;
-  for (i = 0; i < num_slices; i++) {
-    gpr_slice_unref(slices[i]);
-  }
-}
-
-static void on_handshake_data_received_from_peer(
-    void *setup, gpr_slice *slices, size_t nslices,
-    grpc_endpoint_cb_status error) {
+static void on_handshake_data_received_from_peer(void *setup, int success) {
   grpc_secure_transport_setup *s = setup;
   size_t consumed_slice_size = 0;
   tsi_result result = TSI_OK;
@@ -184,32 +185,37 @@
   size_t num_left_overs;
   int has_left_overs_in_current_slice = 0;
 
-  if (error != GRPC_ENDPOINT_CB_OK) {
+  if (!success) {
     gpr_log(GPR_ERROR, "Read failed.");
-    cleanup_slices(slices, nslices);
     secure_transport_setup_done(s, 0);
     return;
   }
 
-  for (i = 0; i < nslices; i++) {
-    consumed_slice_size = GPR_SLICE_LENGTH(slices[i]);
+  for (i = 0; i < s->incoming.count; i++) {
+    consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]);
     result = tsi_handshaker_process_bytes_from_peer(
-        s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size);
+        s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]),
+        &consumed_slice_size);
     if (!tsi_handshaker_is_in_progress(s->handshaker)) break;
   }
 
   if (tsi_handshaker_is_in_progress(s->handshaker)) {
     /* We may need more data. */
     if (result == TSI_INCOMPLETE_DATA) {
-      /* TODO(klempner,jboeuf): This should probably use the client setup
-         deadline */
-      grpc_endpoint_notify_on_read(s->wrapped_endpoint,
-                                   on_handshake_data_received_from_peer, setup);
-      cleanup_slices(slices, nslices);
+      switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+                                 &s->on_handshake_data_received_from_peer)) {
+        case GRPC_ENDPOINT_DONE:
+          on_handshake_data_received_from_peer(s, 1);
+          break;
+        case GRPC_ENDPOINT_ERROR:
+          on_handshake_data_received_from_peer(s, 0);
+          break;
+        case GRPC_ENDPOINT_PENDING:
+          break;
+      }
       return;
     } else {
       send_handshake_bytes_to_peer(s);
-      cleanup_slices(slices, nslices);
       return;
     }
   }
@@ -217,42 +223,40 @@
   if (result != TSI_OK) {
     gpr_log(GPR_ERROR, "Handshake failed with error %s",
             tsi_result_to_string(result));
-    cleanup_slices(slices, nslices);
     secure_transport_setup_done(s, 0);
     return;
   }
 
   /* Handshake is done and successful this point. */
   has_left_overs_in_current_slice =
-      (consumed_slice_size < GPR_SLICE_LENGTH(slices[i]));
-  num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1;
+      (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i]));
+  num_left_overs =
+      (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
   if (num_left_overs == 0) {
-    cleanup_slices(slices, nslices);
     check_peer(s);
     return;
   }
-  cleanup_slices(slices, nslices - num_left_overs);
-
   /* Put the leftovers in our buffer (ownership transfered). */
   if (has_left_overs_in_current_slice) {
-    gpr_slice_buffer_add(&s->left_overs,
-                         gpr_slice_split_tail(&slices[i], consumed_slice_size));
-    gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */
+    gpr_slice_buffer_add(
+        &s->left_overs,
+        gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size));
+    gpr_slice_unref(
+        s->incoming.slices[i]); /* split_tail above increments refcount. */
   }
   gpr_slice_buffer_addn(
-      &s->left_overs, &slices[i + 1],
+      &s->left_overs, &s->incoming.slices[i + 1],
       num_left_overs - (size_t)has_left_overs_in_current_slice);
   check_peer(s);
 }
 
 /* If setup is NULL, the setup is done. */
-static void on_handshake_data_sent_to_peer(void *setup,
-                                           grpc_endpoint_cb_status error) {
+static void on_handshake_data_sent_to_peer(void *setup, int success) {
   grpc_secure_transport_setup *s = setup;
 
   /* Make sure that write is OK. */
-  if (error != GRPC_ENDPOINT_CB_OK) {
-    gpr_log(GPR_ERROR, "Write failed with error %d.", error);
+  if (!success) {
+    gpr_log(GPR_ERROR, "Write failed.");
     if (setup != NULL) secure_transport_setup_done(s, 0);
     return;
   }
@@ -261,8 +265,17 @@
   if (tsi_handshaker_is_in_progress(s->handshaker)) {
     /* TODO(klempner,jboeuf): This should probably use the client setup
        deadline */
-    grpc_endpoint_notify_on_read(s->wrapped_endpoint,
-                                 on_handshake_data_received_from_peer, setup);
+    switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+                               &s->on_handshake_data_received_from_peer)) {
+      case GRPC_ENDPOINT_ERROR:
+        on_handshake_data_received_from_peer(s, 0);
+        break;
+      case GRPC_ENDPOINT_PENDING:
+        break;
+      case GRPC_ENDPOINT_DONE:
+        on_handshake_data_received_from_peer(s, 1);
+        break;
+    }
   } else {
     check_peer(s);
   }
@@ -288,6 +301,12 @@
   s->wrapped_endpoint = nonsecure_endpoint;
   s->user_data = user_data;
   s->cb = cb;
+  grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
+                          on_handshake_data_sent_to_peer, s);
+  grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
+                          on_handshake_data_received_from_peer, s);
   gpr_slice_buffer_init(&s->left_overs);
+  gpr_slice_buffer_init(&s->outgoing);
+  gpr_slice_buffer_init(&s->incoming);
   send_handshake_bytes_to_peer(s);
 }
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 987d5cb..6482ef9 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -207,3 +207,25 @@
   src->count = 0;
   src->length = 0;
 }
+
+void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
+  GPR_ASSERT(n <= sb->length);
+  sb->length -= n;
+  for (;;) {
+    size_t idx = sb->count - 1;
+    gpr_slice slice = sb->slices[idx];
+    size_t slice_len = GPR_SLICE_LENGTH(slice);
+    if (slice_len > n) {
+      sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
+      return;
+    } else if (slice_len == n) {
+      gpr_slice_unref(slice);
+      sb->count = idx;
+      return;
+    } else {
+      gpr_slice_unref(slice);
+      n -= slice_len;
+      sb->count = idx;
+    }
+  }
+}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 42cf0ec..a1b773b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -214,6 +214,8 @@
   grpc_chttp2_hpack_compressor hpack_compressor;
   /** is this a client? */
   gpr_uint8 is_client;
+  /** callback for when writing is done */
+  grpc_iomgr_closure done_cb;
 } grpc_chttp2_transport_writing;
 
 struct grpc_chttp2_transport_parsing {
@@ -329,8 +331,11 @@
 
   /** closure to execute writing */
   grpc_iomgr_closure writing_action;
-  /** closure to start reading from the endpoint */
-  grpc_iomgr_closure reading_action;
+  /** closure to finish reading from the endpoint */
+  grpc_iomgr_closure recv_data;
+
+  /** incoming read bytes */
+  gpr_slice_buffer read_buffer;
 
   /** address to place a newly accepted stream - set and unset by
       grpc_chttp2_parsing_accept_stream; used by init_stream to
@@ -463,8 +468,7 @@
                                        grpc_chttp2_transport_writing *writing);
 void grpc_chttp2_perform_writes(
     grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
-void grpc_chttp2_terminate_writing(
-    grpc_chttp2_transport_writing *transport_writing, int success);
+void grpc_chttp2_terminate_writing(void *transport_writing, int success);
 void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
                                  grpc_chttp2_transport_writing *writing);
 
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 123061b..2c8c48f 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -37,7 +37,6 @@
 #include <grpc/support/log.h>
 
 static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
-static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
 
 int grpc_chttp2_unlocking_check_writes(
     grpc_chttp2_transport_global *transport_global,
@@ -165,16 +164,15 @@
   GPR_ASSERT(transport_writing->outbuf.count > 0);
   GPR_ASSERT(endpoint);
 
-  switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices,
-                              transport_writing->outbuf.count, finish_write_cb,
-                              transport_writing)) {
-    case GRPC_ENDPOINT_WRITE_DONE:
+  switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
+                              &transport_writing->done_cb)) {
+    case GRPC_ENDPOINT_DONE:
       grpc_chttp2_terminate_writing(transport_writing, 1);
       break;
-    case GRPC_ENDPOINT_WRITE_ERROR:
+    case GRPC_ENDPOINT_ERROR:
       grpc_chttp2_terminate_writing(transport_writing, 0);
       break;
-    case GRPC_ENDPOINT_WRITE_PENDING:
+    case GRPC_ENDPOINT_PENDING:
       break;
   }
 }
@@ -209,12 +207,6 @@
   }
 }
 
-static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
-  grpc_chttp2_transport_writing *transport_writing = tw;
-  grpc_chttp2_terminate_writing(transport_writing,
-                                write_status == GRPC_ENDPOINT_CB_OK);
-}
-
 void grpc_chttp2_cleanup_writing(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_transport_writing *transport_writing) {
@@ -243,6 +235,5 @@
     grpc_chttp2_list_add_read_write_state_changed(transport_global,
                                                   stream_global);
   }
-  transport_writing->outbuf.count = 0;
-  transport_writing->outbuf.length = 0;
+  gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
 }
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1bbd210..8caa10c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -84,15 +84,13 @@
 
 /* forward declarations of various callbacks that we'll build closures around */
 static void writing_action(void *t, int iomgr_success_ignored);
-static void reading_action(void *t, int iomgr_success_ignored);
 
 /** Set a transport level setting, and push it to our peer */
 static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
                          gpr_uint32 value);
 
 /** Endpoint callback to process incoming data */
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
-                      grpc_endpoint_cb_status error);
+static void recv_data(void *tp, int success);
 
 /** Start disconnection chain */
 static void drop_connection(grpc_chttp2_transport *t);
@@ -143,6 +141,7 @@
   grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
 
   gpr_slice_buffer_destroy(&t->parsing.qbuf);
+  gpr_slice_buffer_destroy(&t->read_buffer);
   grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
   grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
 
@@ -249,12 +248,16 @@
   gpr_slice_buffer_init(&t->writing.outbuf);
   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
   grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
-  grpc_iomgr_closure_init(&t->reading_action, reading_action, t);
 
   gpr_slice_buffer_init(&t->parsing.qbuf);
   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
   grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
 
+  grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+                          &t->writing);
+  grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
+  gpr_slice_buffer_init(&t->read_buffer);
+
   if (is_client) {
     gpr_slice_buffer_add(
         &t->global.qbuf,
@@ -502,8 +505,8 @@
   }
 }
 
-void grpc_chttp2_terminate_writing(
-    grpc_chttp2_transport_writing *transport_writing, int success) {
+void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
+  grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
   grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
 
   lock(t);
@@ -1060,74 +1063,76 @@
 }
 
 /* tcp read callback */
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
-                      grpc_endpoint_cb_status error) {
-  grpc_chttp2_transport *t = tp;
+static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
   size_t i;
-  int unref = 0;
+  int keep_reading = 0;
 
-  switch (error) {
-    case GRPC_ENDPOINT_CB_SHUTDOWN:
-    case GRPC_ENDPOINT_CB_EOF:
-    case GRPC_ENDPOINT_CB_ERROR:
-      lock(t);
+  lock(t);
+  i = 0;
+  GPR_ASSERT(!t->parsing_active);
+  if (!t->closed) {
+    t->parsing_active = 1;
+    /* merge stream lists */
+    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+                                     &t->parsing_stream_map);
+    grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+    gpr_mu_unlock(&t->mu);
+    for (; i < t->read_buffer.count &&
+           grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
+         i++)
+      ;
+    gpr_mu_lock(&t->mu);
+    if (i != t->read_buffer.count) {
       drop_connection(t);
-      read_error_locked(t);
-      unlock(t);
-      unref = 1;
-      for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
-      break;
-    case GRPC_ENDPOINT_CB_OK:
-      lock(t);
-      i = 0;
-      GPR_ASSERT(!t->parsing_active);
-      if (!t->closed) {
-        t->parsing_active = 1;
-        /* merge stream lists */
-        grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                         &t->parsing_stream_map);
-        grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
-        gpr_mu_unlock(&t->mu);
-        for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
-             i++) {
-          gpr_slice_unref(slices[i]);
-        }
-        gpr_mu_lock(&t->mu);
-        if (i != nslices) {
-          drop_connection(t);
-        }
-        /* merge stream lists */
-        grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                         &t->parsing_stream_map);
-        t->global.concurrent_stream_count =
-            grpc_chttp2_stream_map_size(&t->parsing_stream_map);
-        if (t->parsing.initial_window_update != 0) {
-          grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
-                                          update_global_window, t);
-          t->parsing.initial_window_update = 0;
-        }
-        /* handle higher level things */
-        grpc_chttp2_publish_reads(&t->global, &t->parsing);
-        t->parsing_active = 0;
-      }
-      if (i == nslices) {
-        grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
-      } else {
-        read_error_locked(t);
-        unref = 1;
-      }
-      unlock(t);
-      for (; i < nslices; i++) gpr_slice_unref(slices[i]);
-      break;
+    }
+    /* merge stream lists */
+    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+                                     &t->parsing_stream_map);
+    t->global.concurrent_stream_count =
+        grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+    if (t->parsing.initial_window_update != 0) {
+      grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+                                      update_global_window, t);
+      t->parsing.initial_window_update = 0;
+    }
+    /* handle higher level things */
+    grpc_chttp2_publish_reads(&t->global, &t->parsing);
+    t->parsing_active = 0;
   }
-  if (unref) {
+  if (!*success || i != t->read_buffer.count) {
+    drop_connection(t);
+    read_error_locked(t);
+  } else {
+    keep_reading = 1;
+  }
+  gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+  unlock(t);
+
+  if (keep_reading) {
+    switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
+      case GRPC_ENDPOINT_DONE:
+        *success = 1;
+        return 1;
+      case GRPC_ENDPOINT_ERROR:
+        *success = 0;
+        return 1;
+      case GRPC_ENDPOINT_PENDING:
+        return 0;
+    }
+  } else {
     UNREF_TRANSPORT(t, "recv_data");
+    return 0;
   }
+
+  gpr_log(GPR_ERROR, "should never reach here");
+  abort();
 }
 
-static void reading_action(void *pt, int iomgr_success_ignored) {
-  grpc_chttp2_transport *t = pt;
-  grpc_endpoint_notify_on_read(t->ep, recv_data, t);
+static void recv_data(void *tp, int success) {
+  grpc_chttp2_transport *t = tp;
+
+  while (recv_data_loop(t, &success))
+    ;
 }
 
 /*
@@ -1240,5 +1245,6 @@
                                          gpr_slice *slices, size_t nslices) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
   REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
-  recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
+  gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
+  recv_data(t, 1);
 }
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 24bf5d3..1d98879 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -59,7 +59,7 @@
   gpr_event_set(&a->done_thd, (void *)1);
 }
 
-static void done_write(void *arg, grpc_endpoint_cb_status status) {
+static void done_write(void *arg, int success) {
   thd_args *a = arg;
   gpr_event_set(&a->done_write, (void *)1);
 }
@@ -85,6 +85,8 @@
   grpc_mdctx *mdctx = grpc_mdctx_create();
   gpr_slice slice =
       gpr_slice_from_copied_buffer(client_payload, client_payload_length);
+  gpr_slice_buffer outgoing;
+  grpc_iomgr_closure done_write_closure;
 
   hex = gpr_dump(client_payload, client_payload_length,
                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
@@ -122,14 +124,18 @@
   /* Start validator */
   gpr_thd_new(&id, thd_func, &a, NULL);
 
+  gpr_slice_buffer_init(&outgoing);
+  gpr_slice_buffer_add(&outgoing, slice);
+  grpc_iomgr_closure_init(&done_write_closure, done_write, &a);
+
   /* Write data */
-  switch (grpc_endpoint_write(sfd.client, &slice, 1, done_write, &a)) {
-    case GRPC_ENDPOINT_WRITE_DONE:
+  switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {
+    case GRPC_ENDPOINT_DONE:
       done_write(&a, 1);
       break;
-    case GRPC_ENDPOINT_WRITE_PENDING:
+    case GRPC_ENDPOINT_PENDING:
       break;
-    case GRPC_ENDPOINT_WRITE_ERROR:
+    case GRPC_ENDPOINT_ERROR:
       done_write(&a, 0);
       break;
   }
@@ -155,6 +161,7 @@
                  .type == GRPC_OP_COMPLETE);
   grpc_server_destroy(a.server);
   grpc_completion_queue_destroy(a.cq);
+  gpr_slice_buffer_destroy(&outgoing);
 
   grpc_shutdown();
 }
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 6ef8e9c..ef67374 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -59,8 +59,7 @@
 
 static grpc_pollset *g_pollset;
 
-size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
-                              int *current_data) {
+size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
   size_t num_bytes = 0;
   size_t i;
   size_t j;
@@ -72,7 +71,6 @@
       *current_data = (*current_data + 1) % 256;
     }
     num_bytes += GPR_SLICE_LENGTH(slices[i]);
-    gpr_slice_unref(slices[i]);
   }
   return num_bytes;
 }
@@ -121,86 +119,76 @@
   int current_write_data;
   int read_done;
   int write_done;
+  gpr_slice_buffer incoming;
+  gpr_slice_buffer outgoing;
+  grpc_iomgr_closure done_read;
+  grpc_iomgr_closure done_write;
 };
 
-static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
-                                             size_t nslices,
-                                             grpc_endpoint_cb_status error) {
+static void read_and_write_test_read_handler(void *data, int success) {
   struct read_and_write_test_state *state = data;
-  GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
-  if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
-    gpr_log(GPR_INFO, "Read handler shutdown");
-    gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-    state->read_done = 1;
-    grpc_pollset_kick(g_pollset, NULL);
-    gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-    return;
-  }
 
-  state->bytes_read +=
-      count_and_unref_slices(slices, nslices, &state->current_read_data);
-  if (state->bytes_read == state->target_bytes) {
+  state->bytes_read += count_slices(
+      state->incoming.slices, state->incoming.count, &state->current_read_data);
+  if (state->bytes_read == state->target_bytes || !success) {
     gpr_log(GPR_INFO, "Read handler done");
     gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-    state->read_done = 1;
+    state->read_done = 1 + success;
     grpc_pollset_kick(g_pollset, NULL);
     gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-  } else {
-    grpc_endpoint_notify_on_read(state->read_ep,
-                                 read_and_write_test_read_handler, data);
+  } else if (success) {
+    switch (grpc_endpoint_read(state->read_ep, &state->incoming,
+                               &state->done_read)) {
+      case GRPC_ENDPOINT_ERROR:
+        read_and_write_test_read_handler(data, 0);
+        break;
+      case GRPC_ENDPOINT_DONE:
+        read_and_write_test_read_handler(data, 1);
+        break;
+      case GRPC_ENDPOINT_PENDING:
+        break;
+    }
   }
 }
 
-static void read_and_write_test_write_handler(void *data,
-                                              grpc_endpoint_cb_status error) {
+static void read_and_write_test_write_handler(void *data, int success) {
   struct read_and_write_test_state *state = data;
   gpr_slice *slices = NULL;
   size_t nslices;
-  grpc_endpoint_write_status write_status;
+  grpc_endpoint_op_status write_status;
 
-  GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
+  if (success) {
+    for (;;) {
+      /* Need to do inline writes until they don't succeed synchronously or we
+         finish writing */
+      state->bytes_written += state->current_write_size;
+      if (state->target_bytes - state->bytes_written <
+          state->current_write_size) {
+        state->current_write_size = state->target_bytes - state->bytes_written;
+      }
+      if (state->current_write_size == 0) {
+        break;
+      }
 
-  gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
-          error);
-
-  if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
-    gpr_log(GPR_INFO, "Write handler shutdown");
-    gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-    state->write_done = 1;
-    grpc_pollset_kick(g_pollset, NULL);
-    gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-    return;
+      slices = allocate_blocks(state->current_write_size, 8192, &nslices,
+                               &state->current_write_data);
+      gpr_slice_buffer_reset_and_unref(&state->outgoing);
+      gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
+      write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
+                                         &state->done_write);
+      gpr_log(GPR_DEBUG, "write_status=%d", write_status);
+      GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
+      free(slices);
+      if (write_status == GRPC_ENDPOINT_PENDING) {
+        return;
+      }
+    }
+    GPR_ASSERT(state->bytes_written == state->target_bytes);
   }
 
-  for (;;) {
-    /* Need to do inline writes until they don't succeed synchronously or we
-       finish writing */
-    state->bytes_written += state->current_write_size;
-    if (state->target_bytes - state->bytes_written <
-        state->current_write_size) {
-      state->current_write_size = state->target_bytes - state->bytes_written;
-    }
-    if (state->current_write_size == 0) {
-      break;
-    }
-
-    slices = allocate_blocks(state->current_write_size, 8192, &nslices,
-                             &state->current_write_data);
-    write_status =
-        grpc_endpoint_write(state->write_ep, slices, nslices,
-                            read_and_write_test_write_handler, state);
-    gpr_log(GPR_DEBUG, "write_status=%d", write_status);
-    GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
-    free(slices);
-    if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
-      return;
-    }
-  }
-  GPR_ASSERT(state->bytes_written == state->target_bytes);
-
   gpr_log(GPR_INFO, "Write handler done");
   gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-  state->write_done = 1;
+  state->write_done = 1 + success;
   grpc_pollset_kick(g_pollset, NULL);
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
 }
@@ -234,16 +222,31 @@
   state.write_done = 0;
   state.current_read_data = 0;
   state.current_write_data = 0;
+  grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
+                          &state);
+  grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
+                          &state);
+  gpr_slice_buffer_init(&state.outgoing);
+  gpr_slice_buffer_init(&state.incoming);
 
   /* Get started by pretending an initial write completed */
   /* NOTE: Sets up initial conditions so we can have the same write handler
      for the first iteration as for later iterations. It does the right thing
      even when bytes_written is unsigned. */
   state.bytes_written -= state.current_write_size;
-  read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
+  read_and_write_test_write_handler(&state, 1);
 
-  grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
-                               &state);
+  switch (
+      grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
+    case GRPC_ENDPOINT_PENDING:
+      break;
+    case GRPC_ENDPOINT_ERROR:
+      read_and_write_test_read_handler(&state, 0);
+      break;
+    case GRPC_ENDPOINT_DONE:
+      read_and_write_test_read_handler(&state, 1);
+      break;
+  }
 
   if (shutdown) {
     gpr_log(GPR_DEBUG, "shutdown read");
@@ -263,6 +266,8 @@
 
   grpc_endpoint_destroy(state.read_ep);
   grpc_endpoint_destroy(state.write_ep);
+  gpr_slice_buffer_destroy(&state.outgoing);
+  gpr_slice_buffer_destroy(&state.incoming);
   end_test(config);
 }
 
@@ -273,36 +278,40 @@
 typedef struct {
   int done;
   grpc_endpoint *ep;
+  gpr_slice_buffer incoming;
+  grpc_iomgr_closure done_read;
 } shutdown_during_write_test_state;
 
-static void shutdown_during_write_test_read_handler(
-    void *user_data, gpr_slice *slices, size_t nslices,
-    grpc_endpoint_cb_status error) {
-  size_t i;
+static void shutdown_during_write_test_read_handler(void *user_data,
+                                                    int success) {
   shutdown_during_write_test_state *st = user_data;
 
-  for (i = 0; i < nslices; i++) {
-    gpr_slice_unref(slices[i]);
-  }
-
-  if (error != GRPC_ENDPOINT_CB_OK) {
+  if (!success) {
     grpc_endpoint_destroy(st->ep);
     gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-    st->done = error;
+    st->done = 1;
     grpc_pollset_kick(g_pollset, NULL);
     gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
   } else {
-    grpc_endpoint_notify_on_read(
-        st->ep, shutdown_during_write_test_read_handler, user_data);
+    switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) {
+      case GRPC_ENDPOINT_PENDING:
+        break;
+      case GRPC_ENDPOINT_ERROR:
+        shutdown_during_write_test_read_handler(user_data, 0);
+        break;
+      case GRPC_ENDPOINT_DONE:
+        shutdown_during_write_test_read_handler(user_data, 1);
+        break;
+    }
   }
 }
 
-static void shutdown_during_write_test_write_handler(
-    void *user_data, grpc_endpoint_cb_status error) {
+static void shutdown_during_write_test_write_handler(void *user_data,
+                                                     int success) {
   shutdown_during_write_test_state *st = user_data;
-  gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d",
-          error);
-  if (error == 0) {
+  gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d",
+          success);
+  if (success) {
     /* This happens about 0.5% of the time when run under TSAN, and is entirely
        legitimate, but means we aren't testing the path we think we are. */
     /* TODO(klempner): Change this test to retry the write in that case */
@@ -325,6 +334,8 @@
   shutdown_during_write_test_state read_st;
   shutdown_during_write_test_state write_st;
   gpr_slice *slices;
+  gpr_slice_buffer outgoing;
+  grpc_iomgr_closure done_write;
   grpc_endpoint_test_fixture f =
       begin_test(config, "shutdown_during_write_test", slice_size);
 
@@ -335,19 +346,26 @@
   read_st.done = 0;
   write_st.done = 0;
 
-  grpc_endpoint_notify_on_read(
-      read_st.ep, shutdown_during_write_test_read_handler, &read_st);
+  grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
+                          &write_st);
+  grpc_iomgr_closure_init(&read_st.done_read,
+                          shutdown_during_write_test_read_handler, &read_st);
+  gpr_slice_buffer_init(&read_st.incoming);
+  gpr_slice_buffer_init(&outgoing);
+
+  GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
+                                &read_st.done_read) == GRPC_ENDPOINT_PENDING);
   for (size = 1;; size *= 2) {
     slices = allocate_blocks(size, 1, &nblocks, &current_data);
-    switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
-                                shutdown_during_write_test_write_handler,
-                                &write_st)) {
-      case GRPC_ENDPOINT_WRITE_DONE:
+    gpr_slice_buffer_reset_and_unref(&outgoing);
+    gpr_slice_buffer_addn(&outgoing, slices, nblocks);
+    switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
+      case GRPC_ENDPOINT_DONE:
         break;
-      case GRPC_ENDPOINT_WRITE_ERROR:
+      case GRPC_ENDPOINT_ERROR:
         gpr_log(GPR_ERROR, "error writing");
         abort();
-      case GRPC_ENDPOINT_WRITE_PENDING:
+      case GRPC_ENDPOINT_PENDING:
         grpc_endpoint_shutdown(write_st.ep);
         deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
         gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -368,6 +386,8 @@
         }
         gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
         gpr_free(slices);
+        gpr_slice_buffer_destroy(&read_st.incoming);
+        gpr_slice_buffer_destroy(&outgoing);
         end_test(config);
         return;
     }
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 6ad8322..8acaa43 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -118,10 +118,12 @@
   grpc_endpoint *ep;
   ssize_t read_bytes;
   ssize_t target_read_bytes;
+  gpr_slice_buffer incoming;
+  grpc_iomgr_closure read_cb;
 };
 
-static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
-                                      int *current_data) {
+static ssize_t count_slices(gpr_slice *slices, size_t nslices,
+                            int *current_data) {
   ssize_t num_bytes = 0;
   unsigned i, j;
   unsigned char *buf;
@@ -132,31 +134,41 @@
       *current_data = (*current_data + 1) % 256;
     }
     num_bytes += GPR_SLICE_LENGTH(slices[i]);
-    gpr_slice_unref(slices[i]);
   }
   return num_bytes;
 }
 
-static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
-                    grpc_endpoint_cb_status error) {
+static void read_cb(void *user_data, int success) {
   struct read_socket_state *state = (struct read_socket_state *)user_data;
   ssize_t read_bytes;
   int current_data;
 
-  GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+  GPR_ASSERT(success);
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   current_data = state->read_bytes % 256;
-  read_bytes = count_and_unref_slices(slices, nslices, &current_data);
+  read_bytes = count_slices(state->incoming.slices, state->incoming.count,
+                            &current_data);
   state->read_bytes += read_bytes;
   gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
           state->target_read_bytes);
   if (state->read_bytes >= state->target_read_bytes) {
-    /* empty */
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   } else {
-    grpc_endpoint_notify_on_read(state->ep, read_cb, state);
+    switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
+      case GRPC_ENDPOINT_DONE:
+        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+        read_cb(user_data, 1);
+        break;
+      case GRPC_ENDPOINT_ERROR:
+        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+        read_cb(user_data, 0);
+        break;
+      case GRPC_ENDPOINT_PENDING:
+        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+        break;
+    }
   }
-  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
 
 /* Write to a socket, then read from it using the grpc_tcp API. */
@@ -181,8 +193,19 @@
   state.ep = ep;
   state.read_bytes = 0;
   state.target_read_bytes = written_bytes;
+  gpr_slice_buffer_init(&state.incoming);
+  grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
 
-  grpc_endpoint_notify_on_read(ep, read_cb, &state);
+  switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
+    case GRPC_ENDPOINT_DONE:
+      read_cb(&state, 1);
+      break;
+    case GRPC_ENDPOINT_ERROR:
+      read_cb(&state, 0);
+      break;
+    case GRPC_ENDPOINT_PENDING:
+      break;
+  }
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
@@ -193,6 +216,7 @@
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
+  gpr_slice_buffer_destroy(&state.incoming);
   grpc_endpoint_destroy(ep);
 }
 
@@ -219,8 +243,19 @@
   state.ep = ep;
   state.read_bytes = 0;
   state.target_read_bytes = written_bytes;
+  gpr_slice_buffer_init(&state.incoming);
+  grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
 
-  grpc_endpoint_notify_on_read(ep, read_cb, &state);
+  switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
+    case GRPC_ENDPOINT_DONE:
+      read_cb(&state, 1);
+      break;
+    case GRPC_ENDPOINT_ERROR:
+      read_cb(&state, 0);
+      break;
+    case GRPC_ENDPOINT_PENDING:
+      break;
+  }
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
@@ -231,6 +266,7 @@
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
+  gpr_slice_buffer_destroy(&state.incoming);
   grpc_endpoint_destroy(ep);
 }
 
@@ -262,8 +298,7 @@
   return slices;
 }
 
-static void write_done(void *user_data /* write_socket_state */,
-                       grpc_endpoint_cb_status error) {
+static void write_done(void *user_data /* write_socket_state */, int success) {
   struct write_socket_state *state = (struct write_socket_state *)user_data;
   gpr_log(GPR_INFO, "Write done callback called");
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -339,6 +374,8 @@
   size_t num_blocks;
   gpr_slice *slices;
   int current_data = 0;
+  gpr_slice_buffer outgoing;
+  grpc_iomgr_closure write_done_closure;
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
 
   gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@@ -355,74 +392,21 @@
 
   slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
 
-  if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
-      GRPC_ENDPOINT_WRITE_DONE) {
-    /* Write completed immediately */
-    read_bytes = drain_socket(sv[0]);
-    GPR_ASSERT(read_bytes == num_bytes);
-  } else {
-    drain_socket_blocking(sv[0], num_bytes, num_bytes);
-    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-    for (;;) {
-      grpc_pollset_worker worker;
-      if (state.write_done) {
-        break;
-      }
-      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                        deadline);
-    }
-    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-  }
+  gpr_slice_buffer_init(&outgoing);
+  gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
+  grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
 
-  grpc_endpoint_destroy(ep);
-  gpr_free(slices);
-}
-
-static void read_done_for_write_error(void *ud, gpr_slice *slices,
-                                      size_t nslices,
-                                      grpc_endpoint_cb_status error) {
-  GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
-  GPR_ASSERT(nslices == 0);
-}
-
-/* Write to a socket using the grpc_tcp API, then drain it directly.
-   Note that if the write does not complete immediately we need to drain the
-   socket in parallel with the read. */
-static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
-  int sv[2];
-  grpc_endpoint *ep;
-  struct write_socket_state state;
-  size_t num_blocks;
-  gpr_slice *slices;
-  int current_data = 0;
-  grpc_pollset_worker worker;
-  gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
-
-  gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
-          num_bytes, slice_size);
-
-  create_sockets(sv);
-
-  ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
-                       GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
-  grpc_endpoint_add_to_pollset(ep, &g_pollset);
-
-  close(sv[0]);
-
-  state.ep = ep;
-  state.write_done = 0;
-
-  slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
-
-  switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
-    case GRPC_ENDPOINT_WRITE_DONE:
-    case GRPC_ENDPOINT_WRITE_ERROR:
+  switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
+    case GRPC_ENDPOINT_DONE:
       /* Write completed immediately */
+      read_bytes = drain_socket(sv[0]);
+      GPR_ASSERT(read_bytes == num_bytes);
       break;
-    case GRPC_ENDPOINT_WRITE_PENDING:
-      grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
+    case GRPC_ENDPOINT_PENDING:
+      drain_socket_blocking(sv[0], num_bytes, num_bytes);
       gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
       for (;;) {
+        grpc_pollset_worker worker;
         if (state.write_done) {
           break;
         }
@@ -431,10 +415,14 @@
       }
       gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
       break;
+    case GRPC_ENDPOINT_ERROR:
+      gpr_log(GPR_ERROR, "endpoint got error");
+      abort();
   }
 
+  gpr_slice_buffer_destroy(&outgoing);
   grpc_endpoint_destroy(ep);
-  free(slices);
+  gpr_free(slices);
 }
 
 void run_tests(void) {
@@ -454,10 +442,6 @@
   write_test(100000, 137);
 
   for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
-    write_error_test(40320, i);
-  }
-
-  for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
     write_test(40320, i);
   }
 }
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index a8368fc..c76ddcd 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -135,62 +135,26 @@
      secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
 };
 
-static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices,
-                            grpc_endpoint_cb_status error) {
-  gpr_slice s =
-      gpr_slice_from_copied_string("hello world 12345678900987654321");
-
-  GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
-  GPR_ASSERT(nslices == 1);
-
-  GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
-  gpr_slice_unref(slices[0]);
-  gpr_slice_unref(s);
-  *(int *)user_data = 1;
-}
-
 static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
   grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
-  int verified = 0;
+  gpr_slice_buffer incoming;
+  gpr_slice s =
+      gpr_slice_from_copied_string("hello world 12345678900987654321");
   gpr_log(GPR_INFO, "Start test left over");
 
-  grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified);
-  GPR_ASSERT(verified == 1);
+  gpr_slice_buffer_init(&incoming);
+  GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) ==
+             GRPC_ENDPOINT_DONE);
+  GPR_ASSERT(incoming.count == 1);
+  GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0]));
 
   grpc_endpoint_shutdown(f.client_ep);
   grpc_endpoint_shutdown(f.server_ep);
   grpc_endpoint_destroy(f.client_ep);
   grpc_endpoint_destroy(f.server_ep);
-  clean_up();
-}
-
-static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices,
-                          grpc_endpoint_cb_status error) {
-  grpc_endpoint_test_fixture *f = user_data;
-  gpr_slice s =
-      gpr_slice_from_copied_string("hello world 12345678900987654321");
-
-  GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
-  GPR_ASSERT(nslices == 1);
-
-  grpc_endpoint_shutdown(f->client_ep);
-  grpc_endpoint_destroy(f->client_ep);
-
-  GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
-  gpr_slice_unref(slices[0]);
   gpr_slice_unref(s);
-}
+  gpr_slice_buffer_destroy(&incoming);
 
-/* test which destroys the ep before finishing reading */
-static void test_destroy_ep_early(grpc_endpoint_test_config config,
-                                  size_t slice_size) {
-  grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
-  gpr_log(GPR_INFO, "Start test destroy early");
-
-  grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f);
-
-  grpc_endpoint_shutdown(f.server_ep);
-  grpc_endpoint_destroy(f.server_ep);
   clean_up();
 }
 
@@ -203,7 +167,6 @@
   grpc_pollset_init(&g_pollset);
   grpc_endpoint_tests(configs[0], &g_pollset);
   test_leftover(configs[1], 1);
-  test_destroy_ep_early(configs[1], 1);
   grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
   grpc_iomgr_shutdown();