Revert "Refactor Endpoint API"
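
This reverts the endpoint API refactor that replaced the callback-based
read/write interface with gpr_slice_buffer based calls returning
grpc_endpoint_op_status.

Restored interface (src/core/iomgr/endpoint.h):
  - grpc_endpoint_notify_on_read() registers a grpc_endpoint_read_cb that
    receives an array of gpr_slice plus a grpc_endpoint_cb_status
    (OK / EOF / SHUTDOWN / ERROR); the callback takes ownership of the
    slices and must unref them.
  - grpc_endpoint_write() takes a gpr_slice array and a
    grpc_endpoint_write_cb and returns GRPC_ENDPOINT_WRITE_DONE, _PENDING
    or _ERROR; the callback only fires in the PENDING case.

Other effects of the revert:
  - gpr_slice_buffer_trim_end() is removed from the public slice_buffer.h.
  - httpcli, secure_endpoint, tcp_posix and tcp_windows return to the
    callback-driven flow; tcp_posix reinstates the grpc_tcp_slice_state
    helpers for managing read/write slice arrays, and the
    TCP_REF/TCP_UNREF and SECURE_ENDPOINT_REF/UNREF debug refcounting
    macros are dropped in favor of plain ref/unref helpers.
  - As a straight revert, it also restores the pre-refactor "HTTP:GET"
    iomgr object name in grpc_httpcli_post().

For reference, a minimal sketch of the restored read flow, modeled on the
httpcli on_read loop in this diff; my_state, my_on_read and start_reading
are hypothetical caller-side names, not part of this change:

    #include "src/core/iomgr/endpoint.h"

    /* Hypothetical caller state, for illustration only. */
    typedef struct {
      grpc_endpoint *ep;
    } my_state;

    static void my_on_read(void *user_data, gpr_slice *slices, size_t nslices,
                           grpc_endpoint_cb_status status) {
      my_state *st = user_data;
      size_t i;
      /* ... consume slices[0..nslices) here ... */
      if (status == GRPC_ENDPOINT_CB_OK) {
        /* Endpoint can deliver more data: re-arm the read callback. */
        grpc_endpoint_notify_on_read(st->ep, my_on_read, st);
      }
      /* The callback owns the slices and must release them. */
      for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
    }

    static void start_reading(my_state *st) {
      grpc_endpoint_notify_on_read(st->ep, my_on_read, st);
    }
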
diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h
index 04db003..ec048e8 100644
--- a/include/grpc/support/slice_buffer.h
+++ b/include/grpc/support/slice_buffer.h
@@ -86,8 +86,6 @@
 void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b);
 /* move all of the elements of src into dst */
 void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
-/* remove n bytes from the end of a slice buffer */
-void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n);
 
 #ifdef __cplusplus
 }
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 1e38479..9012070 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -61,10 +61,6 @@
   grpc_httpcli_context *context;
   grpc_pollset *pollset;
   grpc_iomgr_object iomgr_obj;
-  gpr_slice_buffer incoming;
-  gpr_slice_buffer outgoing;
-  grpc_iomgr_closure on_read;
-  grpc_iomgr_closure done_write;
 } internal_request;
 
 static grpc_httpcli_get_override g_get_override = NULL;
@@ -103,70 +99,73 @@
   gpr_slice_unref(req->request_text);
   gpr_free(req->host);
   grpc_iomgr_unregister_object(&req->iomgr_obj);
-  gpr_slice_buffer_destroy(&req->incoming);
-  gpr_slice_buffer_destroy(&req->outgoing);
   gpr_free(req);
 }
 
-static void on_read(void *user_data, int success);
-
-static void do_read(internal_request *req) {
-  switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) {
-    case GRPC_ENDPOINT_DONE:
-      on_read(req, 1);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      on_read(req, 0);
-      break;
-  }
-}
-
-static void on_read(void *user_data, int success) {
+static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
+                    grpc_endpoint_cb_status status) {
   internal_request *req = user_data;
   size_t i;
 
-  for (i = 0; i < req->incoming.count; i++) {
-    if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
+  for (i = 0; i < nslices; i++) {
+    if (GPR_SLICE_LENGTH(slices[i])) {
       req->have_read_byte = 1;
-      if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) {
+      if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) {
         finish(req, 0);
-        return;
+        goto done;
       }
     }
   }
 
-  if (success) {
-    do_read(req);
-  } else if (!req->have_read_byte) {
-    next_address(req);
-  } else {
-    finish(req, grpc_httpcli_parser_eof(&req->parser));
+  switch (status) {
+    case GRPC_ENDPOINT_CB_OK:
+      grpc_endpoint_notify_on_read(req->ep, on_read, req);
+      break;
+    case GRPC_ENDPOINT_CB_EOF:
+    case GRPC_ENDPOINT_CB_ERROR:
+    case GRPC_ENDPOINT_CB_SHUTDOWN:
+      if (!req->have_read_byte) {
+        next_address(req);
+      } else {
+        finish(req, grpc_httpcli_parser_eof(&req->parser));
+      }
+      break;
+  }
+
+done:
+  for (i = 0; i < nslices; i++) {
+    gpr_slice_unref(slices[i]);
   }
 }
 
-static void on_written(internal_request *req) { do_read(req); }
+static void on_written(internal_request *req) {
+  grpc_endpoint_notify_on_read(req->ep, on_read, req);
+}
 
-static void done_write(void *arg, int success) {
+static void done_write(void *arg, grpc_endpoint_cb_status status) {
   internal_request *req = arg;
-  if (success) {
-    on_written(req);
-  } else {
-    next_address(req);
+  switch (status) {
+    case GRPC_ENDPOINT_CB_OK:
+      on_written(req);
+      break;
+    case GRPC_ENDPOINT_CB_EOF:
+    case GRPC_ENDPOINT_CB_SHUTDOWN:
+    case GRPC_ENDPOINT_CB_ERROR:
+      next_address(req);
+      break;
   }
 }
 
 static void start_write(internal_request *req) {
   gpr_slice_ref(req->request_text);
-  gpr_slice_buffer_add(&req->outgoing, req->request_text);
-  switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) {
-    case GRPC_ENDPOINT_DONE:
+  switch (
+      grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
+    case GRPC_ENDPOINT_WRITE_DONE:
       on_written(req);
       break;
-    case GRPC_ENDPOINT_PENDING:
+    case GRPC_ENDPOINT_WRITE_PENDING:
       break;
-    case GRPC_ENDPOINT_ERROR:
+    case GRPC_ENDPOINT_WRITE_ERROR:
       finish(req, 0);
       break;
   }
@@ -238,10 +237,6 @@
       request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
   req->context = context;
   req->pollset = pollset;
-  grpc_iomgr_closure_init(&req->on_read, on_read, req);
-  grpc_iomgr_closure_init(&req->done_write, done_write, req);
-  gpr_slice_buffer_init(&req->incoming);
-  gpr_slice_buffer_init(&req->outgoing);
   gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
   grpc_iomgr_register_object(&req->iomgr_obj, name);
   gpr_free(name);
@@ -275,11 +270,7 @@
       request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
   req->context = context;
   req->pollset = pollset;
-  grpc_iomgr_closure_init(&req->on_read, on_read, req);
-  grpc_iomgr_closure_init(&req->done_write, done_write, req);
-  gpr_slice_buffer_init(&req->incoming);
-  gpr_slice_buffer_init(&req->outgoing);
-  gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
+  gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
   grpc_iomgr_register_object(&req->iomgr_obj, name);
   gpr_free(name);
   req->host = gpr_strdup(request->host);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index a7878e3..8ee14bc 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -33,16 +33,17 @@
 
 #include "src/core/iomgr/endpoint.h"
 
-grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
-                                           gpr_slice_buffer *slices,
-                                           grpc_iomgr_closure *cb) {
-  return ep->vtable->read(ep, slices, cb);
+void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+                                  void *user_data) {
+  ep->vtable->notify_on_read(ep, cb, user_data);
 }
 
-grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
-                                            gpr_slice_buffer *slices,
-                                            grpc_iomgr_closure *cb) {
-  return ep->vtable->write(ep, slices, cb);
+grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
+                                               gpr_slice *slices,
+                                               size_t nslices,
+                                               grpc_endpoint_write_cb cb,
+                                               void *user_data) {
+  return ep->vtable->write(ep, slices, nslices, cb, user_data);
 }
 
 void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index d14d52d..ea92a50 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -37,7 +37,6 @@
 #include "src/core/iomgr/pollset.h"
 #include "src/core/iomgr/pollset_set.h"
 #include <grpc/support/slice.h>
-#include <grpc/support/slice_buffer.h>
 #include <grpc/support/time.h>
 
 /* An endpoint caps a streaming channel between two communicating processes.
@@ -46,17 +45,31 @@
 typedef struct grpc_endpoint grpc_endpoint;
 typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
 
-typedef enum grpc_endpoint_op_status {
-  GRPC_ENDPOINT_DONE,    /* completed immediately, cb won't be called */
-  GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
-  GRPC_ENDPOINT_ERROR    /* write errored out, cb won't be called */
-} grpc_endpoint_op_status;
+typedef enum grpc_endpoint_cb_status {
+  GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
+  GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
+  GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
+  GRPC_ENDPOINT_CB_ERROR     /* Call interrupted by socket error */
+} grpc_endpoint_cb_status;
+
+typedef enum grpc_endpoint_write_status {
+  GRPC_ENDPOINT_WRITE_DONE,    /* completed immediately, cb won't be called */
+  GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */
+  GRPC_ENDPOINT_WRITE_ERROR    /* write errored out, cb won't be called */
+} grpc_endpoint_write_status;
+
+typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
+                                      size_t nslices,
+                                      grpc_endpoint_cb_status error);
+typedef void (*grpc_endpoint_write_cb)(void *user_data,
+                                       grpc_endpoint_cb_status error);
 
 struct grpc_endpoint_vtable {
-  grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
-                                  grpc_iomgr_closure *cb);
-  grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
-                                   grpc_iomgr_closure *cb);
+  void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+                         void *user_data);
+  grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
+                                      size_t nslices, grpc_endpoint_write_cb cb,
+                                      void *user_data);
   void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
   void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
   void (*shutdown)(grpc_endpoint *ep);
@@ -64,32 +77,26 @@
   char *(*get_peer)(grpc_endpoint *ep);
 };
 
-/* When data is available on the connection, calls the callback with slices.
-   Callback success indicates that the endpoint can accept more reads, failure
-   indicates the endpoint is closed.
-   Valid slices may be placed into \a slices even on callback success == 0. */
-grpc_endpoint_op_status grpc_endpoint_read(
-    grpc_endpoint *ep, gpr_slice_buffer *slices,
-    grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+/* When data is available on the connection, calls the callback with slices. */
+void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+                                  void *user_data);
 
 char *grpc_endpoint_get_peer(grpc_endpoint *ep);
 
 /* Write slices out to the socket.
 
    If the connection is ready for more data after the end of the call, it
-   returns GRPC_ENDPOINT_DONE.
-   Otherwise it returns GRPC_ENDPOINT_PENDING and calls cb when the
-   connection is ready for more data.
-   \a slices may be mutated at will by the endpoint until cb is called.
-   No guarantee is made to the content of slices after a write EXCEPT that
-   it is a valid slice buffer.
-   */
-grpc_endpoint_op_status grpc_endpoint_write(
-    grpc_endpoint *ep, gpr_slice_buffer *slices,
-    grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+   returns GRPC_ENDPOINT_WRITE_DONE.
+   Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
+   connection is ready for more data. */
+grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
+                                               gpr_slice *slices,
+                                               size_t nslices,
+                                               grpc_endpoint_write_cb cb,
+                                               void *user_data);
 
 /* Causes any pending read/write callbacks to run immediately with
-   success==0 */
+   GRPC_ENDPOINT_CB_SHUTDOWN status */
 void grpc_endpoint_shutdown(grpc_endpoint *ep);
 void grpc_endpoint_destroy(grpc_endpoint *ep);
 
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 0db7cd9..360e6eb 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -61,8 +61,209 @@
 #define SENDMSG_FLAGS 0
 #endif
 
+/* Holds a slice array and associated state. */
+typedef struct grpc_tcp_slice_state {
+  gpr_slice *slices;       /* Array of slices */
+  size_t nslices;          /* Size of slices array. */
+  ssize_t first_slice;     /* First valid slice in array */
+  ssize_t last_slice;      /* Last valid slice in array */
+  gpr_slice working_slice; /* pointer to original final slice */
+  int working_slice_valid; /* True if there is a working slice */
+  int memory_owned;        /* True if slices array is owned */
+} grpc_tcp_slice_state;
+
 int grpc_tcp_trace = 0;
 
+static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
+                             size_t nslices, size_t valid_slices) {
+  state->slices = slices;
+  state->nslices = nslices;
+  if (valid_slices == 0) {
+    state->first_slice = -1;
+  } else {
+    state->first_slice = 0;
+  }
+  state->last_slice = valid_slices - 1;
+  state->working_slice_valid = 0;
+  state->memory_owned = 0;
+}
+
+/* Returns true if there is still available data */
+static int slice_state_has_available(grpc_tcp_slice_state *state) {
+  return state->first_slice != -1 && state->last_slice >= state->first_slice;
+}
+
+static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
+  if (state->first_slice == -1) {
+    return 0;
+  } else {
+    return state->last_slice - state->first_slice + 1;
+  }
+}
+
+static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
+  /* TODO(klempner): use realloc instead when first_slice is 0 */
+  /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
+  gpr_slice *slices = state->slices;
+  size_t original_size = slice_state_slices_allocated(state);
+  size_t i;
+  gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
+
+  for (i = 0; i < original_size; ++i) {
+    new_slices[i] = slices[i + state->first_slice];
+  }
+
+  state->slices = new_slices;
+  state->last_slice = original_size - 1;
+  if (original_size > 0) {
+    state->first_slice = 0;
+  } else {
+    state->first_slice = -1;
+  }
+  state->nslices = new_size;
+
+  if (state->memory_owned) {
+    gpr_free(slices);
+  }
+  state->memory_owned = 1;
+}
+
+static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
+                                      size_t prefix_bytes) {
+  gpr_slice *current_slice = &state->slices[state->first_slice];
+  size_t current_slice_size;
+
+  while (slice_state_has_available(state)) {
+    current_slice_size = GPR_SLICE_LENGTH(*current_slice);
+    if (current_slice_size > prefix_bytes) {
+      /* TODO(klempner): Get rid of the extra refcount created here by adding a
+         native "trim the first N bytes" operation to splice */
+      /* TODO(klempner): This really shouldn't be modifying the current slice
+         unless we own the slices array. */
+      gpr_slice tail;
+      tail = gpr_slice_split_tail(current_slice, prefix_bytes);
+      gpr_slice_unref(*current_slice);
+      *current_slice = tail;
+      return;
+    } else {
+      gpr_slice_unref(*current_slice);
+      ++state->first_slice;
+      ++current_slice;
+      prefix_bytes -= current_slice_size;
+    }
+  }
+}
+
+static void slice_state_destroy(grpc_tcp_slice_state *state) {
+  while (slice_state_has_available(state)) {
+    gpr_slice_unref(state->slices[state->first_slice]);
+    ++state->first_slice;
+  }
+
+  if (state->memory_owned) {
+    gpr_free(state->slices);
+    state->memory_owned = 0;
+  }
+}
+
+void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
+                                    gpr_slice **slices, size_t *nslices) {
+  *slices = state->slices + state->first_slice;
+  *nslices = state->last_slice - state->first_slice + 1;
+
+  state->first_slice = -1;
+  state->last_slice = -1;
+}
+
+/* Fills iov with the first min(iov_size, available) slices, returns number
+   filled */
+static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
+                                   struct iovec *iov, size_t iov_size) {
+  size_t nslices = state->last_slice - state->first_slice + 1;
+  gpr_slice *slices = state->slices + state->first_slice;
+  size_t i;
+  if (nslices < iov_size) {
+    iov_size = nslices;
+  }
+
+  for (i = 0; i < iov_size; ++i) {
+    iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
+    iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
+  }
+  return iov_size;
+}
+
+/* Makes n blocks available at the end of state, writes them into iov, and
+   returns the number of bytes allocated */
+static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
+                                                   struct iovec *iov, size_t n,
+                                                   size_t slice_size) {
+  size_t target_size;
+  size_t i;
+  size_t allocated_bytes;
+  ssize_t allocated_slices = slice_state_slices_allocated(state);
+
+  if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
+    /* Need to grow the slice array */
+    target_size = state->nslices;
+    do {
+      target_size = target_size * 2;
+    } while (target_size < allocated_slices + n - state->working_slice_valid);
+    /* TODO(klempner): If this ever needs to support both prefix removal and
+       append, we should be smarter about the growth logic here */
+    slice_state_realloc(state, target_size);
+  }
+
+  i = 0;
+  allocated_bytes = 0;
+
+  if (state->working_slice_valid) {
+    iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
+    iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
+                     GPR_SLICE_LENGTH(state->slices[state->last_slice]);
+    allocated_bytes += iov[0].iov_len;
+    ++i;
+    state->slices[state->last_slice] = state->working_slice;
+    state->working_slice_valid = 0;
+  }
+
+  for (; i < n; ++i) {
+    ++state->last_slice;
+    state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
+    iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
+    iov[i].iov_len = slice_size;
+    allocated_bytes += slice_size;
+  }
+  if (state->first_slice == -1) {
+    state->first_slice = 0;
+  }
+  return allocated_bytes;
+}
+
+/* Remove the last n bytes from state */
+/* TODO(klempner): Consider having this defer actual deletion until later */
+static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
+  while (bytes > 0 && slice_state_has_available(state)) {
+    if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
+      state->working_slice = state->slices[state->last_slice];
+      state->working_slice_valid = 1;
+      /* TODO(klempner): Combine these into a single operation that doesn't need
+         to refcount */
+      gpr_slice_unref(gpr_slice_split_tail(
+          &state->slices[state->last_slice],
+          GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
+      bytes = 0;
+    } else {
+      bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
+      gpr_slice_unref(state->slices[state->last_slice]);
+      --state->last_slice;
+      if (state->last_slice == -1) {
+        state->first_slice = -1;
+      }
+    }
+  }
+}
+
 typedef struct {
   grpc_endpoint base;
   grpc_fd *em_fd;
@@ -72,111 +273,80 @@
   size_t slice_size;
   gpr_refcount refcount;
 
-  gpr_slice_buffer *incoming_buffer;
-  gpr_slice_buffer *outgoing_buffer;
-  /** slice within outgoing_buffer to write next */
-  size_t outgoing_slice_idx;
-  /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
-  size_t outgoing_byte_idx;
+  grpc_endpoint_read_cb read_cb;
+  void *read_user_data;
+  grpc_endpoint_write_cb write_cb;
+  void *write_user_data;
 
-  grpc_iomgr_closure *read_cb;
-  grpc_iomgr_closure *write_cb;
+  grpc_tcp_slice_state write_state;
 
   grpc_iomgr_closure read_closure;
   grpc_iomgr_closure write_closure;
 
+  grpc_iomgr_closure handle_read_closure;
+
   char *peer_string;
 } grpc_tcp;
 
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
 
-static void tcp_shutdown(grpc_endpoint *ep) {
+static void grpc_tcp_shutdown(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_fd_shutdown(tcp->em_fd);
 }
 
-static void tcp_free(grpc_tcp *tcp) {
-  grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
-  gpr_free(tcp->peer_string);
-  gpr_free(tcp);
-}
-
-/*#define GRPC_TCP_REFCOUNT_DEBUG*/
-#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
-                      int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
-          reason, tcp->refcount.count, tcp->refcount.count - 1);
-  if (gpr_unref(&tcp->refcount)) {
-    tcp_free(tcp);
+static void grpc_tcp_unref(grpc_tcp *tcp) {
+  int refcount_zero = gpr_unref(&tcp->refcount);
+  if (refcount_zero) {
+    grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+    gpr_free(tcp->peer_string);
+    gpr_free(tcp);
   }
 }
 
-static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
-                    int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP   ref %p : %s %d -> %d", tcp,
-          reason, tcp->refcount.count, tcp->refcount.count + 1);
-  gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp *tcp) {
-  if (gpr_unref(&tcp->refcount)) {
-    tcp_free(tcp);
-  }
-}
-
-static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
-static void tcp_destroy(grpc_endpoint *ep) {
+static void grpc_tcp_destroy(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
-  TCP_UNREF(tcp, "destroy");
+  grpc_tcp_unref(tcp);
 }
 
-static void call_read_cb(grpc_tcp *tcp, int success) {
-  grpc_iomgr_closure *cb = tcp->read_cb;
+static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
+                         grpc_endpoint_cb_status status) {
+  grpc_endpoint_read_cb cb = tcp->read_cb;
 
   if (grpc_tcp_trace) {
     size_t i;
-    gpr_log(GPR_DEBUG, "read: success=%d", success);
-    for (i = 0; i < tcp->incoming_buffer->count; i++) {
-      char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
-                                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    gpr_log(GPR_DEBUG, "read: status=%d", status);
+    for (i = 0; i < nslices; i++) {
+      char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
       gpr_free(dump);
     }
   }
 
   tcp->read_cb = NULL;
-  tcp->incoming_buffer = NULL;
-  cb->cb(cb->cb_arg, success);
+  cb(tcp->read_user_data, slices, nslices, status);
 }
 
+#define INLINE_SLICE_BUFFER_SIZE 8
 #define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_tcp *tcp) {
+static void grpc_tcp_continue_read(grpc_tcp *tcp) {
+  gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
   struct msghdr msg;
   struct iovec iov[MAX_READ_IOVEC];
   ssize_t read_bytes;
-  size_t i;
+  ssize_t allocated_bytes;
+  struct grpc_tcp_slice_state read_state;
+  gpr_slice *final_slices;
+  size_t final_nslices;
 
   GPR_ASSERT(!tcp->finished_edge);
-  GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
-  GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
   GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
+  slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
+                   0);
 
-  while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
-    gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
-                                 gpr_slice_malloc(tcp->slice_size));
-  }
-  for (i = 0; i < tcp->incoming_buffer->count; i++) {
-    iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
-    iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
-  }
+  allocated_bytes = slice_state_append_blocks_into_iovec(
+      &read_state, iov, tcp->iov_size, tcp->slice_size);
 
   msg.msg_name = NULL;
   msg.msg_namelen = 0;
@@ -192,105 +362,106 @@
   } while (read_bytes < 0 && errno == EINTR);
   GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
 
+  if (read_bytes < allocated_bytes) {
+    /* TODO(klempner): Consider a second read first, in hopes of getting a
+     * quick EAGAIN and saving a bunch of allocations. */
+    slice_state_remove_last(&read_state, read_bytes < 0
+                                             ? allocated_bytes
+                                             : allocated_bytes - read_bytes);
+  }
+
   if (read_bytes < 0) {
-    /* NB: After calling call_read_cb a parallel call of the read handler may
+    /* NB: After calling the user_cb a parallel call of the read handler may
      * be running. */
     if (errno == EAGAIN) {
       if (tcp->iov_size > 1) {
         tcp->iov_size /= 2;
       }
-      /* We've consumed the edge, request a new one */
-      grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+      if (slice_state_has_available(&read_state)) {
+        /* TODO(klempner): We should probably do the call into the application
+           without all this junk on the stack */
+        /* FIXME(klempner): Refcount properly */
+        slice_state_transfer_ownership(&read_state, &final_slices,
+                                       &final_nslices);
+        tcp->finished_edge = 1;
+        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+        slice_state_destroy(&read_state);
+        grpc_tcp_unref(tcp);
+      } else {
+        /* We've consumed the edge, request a new one */
+        slice_state_destroy(&read_state);
+        grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+      }
     } else {
       /* TODO(klempner): Log interesting errors */
-      gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
-      call_read_cb(tcp, 0);
-      TCP_UNREF(tcp, "read");
+      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
+      slice_state_destroy(&read_state);
+      grpc_tcp_unref(tcp);
     }
   } else if (read_bytes == 0) {
     /* 0 read size ==> end of stream */
-    gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
-    call_read_cb(tcp, 0);
-    TCP_UNREF(tcp, "read");
+    if (slice_state_has_available(&read_state)) {
+      /* there were bytes already read: pass them up to the application */
+      slice_state_transfer_ownership(&read_state, &final_slices,
+                                     &final_nslices);
+      call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+    } else {
+      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+    }
+    slice_state_destroy(&read_state);
+    grpc_tcp_unref(tcp);
   } else {
-    GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
-    if ((size_t)read_bytes < tcp->incoming_buffer->length) {
-      gpr_slice_buffer_trim_end(tcp->incoming_buffer,
-                                tcp->incoming_buffer->length - read_bytes);
-    } else if (tcp->iov_size < MAX_READ_IOVEC) {
+    if (tcp->iov_size < MAX_READ_IOVEC) {
       ++tcp->iov_size;
     }
-    GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
-    call_read_cb(tcp, 1);
-    TCP_UNREF(tcp, "read");
+    GPR_ASSERT(slice_state_has_available(&read_state));
+    slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices);
+    call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+    slice_state_destroy(&read_state);
+    grpc_tcp_unref(tcp);
   }
 
   GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
 }
 
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
   grpc_tcp *tcp = (grpc_tcp *)arg;
   GPR_ASSERT(!tcp->finished_edge);
 
   if (!success) {
-    gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
-    call_read_cb(tcp, 0);
-    TCP_UNREF(tcp, "read");
+    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+    grpc_tcp_unref(tcp);
   } else {
-    tcp_continue_read(tcp);
+    grpc_tcp_continue_read(tcp);
   }
 }
 
-static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
-                                        gpr_slice_buffer *incoming_buffer,
-                                        grpc_iomgr_closure *cb) {
+static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+                                    void *user_data) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   GPR_ASSERT(tcp->read_cb == NULL);
   tcp->read_cb = cb;
-  tcp->incoming_buffer = incoming_buffer;
-  gpr_slice_buffer_reset_and_unref(incoming_buffer);
-  TCP_REF(tcp, "read");
+  tcp->read_user_data = user_data;
+  gpr_ref(&tcp->refcount);
   if (tcp->finished_edge) {
     tcp->finished_edge = 0;
     grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
   } else {
-    grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
+    tcp->handle_read_closure.cb_arg = tcp;
+    grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
   }
-  /* TODO(ctiller): immediate return */
-  return GRPC_ENDPOINT_PENDING;
 }
 
 #define MAX_WRITE_IOVEC 16
-static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
+static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
   struct msghdr msg;
   struct iovec iov[MAX_WRITE_IOVEC];
   int iov_size;
   ssize_t sent_length;
-  ssize_t sending_length;
-  ssize_t trailing;
-  ssize_t unwind_slice_idx;
-  ssize_t unwind_byte_idx;
+  grpc_tcp_slice_state *state = &tcp->write_state;
 
   for (;;) {
-    sending_length = 0;
-    unwind_slice_idx = tcp->outgoing_slice_idx;
-    unwind_byte_idx = tcp->outgoing_byte_idx;
-    for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
-                       iov_size != MAX_WRITE_IOVEC;
-         iov_size++) {
-      iov[iov_size].iov_base =
-          GPR_SLICE_START_PTR(
-              tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
-          tcp->outgoing_byte_idx;
-      iov[iov_size].iov_len =
-          GPR_SLICE_LENGTH(
-              tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
-          tcp->outgoing_byte_idx;
-      sending_length += iov[iov_size].iov_len;
-      tcp->outgoing_slice_idx++;
-      tcp->outgoing_byte_idx = 0;
-    }
-    GPR_ASSERT(iov_size > 0);
+    iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC);
 
     msg.msg_name = NULL;
     msg.msg_namelen = 0;
@@ -309,75 +480,70 @@
 
     if (sent_length < 0) {
       if (errno == EAGAIN) {
-        tcp->outgoing_slice_idx = unwind_slice_idx;
-        tcp->outgoing_byte_idx = unwind_byte_idx;
-        return GRPC_ENDPOINT_PENDING;
+        return GRPC_ENDPOINT_WRITE_PENDING;
       } else {
         /* TODO(klempner): Log some of these */
-        return GRPC_ENDPOINT_ERROR;
+        slice_state_destroy(state);
+        return GRPC_ENDPOINT_WRITE_ERROR;
       }
     }
 
-    GPR_ASSERT(tcp->outgoing_byte_idx == 0);
-    trailing = sending_length - sent_length;
-    while (trailing > 0) {
-      ssize_t slice_length;
+    /* TODO(klempner): Probably better to batch this after we finish flushing */
+    slice_state_remove_prefix(state, sent_length);
 
-      tcp->outgoing_slice_idx--;
-      slice_length = GPR_SLICE_LENGTH(
-          tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
-      if (slice_length > trailing) {
-        tcp->outgoing_byte_idx = slice_length - trailing;
-        break;
-      } else {
-        trailing -= slice_length;
-      }
-    }
-
-    if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
-      return GRPC_ENDPOINT_DONE;
+    if (!slice_state_has_available(state)) {
+      return GRPC_ENDPOINT_WRITE_DONE;
     }
   };
 }
 
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
   grpc_tcp *tcp = (grpc_tcp *)arg;
-  grpc_endpoint_op_status status;
-  grpc_iomgr_closure *cb;
+  grpc_endpoint_write_status write_status;
+  grpc_endpoint_cb_status cb_status;
+  grpc_endpoint_write_cb cb;
 
   if (!success) {
+    slice_state_destroy(&tcp->write_state);
     cb = tcp->write_cb;
     tcp->write_cb = NULL;
-    cb->cb(cb->cb_arg, 0);
-    TCP_UNREF(tcp, "write");
+    cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
+    grpc_tcp_unref(tcp);
     return;
   }
 
   GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
-  status = tcp_flush(tcp);
-  if (status == GRPC_ENDPOINT_PENDING) {
+  write_status = grpc_tcp_flush(tcp);
+  if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
     grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
   } else {
+    slice_state_destroy(&tcp->write_state);
+    if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
+      cb_status = GRPC_ENDPOINT_CB_OK;
+    } else {
+      cb_status = GRPC_ENDPOINT_CB_ERROR;
+    }
     cb = tcp->write_cb;
     tcp->write_cb = NULL;
-    cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
-    TCP_UNREF(tcp, "write");
+    cb(tcp->write_user_data, cb_status);
+    grpc_tcp_unref(tcp);
   }
   GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
 }
 
-static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
-                                         gpr_slice_buffer *buf,
-                                         grpc_iomgr_closure *cb) {
+static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
+                                                 gpr_slice *slices,
+                                                 size_t nslices,
+                                                 grpc_endpoint_write_cb cb,
+                                                 void *user_data) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
-  grpc_endpoint_op_status status;
+  grpc_endpoint_write_status status;
 
   if (grpc_tcp_trace) {
     size_t i;
 
-    for (i = 0; i < buf->count; i++) {
-      char *data =
-          gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    for (i = 0; i < nslices; i++) {
+      char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
       gpr_free(data);
     }
@@ -385,19 +551,15 @@
 
   GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
   GPR_ASSERT(tcp->write_cb == NULL);
+  slice_state_init(&tcp->write_state, slices, nslices, nslices);
 
-  if (buf->length == 0) {
-    GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
-    return GRPC_ENDPOINT_DONE;
-  }
-  tcp->outgoing_buffer = buf;
-  tcp->outgoing_slice_idx = 0;
-  tcp->outgoing_byte_idx = 0;
-
-  status = tcp_flush(tcp);
-  if (status == GRPC_ENDPOINT_PENDING) {
-    TCP_REF(tcp, "write");
+  status = grpc_tcp_flush(tcp);
+  if (status == GRPC_ENDPOINT_WRITE_PENDING) {
+    /* TODO(klempner): Consider inlining rather than malloc for small nslices */
+    slice_state_realloc(&tcp->write_state, nslices);
+    gpr_ref(&tcp->refcount);
     tcp->write_cb = cb;
+    tcp->write_user_data = user_data;
     grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
   }
 
@@ -405,25 +567,27 @@
   return status;
 }
 
-static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_pollset_add_fd(pollset, tcp->em_fd);
 }
 
-static void tcp_add_to_pollset_set(grpc_endpoint *ep,
-                                   grpc_pollset_set *pollset_set) {
+static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep,
+                                        grpc_pollset_set *pollset_set) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
 }
 
-static char *tcp_get_peer(grpc_endpoint *ep) {
+static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   return gpr_strdup(tcp->peer_string);
 }
 
 static const grpc_endpoint_vtable vtable = {
-    tcp_read,     tcp_write,   tcp_add_to_pollset, tcp_add_to_pollset_set,
-    tcp_shutdown, tcp_destroy, tcp_get_peer};
+    grpc_tcp_notify_on_read, grpc_tcp_write,
+    grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
+    grpc_tcp_shutdown,       grpc_tcp_destroy,
+    grpc_tcp_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
                                const char *peer_string) {
@@ -433,18 +597,21 @@
   tcp->fd = em_fd->fd;
   tcp->read_cb = NULL;
   tcp->write_cb = NULL;
-  tcp->incoming_buffer = NULL;
+  tcp->read_user_data = NULL;
+  tcp->write_user_data = NULL;
   tcp->slice_size = slice_size;
   tcp->iov_size = 1;
   tcp->finished_edge = 1;
+  slice_state_init(&tcp->write_state, NULL, 0, 0);
   /* paired with unref in grpc_tcp_destroy */
   gpr_ref_init(&tcp->refcount, 1);
   tcp->em_fd = em_fd;
-  tcp->read_closure.cb = tcp_handle_read;
+  tcp->read_closure.cb = grpc_tcp_handle_read;
   tcp->read_closure.cb_arg = tcp;
-  tcp->write_closure.cb = tcp_handle_write;
+  tcp->write_closure.cb = grpc_tcp_handle_write;
   tcp->write_closure.cb_arg = tcp;
 
+  tcp->handle_read_closure.cb = grpc_tcp_handle_read;
   return &tcp->base;
 }
 
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 58f9160..901793e 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,11 +82,13 @@
   /* Refcounting how many operations are in progress. */
   gpr_refcount refcount;
 
-  grpc_iomgr_closure *read_cb;
-  grpc_iomgr_closure *write_cb;
+  grpc_endpoint_read_cb read_cb;
+  void *read_user_data;
   gpr_slice read_slice;
-  gpr_slice_buffer *write_slices;
-  gpr_slice_buffer *read_slices;
+
+  grpc_endpoint_write_cb write_cb;
+  void *write_user_data;
+  gpr_slice_buffer write_slices;
 
   /* The IO Completion Port runs from another thread. We need some mechanism
      to protect ourselves when requesting a shutdown. */
@@ -96,55 +98,34 @@
   char *peer_string;
 } grpc_tcp;
 
-static void tcp_free(grpc_tcp *tcp) {
-  grpc_winsocket_orphan(tcp->socket);
-  gpr_mu_destroy(&tcp->mu);
-  gpr_free(tcp->peer_string);
-  gpr_free(tcp);
-}
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
 
-/*#define GRPC_TCP_REFCOUNT_DEBUG*/
-#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
-  int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
-    reason, tcp->refcount.count, tcp->refcount.count - 1);
-  if (gpr_unref(&tcp->refcount)) {
-    tcp_free(tcp);
-  }
-}
-
-static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
-  int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP   ref %p : %s %d -> %d", tcp,
-    reason, tcp->refcount.count, tcp->refcount.count + 1);
-  gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
 static void tcp_unref(grpc_tcp *tcp) {
   if (gpr_unref(&tcp->refcount)) {
-    tcp_free(tcp);
+    gpr_slice_buffer_destroy(&tcp->write_slices);
+    grpc_winsocket_orphan(tcp->socket);
+    gpr_mu_destroy(&tcp->mu);
+    gpr_free(tcp->peer_string);
+    gpr_free(tcp);
   }
 }
 
-static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
 /* Asynchronous callback from the IOCP, or the background thread. */
-static int on_read(grpc_tcp *tcp, int from_iocp) {
+static void on_read(void *tcpp, int from_iocp) {
+  grpc_tcp *tcp = (grpc_tcp *)tcpp;
   grpc_winsocket *socket = tcp->socket;
   gpr_slice sub;
   gpr_slice *slice = NULL;
   size_t nslices = 0;
-  int success;
+  grpc_endpoint_cb_status status;
+  grpc_endpoint_read_cb cb;
   grpc_winsocket_callback_info *info = &socket->read_info;
+  void *opaque = tcp->read_user_data;
   int do_abort = 0;
 
   gpr_mu_lock(&tcp->mu);
+  cb = tcp->read_cb;
+  tcp->read_cb = NULL;
   if (!from_iocp || tcp->shutting_down) {
     /* If we are here with from_iocp set to true, it means we got raced to
     shutting down the endpoint. No actual abort callback will happen
@@ -158,7 +139,9 @@
       tcp->socket->read_info.outstanding = 0;
       gpr_slice_unref(tcp->read_slice);
     }
-    return 0;
+    tcp_unref(tcp);
+    if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+    return;
   }
 
   GPR_ASSERT(tcp->socket->read_info.outstanding);
@@ -169,38 +152,28 @@
       gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
       gpr_free(utf8_message);
     }
-    success = 0;
     gpr_slice_unref(tcp->read_slice);
+    status = GRPC_ENDPOINT_CB_ERROR;
   } else {
     if (info->bytes_transfered != 0) {
       sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
-      gpr_slice_buffer_add(tcp->read_slices, sub);
-      success = 1;
+      status = GRPC_ENDPOINT_CB_OK;
+      slice = &sub;
+      nslices = 1;
     } else {
       gpr_slice_unref(tcp->read_slice);
-      success = 0;
+      status = GRPC_ENDPOINT_CB_EOF;
     }
   }
 
   tcp->socket->read_info.outstanding = 0;
 
-  return success;
+  tcp_unref(tcp);
+  cb(opaque, slice, nslices, status);
 }
 
-static void on_read_cb(void *tcpp, int from_iocp) {
-  grpc_tcp *tcp = tcpp;
-  grpc_iomgr_closure *cb = tcp->read_cb;
-  int success = on_read(tcp, from_iocp);
-  tcp->read_cb = NULL;
-  TCP_UNREF(tcp, "read");
-  if (cb) {
-    cb->cb(cb->cb_arg, success);
-  }
-}
-
-static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
-                                        gpr_slice_buffer *read_slices,
-                                        grpc_iomgr_closure *cb) {
+static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+                               void *arg) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->read_info;
@@ -211,15 +184,13 @@
 
   GPR_ASSERT(!tcp->socket->read_info.outstanding);
   if (tcp->shutting_down) {
-    return GRPC_ENDPOINT_ERROR;
+    cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+    return;
   }
-
-  TCP_REF(tcp, "read");
-
+  tcp_ref(tcp);
   tcp->socket->read_info.outstanding = 1;
   tcp->read_cb = cb;
-  tcp->read_slices = read_slices;
-  gpr_slice_buffer_reset_and_unref(read_slices);
+  tcp->read_user_data = arg;
 
   tcp->read_slice = gpr_slice_malloc(8192);
 
@@ -233,11 +204,10 @@
 
   /* Did we get data immediately ? Yay. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
-    int ok;
     info->bytes_transfered = bytes_read;
-    ok = on_read(tcp, 1);
-    TCP_UNREF(tcp, "read");
-    return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+    /* This might heavily recurse. */
+    on_read(tcp, 1);
+    return;
   }
 
   /* Otherwise, let's retry, by queuing a read. */
@@ -248,15 +218,13 @@
   if (status != 0) {
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
-      int ok;
       info->wsa_error = wsa_error;
-      ok = on_read(tcp, 1);
-      return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+      on_read(tcp, 1);
+      return;
     }
   }
 
-  grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
-  return GRPC_ENDPOINT_PENDING;
+  grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
 }
 
 /* Asynchronous callback from the IOCP, or the background thread. */
@@ -264,8 +232,9 @@
   grpc_tcp *tcp = (grpc_tcp *)tcpp;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->write_info;
-  grpc_iomgr_closure *cb;
-  int success;
+  grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
+  grpc_endpoint_write_cb cb;
+  void *opaque = tcp->write_user_data;
   int do_abort = 0;
 
   gpr_mu_lock(&tcp->mu);
@@ -282,11 +251,10 @@
   if (do_abort) {
     if (from_iocp) {
       tcp->socket->write_info.outstanding = 0;
+      gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     }
-    TCP_UNREF(tcp, "write");
-    if (cb) {
-      cb->cb(cb->cb_arg, 0);
-    }
+    tcp_unref(tcp);
+    if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
     return;
   }
 
@@ -298,22 +266,23 @@
       gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
       gpr_free(utf8_message);
     }
-    success = 0;
+    status = GRPC_ENDPOINT_CB_ERROR;
   } else {
-    GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
-    success = 1;
+    GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
   }
 
+  gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
   tcp->socket->write_info.outstanding = 0;
 
-  TCP_UNREF(tcp, "write");
-  cb->cb(cb->cb_arg, success);
+  tcp_unref(tcp);
+  cb(opaque, status);
 }
 
 /* Initiates a write. */
-static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
-                                         gpr_slice_buffer *slices,
-                                         grpc_iomgr_closure *cb) {
+static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
+                                            gpr_slice *slices, size_t nslices,
+                                            grpc_endpoint_write_cb cb,
+                                            void *arg) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   grpc_winsocket *socket = tcp->socket;
   grpc_winsocket_callback_info *info = &socket->write_info;
@@ -326,26 +295,28 @@
 
   GPR_ASSERT(!tcp->socket->write_info.outstanding);
   if (tcp->shutting_down) {
-    return GRPC_ENDPOINT_ERROR;
+    return GRPC_ENDPOINT_WRITE_ERROR;
   }
-  TCP_REF(tcp, "write");
+  tcp_ref(tcp);
 
   tcp->socket->write_info.outstanding = 1;
   tcp->write_cb = cb;
-  tcp->write_slices = slices;
+  tcp->write_user_data = arg;
 
-  if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
-    buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
+  gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
+
+  if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
+    buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
     allocated = buffers;
   }
 
-  for (i = 0; i < tcp->write_slices->count; i++) {
-    buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]);
-    buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]);
+  for (i = 0; i < tcp->write_slices.count; i++) {
+    buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]);
+    buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
   }
 
   /* First, let's try a synchronous, non-blocking write. */
-  status = WSASend(socket->socket, buffers, tcp->write_slices->count,
+  status = WSASend(socket->socket, buffers, tcp->write_slices.count,
                    &bytes_sent, 0, NULL, NULL);
   info->wsa_error = status == 0 ? 0 : WSAGetLastError();
 
@@ -353,10 +324,10 @@
      connection that has its send queue filled up. But if we don't, then we can
      avoid doing an async write operation at all. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
-    grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR;
+    grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
     if (status == 0) {
-      ret = GRPC_ENDPOINT_DONE;
-      GPR_ASSERT(bytes_sent == tcp->write_slices->length);
+      ret = GRPC_ENDPOINT_WRITE_DONE;
+      GPR_ASSERT(bytes_sent == tcp->write_slices.length);
     } else {
       if (socket->read_info.wsa_error != WSAECONNRESET) {
         char *utf8_message = gpr_format_message(info->wsa_error);
@@ -365,31 +336,33 @@
       }
     }
     if (allocated) gpr_free(allocated);
+    gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     tcp->socket->write_info.outstanding = 0;
-    TCP_UNREF(tcp, "write");
+    tcp_unref(tcp);
     return ret;
   }
 
   /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
      operation, this time asynchronously. */
   memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
-  status = WSASend(socket->socket, buffers, tcp->write_slices->count,
+  status = WSASend(socket->socket, buffers, tcp->write_slices.count,
                    &bytes_sent, 0, &socket->write_info.overlapped, NULL);
   if (allocated) gpr_free(allocated);
 
   if (status != 0) {
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
+      gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
       tcp->socket->write_info.outstanding = 0;
-      TCP_UNREF(tcp, "write");
-      return GRPC_ENDPOINT_ERROR;
+      tcp_unref(tcp);
+      return GRPC_ENDPOINT_WRITE_ERROR;
     }
   }
 
   /* As all is now setup, we can now ask for the IOCP notification. It may
      trigger the callback immediately however, but no matter. */
   grpc_socket_notify_on_write(socket, on_write, tcp);
-  return GRPC_ENDPOINT_PENDING;
+  return GRPC_ENDPOINT_WRITE_PENDING;
 }
 
 static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
@@ -414,17 +387,19 @@
    concurrent access of the data structure in that regard. */
 static void win_shutdown(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
+  int extra_refs = 0;
   gpr_mu_lock(&tcp->mu);
   /* At that point, what may happen is that we're already inside the IOCP
      callback. See the comments in on_read and on_write. */
   tcp->shutting_down = 1;
-  grpc_winsocket_shutdown(tcp->socket);
+  extra_refs = grpc_winsocket_shutdown(tcp->socket);
+  while (extra_refs--) tcp_ref(tcp);
   gpr_mu_unlock(&tcp->mu);
 }
 
 static void win_destroy(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
-  TCP_UNREF(tcp, "destroy");
+  tcp_unref(tcp);
 }
 
 static char *win_get_peer(grpc_endpoint *ep) {
@@ -433,8 +408,8 @@
 }
 
 static grpc_endpoint_vtable vtable = {
-    win_read,     win_write,   win_add_to_pollset, win_add_to_pollset_set,
-    win_shutdown, win_destroy, win_get_peer};
+    win_notify_on_read, win_write,   win_add_to_pollset, win_add_to_pollset_set,
+    win_shutdown,       win_destroy, win_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
@@ -442,6 +417,7 @@
   tcp->base.vtable = &vtable;
   tcp->socket = socket;
   gpr_mu_init(&tcp->mu);
+  gpr_slice_buffer_init(&tcp->write_slices);
   gpr_ref_init(&tcp->refcount, 1);
   tcp->peer_string = gpr_strdup(peer_string);
   return &tcp->base;
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index b696e38..81b3e33 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -49,15 +49,15 @@
   struct tsi_frame_protector *protector;
   gpr_mu protector_mu;
   /* saved upper level callbacks and user_data. */
-  grpc_iomgr_closure *read_cb;
-  grpc_iomgr_closure *write_cb;
-  grpc_iomgr_closure on_read;
-  gpr_slice_buffer *read_buffer;
-  gpr_slice_buffer source_buffer;
+  grpc_endpoint_read_cb read_cb;
+  void *read_user_data;
+  grpc_endpoint_write_cb write_cb;
+  void *write_user_data;
   /* saved handshaker leftover data to unprotect. */
   gpr_slice_buffer leftover_bytes;
   /* buffers for read and write */
   gpr_slice read_staging_buffer;
+  gpr_slice_buffer input_buffer;
 
   gpr_slice write_staging_buffer;
   gpr_slice_buffer output_buffer;
@@ -67,91 +67,62 @@
 
 int grpc_trace_secure_endpoint = 0;
 
+static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
+
 static void destroy(secure_endpoint *secure_ep) {
   secure_endpoint *ep = secure_ep;
   grpc_endpoint_destroy(ep->wrapped_ep);
   tsi_frame_protector_destroy(ep->protector);
   gpr_slice_buffer_destroy(&ep->leftover_bytes);
   gpr_slice_unref(ep->read_staging_buffer);
+  gpr_slice_buffer_destroy(&ep->input_buffer);
   gpr_slice_unref(ep->write_staging_buffer);
   gpr_slice_buffer_destroy(&ep->output_buffer);
-  gpr_slice_buffer_destroy(&ep->source_buffer);
   gpr_mu_destroy(&ep->protector_mu);
   gpr_free(ep);
 }
 
-/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/
-#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
-#define SECURE_ENDPOINT_UNREF(ep, reason) \
-  secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
-#define SECURE_ENDPOINT_REF(ep, reason) \
-  secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
-static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
-                                  const char *file, int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
-          ep, reason, ep->ref.count, ep->ref.count - 1);
-  if (gpr_unref(&ep->ref)) {
-    destroy(ep);
-  }
-}
-
-static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
-                                const char *file, int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP   ref %p : %s %d -> %d",
-          ep, reason, ep->ref.count, ep->ref.count + 1);
-  gpr_ref(&ep->ref);
-}
-#else
-#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
-#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
 static void secure_endpoint_unref(secure_endpoint *ep) {
   if (gpr_unref(&ep->ref)) {
     destroy(ep);
   }
 }
 
-static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
-#endif
-
 static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
                                       gpr_uint8 **end) {
-  gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
+  gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer);
   ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
   *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
   *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
 }
 
-static void call_read_cb(secure_endpoint *ep, int success) {
+static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices,
+                         grpc_endpoint_cb_status error) {
   if (grpc_trace_secure_endpoint) {
     size_t i;
-    for (i = 0; i < ep->read_buffer->count; i++) {
-      char *data = gpr_dump_slice(ep->read_buffer->slices[i],
-                                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    for (i = 0; i < nslices; i++) {
+      char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
       gpr_free(data);
     }
   }
-  ep->read_buffer = NULL;
-  ep->read_cb->cb(ep->read_cb->cb_arg, success);
-  SECURE_ENDPOINT_UNREF(ep, "read");
+  ep->read_cb(ep->read_user_data, slices, nslices, error);
+  secure_endpoint_unref(ep);
 }
 
-static int on_read(void *user_data, int success) {
+static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
+                    grpc_endpoint_cb_status error) {
   unsigned i;
   gpr_uint8 keep_looping = 0;
+  size_t input_buffer_count = 0;
   tsi_result result = TSI_OK;
   secure_endpoint *ep = (secure_endpoint *)user_data;
   gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
   gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
 
-  if (!success) {
-    gpr_slice_buffer_reset_and_unref(ep->read_buffer);
-    return 0;
-  }
-
   /* TODO(yangg) check error, maybe bail out early */
-  for (i = 0; i < ep->source_buffer.count; i++) {
-    gpr_slice encrypted = ep->source_buffer.slices[i];
+  for (i = 0; i < nslices; i++) {
+    gpr_slice encrypted = slices[i];
     gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
     size_t message_size = GPR_SLICE_LENGTH(encrypted);
 
@@ -190,7 +161,7 @@
 
   if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
     gpr_slice_buffer_add(
-        ep->read_buffer,
+        &ep->input_buffer,
         gpr_slice_split_head(
             &ep->read_staging_buffer,
             (size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
@@ -198,53 +169,38 @@
 
   /* TODO(yangg) experiment with moving this block after read_cb to see if it
      helps latency */
-  gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
+  for (i = 0; i < nslices; i++) {
+    gpr_slice_unref(slices[i]);
+  }
 
   if (result != TSI_OK) {
-    gpr_slice_buffer_reset_and_unref(ep->read_buffer);
-    return 0;
+    gpr_slice_buffer_reset_and_unref(&ep->input_buffer);
+    call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
+    return;
   }
-
-  return 1;
+  /* The upper level will unref the slices. */
+  input_buffer_count = ep->input_buffer.count;
+  ep->input_buffer.count = 0;
+  call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error);
 }
 
-static void on_read_cb(void *user_data, int success) {
-  call_read_cb(user_data, on_read(user_data, success));
-}
-
-static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
-                                             gpr_slice_buffer *slices,
-                                             grpc_iomgr_closure *cb) {
+static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
+                                    grpc_endpoint_read_cb cb, void *user_data) {
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
-  int immediate_read_success = -1;
   ep->read_cb = cb;
-  ep->read_buffer = slices;
-  gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+  ep->read_user_data = user_data;
+
+  secure_endpoint_ref(ep);
 
   if (ep->leftover_bytes.count) {
-    gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
-    GPR_ASSERT(ep->leftover_bytes.count == 0);
-    return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+    size_t leftover_nslices = ep->leftover_bytes.count;
+    ep->leftover_bytes.count = 0;
+    on_read(ep, ep->leftover_bytes.slices, leftover_nslices,
+            GRPC_ENDPOINT_CB_OK);
+    return;
   }
 
-  SECURE_ENDPOINT_REF(ep, "read");
-
-  switch (
-      grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
-    case GRPC_ENDPOINT_DONE:
-      immediate_read_success = on_read(ep, 1);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      return GRPC_ENDPOINT_PENDING;
-    case GRPC_ENDPOINT_ERROR:
-      immediate_read_success = on_read(ep, 0);
-      break;
-  }
-
-  GPR_ASSERT(immediate_read_success != -1);
-  SECURE_ENDPOINT_UNREF(ep, "read");
-
-  return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+  grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
 }
 
 static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -255,28 +211,36 @@
   *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
 }
 
-static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
-                                              gpr_slice_buffer *slices,
-                                              grpc_iomgr_closure *cb) {
+static void on_write(void *data, grpc_endpoint_cb_status error) {
+  secure_endpoint *ep = data;
+  ep->write_cb(ep->write_user_data, error);
+  secure_endpoint_unref(ep);
+}
+
+static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
+                                                 gpr_slice *slices,
+                                                 size_t nslices,
+                                                 grpc_endpoint_write_cb cb,
+                                                 void *user_data) {
   unsigned i;
+  size_t output_buffer_count = 0;
   tsi_result result = TSI_OK;
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
   gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
   gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
-
-  gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
+  grpc_endpoint_write_status status;
+  GPR_ASSERT(ep->output_buffer.count == 0);
 
   if (grpc_trace_secure_endpoint) {
-    for (i = 0; i < slices->count; i++) {
-      char *data =
-          gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+    for (i = 0; i < nslices; i++) {
+      char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
       gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
       gpr_free(data);
     }
   }
 
-  for (i = 0; i < slices->count; i++) {
-    gpr_slice plain = slices->slices[i];
+  for (i = 0; i < nslices; i++) {
+    gpr_slice plain = slices[i];
     gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
     size_t message_size = GPR_SLICE_LENGTH(plain);
     while (message_size > 0) {
@@ -326,13 +290,29 @@
     }
   }
 
+  for (i = 0; i < nslices; i++) {
+    gpr_slice_unref(slices[i]);
+  }
+
   if (result != TSI_OK) {
     /* TODO(yangg) do different things according to the error type? */
     gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
-    return GRPC_ENDPOINT_ERROR;
+    return GRPC_ENDPOINT_WRITE_ERROR;
   }
 
-  return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+  /* clear output_buffer and let the lower level handle its slices. */
+  output_buffer_count = ep->output_buffer.count;
+  ep->output_buffer.count = 0;
+  ep->write_cb = cb;
+  ep->write_user_data = user_data;
+  /* Need to keep the endpoint alive until the wrapped write completes. */
+  secure_endpoint_ref(ep);
+  status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
+                               output_buffer_count, on_write, ep);
+  if (status != GRPC_ENDPOINT_WRITE_PENDING) {
+    secure_endpoint_unref(ep);
+  }
+  return status;
 }
 
 static void endpoint_shutdown(grpc_endpoint *secure_ep) {
@@ -340,9 +320,9 @@
   grpc_endpoint_shutdown(ep->wrapped_ep);
 }
 
-static void endpoint_destroy(grpc_endpoint *secure_ep) {
+static void endpoint_unref(grpc_endpoint *secure_ep) {
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
-  SECURE_ENDPOINT_UNREF(ep, "destroy");
+  secure_endpoint_unref(ep);
 }
 
 static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
@@ -363,9 +343,9 @@
 }
 
 static const grpc_endpoint_vtable vtable = {
-    endpoint_read,           endpoint_write,
+    endpoint_notify_on_read, endpoint_write,
     endpoint_add_to_pollset, endpoint_add_to_pollset_set,
-    endpoint_shutdown,       endpoint_destroy,
+    endpoint_shutdown,       endpoint_unref,
     endpoint_get_peer};
 
 grpc_endpoint *grpc_secure_endpoint_create(
@@ -383,10 +363,8 @@
   }
   ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
   ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
+  gpr_slice_buffer_init(&ep->input_buffer);
   gpr_slice_buffer_init(&ep->output_buffer);
-  gpr_slice_buffer_init(&ep->source_buffer);
-  ep->read_buffer = NULL;
-  grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
   gpr_mu_init(&ep->protector_mu);
   gpr_ref_init(&ep->ref, 1);
   return &ep->base;
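The hunk above restores the callback-driven read API: the endpoint invokes a grpc_endpoint_read_cb with an array of slices whose ownership passes to the callback, which must unref them and call grpc_endpoint_notify_on_read again to keep reading. A minimal sketch of a consumer of that contract follows; my_state and my_on_read are hypothetical names, and the usual endpoint/slice headers are assumed to be included.

/* Hypothetical consumer of the restored read API (sketch, not part of this
   revert). */
typedef struct {
  grpc_endpoint *ep;
  size_t total_bytes;
} my_state;

static void my_on_read(void *user_data, gpr_slice *slices, size_t nslices,
                       grpc_endpoint_cb_status status) {
  my_state *st = user_data;
  size_t i;
  for (i = 0; i < nslices; i++) {
    st->total_bytes += GPR_SLICE_LENGTH(slices[i]);
    gpr_slice_unref(slices[i]); /* the callback owns the slices */
  }
  if (status == GRPC_ENDPOINT_CB_OK) {
    /* re-arm the read; any other status ends the read loop */
    grpc_endpoint_notify_on_read(st->ep, my_on_read, st);
  }
}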
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index bf00795..0c3572b 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -50,17 +50,16 @@
   grpc_endpoint *wrapped_endpoint;
   grpc_endpoint *secure_endpoint;
   gpr_slice_buffer left_overs;
-  gpr_slice_buffer incoming;
-  gpr_slice_buffer outgoing;
   grpc_secure_transport_setup_done_cb cb;
   void *user_data;
-  grpc_iomgr_closure on_handshake_data_sent_to_peer;
-  grpc_iomgr_closure on_handshake_data_received_from_peer;
 } grpc_secure_transport_setup;
 
-static void on_handshake_data_received_from_peer(void *setup, int success);
+static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices,
+                                                 size_t nslices,
+                                                 grpc_endpoint_cb_status error);
 
-static void on_handshake_data_sent_to_peer(void *setup, int success);
+static void on_handshake_data_sent_to_peer(void *setup,
+                                           grpc_endpoint_cb_status error);
 
 static void secure_transport_setup_done(grpc_secure_transport_setup *s,
                                         int is_success) {
@@ -79,8 +78,6 @@
   if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
   if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
   gpr_slice_buffer_destroy(&s->left_overs);
-  gpr_slice_buffer_destroy(&s->outgoing);
-  gpr_slice_buffer_destroy(&s->incoming);
   GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
   gpr_free(s);
 }
@@ -105,8 +102,6 @@
   s->secure_endpoint =
       grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
                                   s->left_overs.slices, s->left_overs.count);
-  s->left_overs.count = 0;
-  s->left_overs.length = 0;
   secure_transport_setup_done(s, 1);
   return;
 }
@@ -137,6 +132,7 @@
   size_t offset = 0;
   tsi_result result = TSI_OK;
   gpr_slice to_send;
+  grpc_endpoint_write_status write_status;
 
   do {
     size_t to_send_size = s->handshake_buffer_size - offset;
@@ -159,25 +155,28 @@
 
   to_send =
       gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
-  gpr_slice_buffer_reset_and_unref(&s->outgoing);
-  gpr_slice_buffer_add(&s->outgoing, to_send);
   /* TODO(klempner,jboeuf): This should probably use the client setup
          deadline */
-  switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
-                              &s->on_handshake_data_sent_to_peer)) {
-    case GRPC_ENDPOINT_ERROR:
-      gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
-      secure_transport_setup_done(s, 0);
-      break;
-    case GRPC_ENDPOINT_DONE:
-      on_handshake_data_sent_to_peer(s, 1);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
+  write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
+                                     on_handshake_data_sent_to_peer, s);
+  if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
+    gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
+    secure_transport_setup_done(s, 0);
+  } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
+    on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK);
   }
 }
 
-static void on_handshake_data_received_from_peer(void *setup, int success) {
+static void cleanup_slices(gpr_slice *slices, size_t num_slices) {
+  size_t i;
+  for (i = 0; i < num_slices; i++) {
+    gpr_slice_unref(slices[i]);
+  }
+}
+
+static void on_handshake_data_received_from_peer(
+    void *setup, gpr_slice *slices, size_t nslices,
+    grpc_endpoint_cb_status error) {
   grpc_secure_transport_setup *s = setup;
   size_t consumed_slice_size = 0;
   tsi_result result = TSI_OK;
@@ -185,37 +184,32 @@
   size_t num_left_overs;
   int has_left_overs_in_current_slice = 0;
 
-  if (!success) {
+  if (error != GRPC_ENDPOINT_CB_OK) {
     gpr_log(GPR_ERROR, "Read failed.");
+    cleanup_slices(slices, nslices);
     secure_transport_setup_done(s, 0);
     return;
   }
 
-  for (i = 0; i < s->incoming.count; i++) {
-    consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]);
+  for (i = 0; i < nslices; i++) {
+    consumed_slice_size = GPR_SLICE_LENGTH(slices[i]);
     result = tsi_handshaker_process_bytes_from_peer(
-        s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]),
-        &consumed_slice_size);
+        s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size);
     if (!tsi_handshaker_is_in_progress(s->handshaker)) break;
   }
 
   if (tsi_handshaker_is_in_progress(s->handshaker)) {
     /* We may need more data. */
     if (result == TSI_INCOMPLETE_DATA) {
-      switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
-                                 &s->on_handshake_data_received_from_peer)) {
-        case GRPC_ENDPOINT_DONE:
-          on_handshake_data_received_from_peer(s, 1);
-          break;
-        case GRPC_ENDPOINT_ERROR:
-          on_handshake_data_received_from_peer(s, 0);
-          break;
-        case GRPC_ENDPOINT_PENDING:
-          break;
-      }
+      /* TODO(klempner,jboeuf): This should probably use the client setup
+         deadline */
+      grpc_endpoint_notify_on_read(s->wrapped_endpoint,
+                                   on_handshake_data_received_from_peer, setup);
+      cleanup_slices(slices, nslices);
       return;
     } else {
       send_handshake_bytes_to_peer(s);
+      cleanup_slices(slices, nslices);
       return;
     }
   }
@@ -223,40 +217,42 @@
   if (result != TSI_OK) {
     gpr_log(GPR_ERROR, "Handshake failed with error %s",
             tsi_result_to_string(result));
+    cleanup_slices(slices, nslices);
     secure_transport_setup_done(s, 0);
     return;
   }
 
   /* Handshake is done and successful this point. */
   has_left_overs_in_current_slice =
-      (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i]));
-  num_left_overs =
-      (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
+      (consumed_slice_size < GPR_SLICE_LENGTH(slices[i]));
+  num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1;
   if (num_left_overs == 0) {
+    cleanup_slices(slices, nslices);
     check_peer(s);
     return;
   }
+  cleanup_slices(slices, nslices - num_left_overs);
+
   /* Put the leftovers in our buffer (ownership transferred). */
   if (has_left_overs_in_current_slice) {
-    gpr_slice_buffer_add(
-        &s->left_overs,
-        gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size));
-    gpr_slice_unref(
-        s->incoming.slices[i]); /* split_tail above increments refcount. */
+    gpr_slice_buffer_add(&s->left_overs,
+                         gpr_slice_split_tail(&slices[i], consumed_slice_size));
+    gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */
   }
   gpr_slice_buffer_addn(
-      &s->left_overs, &s->incoming.slices[i + 1],
+      &s->left_overs, &slices[i + 1],
       num_left_overs - (size_t)has_left_overs_in_current_slice);
   check_peer(s);
 }
 
 /* If setup is NULL, the setup is done. */
-static void on_handshake_data_sent_to_peer(void *setup, int success) {
+static void on_handshake_data_sent_to_peer(void *setup,
+                                           grpc_endpoint_cb_status error) {
   grpc_secure_transport_setup *s = setup;
 
   /* Make sure that write is OK. */
-  if (!success) {
-    gpr_log(GPR_ERROR, "Write failed.");
+  if (error != GRPC_ENDPOINT_CB_OK) {
+    gpr_log(GPR_ERROR, "Write failed with error %d.", error);
     if (setup != NULL) secure_transport_setup_done(s, 0);
     return;
   }
@@ -265,17 +261,8 @@
   if (tsi_handshaker_is_in_progress(s->handshaker)) {
     /* TODO(klempner,jboeuf): This should probably use the client setup
        deadline */
-    switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
-                               &s->on_handshake_data_received_from_peer)) {
-      case GRPC_ENDPOINT_ERROR:
-        on_handshake_data_received_from_peer(s, 0);
-        break;
-      case GRPC_ENDPOINT_PENDING:
-        break;
-      case GRPC_ENDPOINT_DONE:
-        on_handshake_data_received_from_peer(s, 1);
-        break;
-    }
+    grpc_endpoint_notify_on_read(s->wrapped_endpoint,
+                                 on_handshake_data_received_from_peer, setup);
   } else {
     check_peer(s);
   }
@@ -301,12 +288,6 @@
   s->wrapped_endpoint = nonsecure_endpoint;
   s->user_data = user_data;
   s->cb = cb;
-  grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
-                          on_handshake_data_sent_to_peer, s);
-  grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
-                          on_handshake_data_received_from_peer, s);
   gpr_slice_buffer_init(&s->left_overs);
-  gpr_slice_buffer_init(&s->outgoing);
-  gpr_slice_buffer_init(&s->incoming);
   send_handshake_bytes_to_peer(s);
 }
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 6482ef9..987d5cb 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -207,25 +207,3 @@
   src->count = 0;
   src->length = 0;
 }
-
-void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
-  GPR_ASSERT(n <= sb->length);
-  sb->length -= n;
-  for (;;) {
-    size_t idx = sb->count - 1;
-    gpr_slice slice = sb->slices[idx];
-    size_t slice_len = GPR_SLICE_LENGTH(slice);
-    if (slice_len > n) {
-      sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
-      return;
-    } else if (slice_len == n) {
-      gpr_slice_unref(slice);
-      sb->count = idx;
-      return;
-    } else {
-      gpr_slice_unref(slice);
-      n -= slice_len;
-      sb->count = idx;
-    }
-  }
-}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a1b773b..42cf0ec 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -214,8 +214,6 @@
   grpc_chttp2_hpack_compressor hpack_compressor;
   /** is this a client? */
   gpr_uint8 is_client;
-  /** callback for when writing is done */
-  grpc_iomgr_closure done_cb;
 } grpc_chttp2_transport_writing;
 
 struct grpc_chttp2_transport_parsing {
@@ -331,11 +329,8 @@
 
   /** closure to execute writing */
   grpc_iomgr_closure writing_action;
-  /** closure to finish reading from the endpoint */
-  grpc_iomgr_closure recv_data;
-
-  /** incoming read bytes */
-  gpr_slice_buffer read_buffer;
+  /** closure to start reading from the endpoint */
+  grpc_iomgr_closure reading_action;
 
   /** address to place a newly accepted stream - set and unset by
       grpc_chttp2_parsing_accept_stream; used by init_stream to
@@ -468,7 +463,8 @@
                                        grpc_chttp2_transport_writing *writing);
 void grpc_chttp2_perform_writes(
     grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
-void grpc_chttp2_terminate_writing(void *transport_writing, int success);
+void grpc_chttp2_terminate_writing(
+    grpc_chttp2_transport_writing *transport_writing, int success);
 void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
                                  grpc_chttp2_transport_writing *writing);
 
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 2c8c48f..123061b 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -37,6 +37,7 @@
 #include <grpc/support/log.h>
 
 static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
 
 int grpc_chttp2_unlocking_check_writes(
     grpc_chttp2_transport_global *transport_global,
@@ -164,15 +165,16 @@
   GPR_ASSERT(transport_writing->outbuf.count > 0);
   GPR_ASSERT(endpoint);
 
-  switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
-                              &transport_writing->done_cb)) {
-    case GRPC_ENDPOINT_DONE:
+  switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices,
+                              transport_writing->outbuf.count, finish_write_cb,
+                              transport_writing)) {
+    case GRPC_ENDPOINT_WRITE_DONE:
       grpc_chttp2_terminate_writing(transport_writing, 1);
       break;
-    case GRPC_ENDPOINT_ERROR:
+    case GRPC_ENDPOINT_WRITE_ERROR:
       grpc_chttp2_terminate_writing(transport_writing, 0);
       break;
-    case GRPC_ENDPOINT_PENDING:
+    case GRPC_ENDPOINT_WRITE_PENDING:
       break;
   }
 }
@@ -207,6 +209,12 @@
   }
 }
 
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
+  grpc_chttp2_transport_writing *transport_writing = tw;
+  grpc_chttp2_terminate_writing(transport_writing,
+                                write_status == GRPC_ENDPOINT_CB_OK);
+}
+
 void grpc_chttp2_cleanup_writing(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_transport_writing *transport_writing) {
@@ -235,5 +243,6 @@
     grpc_chttp2_list_add_read_write_state_changed(transport_global,
                                                   stream_global);
   }
-  gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
+  transport_writing->outbuf.count = 0;
+  transport_writing->outbuf.length = 0;
 }
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8caa10c..1bbd210 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -84,13 +84,15 @@
 
 /* forward declarations of various callbacks that we'll build closures around */
 static void writing_action(void *t, int iomgr_success_ignored);
+static void reading_action(void *t, int iomgr_success_ignored);
 
 /** Set a transport level setting, and push it to our peer */
 static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
                          gpr_uint32 value);
 
 /** Endpoint callback to process incoming data */
-static void recv_data(void *tp, int success);
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+                      grpc_endpoint_cb_status error);
 
 /** Start disconnection chain */
 static void drop_connection(grpc_chttp2_transport *t);
@@ -141,7 +143,6 @@
   grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
 
   gpr_slice_buffer_destroy(&t->parsing.qbuf);
-  gpr_slice_buffer_destroy(&t->read_buffer);
   grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
   grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
 
@@ -248,16 +249,12 @@
   gpr_slice_buffer_init(&t->writing.outbuf);
   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
   grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
+  grpc_iomgr_closure_init(&t->reading_action, reading_action, t);
 
   gpr_slice_buffer_init(&t->parsing.qbuf);
   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
   grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
 
-  grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
-                          &t->writing);
-  grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
-  gpr_slice_buffer_init(&t->read_buffer);
-
   if (is_client) {
     gpr_slice_buffer_add(
         &t->global.qbuf,
@@ -505,8 +502,8 @@
   }
 }
 
-void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
-  grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
+void grpc_chttp2_terminate_writing(
+    grpc_chttp2_transport_writing *transport_writing, int success) {
   grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
 
   lock(t);
@@ -1063,76 +1060,74 @@
 }
 
 /* tcp read callback */
-static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+                      grpc_endpoint_cb_status error) {
+  grpc_chttp2_transport *t = tp;
   size_t i;
-  int keep_reading = 0;
+  int unref = 0;
 
-  lock(t);
-  i = 0;
-  GPR_ASSERT(!t->parsing_active);
-  if (!t->closed) {
-    t->parsing_active = 1;
-    /* merge stream lists */
-    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                     &t->parsing_stream_map);
-    grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
-    gpr_mu_unlock(&t->mu);
-    for (; i < t->read_buffer.count &&
-           grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
-         i++)
-      ;
-    gpr_mu_lock(&t->mu);
-    if (i != t->read_buffer.count) {
+  switch (error) {
+    case GRPC_ENDPOINT_CB_SHUTDOWN:
+    case GRPC_ENDPOINT_CB_EOF:
+    case GRPC_ENDPOINT_CB_ERROR:
+      lock(t);
       drop_connection(t);
-    }
-    /* merge stream lists */
-    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                     &t->parsing_stream_map);
-    t->global.concurrent_stream_count =
-        grpc_chttp2_stream_map_size(&t->parsing_stream_map);
-    if (t->parsing.initial_window_update != 0) {
-      grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
-                                      update_global_window, t);
-      t->parsing.initial_window_update = 0;
-    }
-    /* handle higher level things */
-    grpc_chttp2_publish_reads(&t->global, &t->parsing);
-    t->parsing_active = 0;
+      read_error_locked(t);
+      unlock(t);
+      unref = 1;
+      for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
+      break;
+    case GRPC_ENDPOINT_CB_OK:
+      lock(t);
+      i = 0;
+      GPR_ASSERT(!t->parsing_active);
+      if (!t->closed) {
+        t->parsing_active = 1;
+        /* merge stream lists */
+        grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+                                         &t->parsing_stream_map);
+        grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+        gpr_mu_unlock(&t->mu);
+        for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
+             i++) {
+          gpr_slice_unref(slices[i]);
+        }
+        gpr_mu_lock(&t->mu);
+        if (i != nslices) {
+          drop_connection(t);
+        }
+        /* merge stream lists */
+        grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+                                         &t->parsing_stream_map);
+        t->global.concurrent_stream_count =
+            grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+        if (t->parsing.initial_window_update != 0) {
+          grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+                                          update_global_window, t);
+          t->parsing.initial_window_update = 0;
+        }
+        /* handle higher level things */
+        grpc_chttp2_publish_reads(&t->global, &t->parsing);
+        t->parsing_active = 0;
+      }
+      if (i == nslices) {
+        grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
+      } else {
+        read_error_locked(t);
+        unref = 1;
+      }
+      unlock(t);
+      for (; i < nslices; i++) gpr_slice_unref(slices[i]);
+      break;
   }
-  if (!*success || i != t->read_buffer.count) {
-    drop_connection(t);
-    read_error_locked(t);
-  } else {
-    keep_reading = 1;
-  }
-  gpr_slice_buffer_reset_and_unref(&t->read_buffer);
-  unlock(t);
-
-  if (keep_reading) {
-    switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
-      case GRPC_ENDPOINT_DONE:
-        *success = 1;
-        return 1;
-      case GRPC_ENDPOINT_ERROR:
-        *success = 0;
-        return 1;
-      case GRPC_ENDPOINT_PENDING:
-        return 0;
-    }
-  } else {
+  if (unref) {
     UNREF_TRANSPORT(t, "recv_data");
-    return 0;
   }
-
-  gpr_log(GPR_ERROR, "should never reach here");
-  abort();
 }
 
-static void recv_data(void *tp, int success) {
-  grpc_chttp2_transport *t = tp;
-
-  while (recv_data_loop(t, &success))
-    ;
+static void reading_action(void *pt, int iomgr_success_ignored) {
+  grpc_chttp2_transport *t = pt;
+  grpc_endpoint_notify_on_read(t->ep, recv_data, t);
 }
 
 /*
@@ -1245,6 +1240,5 @@
                                          gpr_slice *slices, size_t nslices) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
   REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
-  gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
-  recv_data(t, 1);
+  recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
 }
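The transport and test changes restore the array-based write API: grpc_endpoint_write takes the slices, a count, and a grpc_endpoint_write_cb, and returns a grpc_endpoint_write_status; the callback is only invoked by the endpoint in the GRPC_ENDPOINT_WRITE_PENDING case, so callers handle the DONE and ERROR cases inline. A minimal sketch of that calling pattern, with hypothetical my_done_write/my_write names:

/* Hypothetical caller of the restored write API (sketch, not part of this
   revert). */
static void my_done_write(void *arg, grpc_endpoint_cb_status status) {
  /* record completion, e.g. signal an event or schedule follow-up work */
}

static void my_write(grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
                     void *arg) {
  switch (grpc_endpoint_write(ep, slices, nslices, my_done_write, arg)) {
    case GRPC_ENDPOINT_WRITE_DONE:
      my_done_write(arg, GRPC_ENDPOINT_CB_OK); /* completed inline */
      break;
    case GRPC_ENDPOINT_WRITE_PENDING:
      break; /* my_done_write runs later from the endpoint */
    case GRPC_ENDPOINT_WRITE_ERROR:
      my_done_write(arg, GRPC_ENDPOINT_CB_ERROR);
      break;
  }
}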
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 1d98879..24bf5d3 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -59,7 +59,7 @@
   gpr_event_set(&a->done_thd, (void *)1);
 }
 
-static void done_write(void *arg, int success) {
+static void done_write(void *arg, grpc_endpoint_cb_status status) {
   thd_args *a = arg;
   gpr_event_set(&a->done_write, (void *)1);
 }
@@ -85,8 +85,6 @@
   grpc_mdctx *mdctx = grpc_mdctx_create();
   gpr_slice slice =
       gpr_slice_from_copied_buffer(client_payload, client_payload_length);
-  gpr_slice_buffer outgoing;
-  grpc_iomgr_closure done_write_closure;
 
   hex = gpr_dump(client_payload, client_payload_length,
                  GPR_DUMP_HEX | GPR_DUMP_ASCII);
@@ -124,18 +122,14 @@
   /* Start validator */
   gpr_thd_new(&id, thd_func, &a, NULL);
 
-  gpr_slice_buffer_init(&outgoing);
-  gpr_slice_buffer_add(&outgoing, slice);
-  grpc_iomgr_closure_init(&done_write_closure, done_write, &a);
-
   /* Write data */
-  switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {
-    case GRPC_ENDPOINT_DONE:
+  switch (grpc_endpoint_write(sfd.client, &slice, 1, done_write, &a)) {
+    case GRPC_ENDPOINT_WRITE_DONE:
       done_write(&a, 1);
       break;
-    case GRPC_ENDPOINT_PENDING:
+    case GRPC_ENDPOINT_WRITE_PENDING:
       break;
-    case GRPC_ENDPOINT_ERROR:
+    case GRPC_ENDPOINT_WRITE_ERROR:
       done_write(&a, 0);
       break;
   }
@@ -161,7 +155,6 @@
                  .type == GRPC_OP_COMPLETE);
   grpc_server_destroy(a.server);
   grpc_completion_queue_destroy(a.cq);
-  gpr_slice_buffer_destroy(&outgoing);
 
   grpc_shutdown();
 }
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index ef67374..6ef8e9c 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -59,7 +59,8 @@
 
 static grpc_pollset *g_pollset;
 
-size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
+size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
+                              int *current_data) {
   size_t num_bytes = 0;
   size_t i;
   size_t j;
@@ -71,6 +72,7 @@
       *current_data = (*current_data + 1) % 256;
     }
     num_bytes += GPR_SLICE_LENGTH(slices[i]);
+    gpr_slice_unref(slices[i]);
   }
   return num_bytes;
 }
@@ -119,76 +121,86 @@
   int current_write_data;
   int read_done;
   int write_done;
-  gpr_slice_buffer incoming;
-  gpr_slice_buffer outgoing;
-  grpc_iomgr_closure done_read;
-  grpc_iomgr_closure done_write;
 };
 
-static void read_and_write_test_read_handler(void *data, int success) {
+static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
+                                             size_t nslices,
+                                             grpc_endpoint_cb_status error) {
   struct read_and_write_test_state *state = data;
-
-  state->bytes_read += count_slices(
-      state->incoming.slices, state->incoming.count, &state->current_read_data);
-  if (state->bytes_read == state->target_bytes || !success) {
-    gpr_log(GPR_INFO, "Read handler done");
+  GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
+  if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
+    gpr_log(GPR_INFO, "Read handler shutdown");
     gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-    state->read_done = 1 + success;
+    state->read_done = 1;
     grpc_pollset_kick(g_pollset, NULL);
     gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-  } else if (success) {
-    switch (grpc_endpoint_read(state->read_ep, &state->incoming,
-                               &state->done_read)) {
-      case GRPC_ENDPOINT_ERROR:
-        read_and_write_test_read_handler(data, 0);
-        break;
-      case GRPC_ENDPOINT_DONE:
-        read_and_write_test_read_handler(data, 1);
-        break;
-      case GRPC_ENDPOINT_PENDING:
-        break;
-    }
+    return;
+  }
+
+  state->bytes_read +=
+      count_and_unref_slices(slices, nslices, &state->current_read_data);
+  if (state->bytes_read == state->target_bytes) {
+    gpr_log(GPR_INFO, "Read handler done");
+    gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+    state->read_done = 1;
+    grpc_pollset_kick(g_pollset, NULL);
+    gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+  } else {
+    grpc_endpoint_notify_on_read(state->read_ep,
+                                 read_and_write_test_read_handler, data);
   }
 }
 
-static void read_and_write_test_write_handler(void *data, int success) {
+static void read_and_write_test_write_handler(void *data,
+                                              grpc_endpoint_cb_status error) {
   struct read_and_write_test_state *state = data;
   gpr_slice *slices = NULL;
   size_t nslices;
-  grpc_endpoint_op_status write_status;
+  grpc_endpoint_write_status write_status;
 
-  if (success) {
-    for (;;) {
-      /* Need to do inline writes until they don't succeed synchronously or we
-         finish writing */
-      state->bytes_written += state->current_write_size;
-      if (state->target_bytes - state->bytes_written <
-          state->current_write_size) {
-        state->current_write_size = state->target_bytes - state->bytes_written;
-      }
-      if (state->current_write_size == 0) {
-        break;
-      }
+  GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
 
-      slices = allocate_blocks(state->current_write_size, 8192, &nslices,
-                               &state->current_write_data);
-      gpr_slice_buffer_reset_and_unref(&state->outgoing);
-      gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
-      write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
-                                         &state->done_write);
-      gpr_log(GPR_DEBUG, "write_status=%d", write_status);
-      GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
-      free(slices);
-      if (write_status == GRPC_ENDPOINT_PENDING) {
-        return;
-      }
-    }
-    GPR_ASSERT(state->bytes_written == state->target_bytes);
+  gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
+          error);
+
+  if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
+    gpr_log(GPR_INFO, "Write handler shutdown");
+    gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+    state->write_done = 1;
+    grpc_pollset_kick(g_pollset, NULL);
+    gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+    return;
   }
 
+  for (;;) {
+    /* Need to do inline writes until they don't succeed synchronously or we
+       finish writing */
+    state->bytes_written += state->current_write_size;
+    if (state->target_bytes - state->bytes_written <
+        state->current_write_size) {
+      state->current_write_size = state->target_bytes - state->bytes_written;
+    }
+    if (state->current_write_size == 0) {
+      break;
+    }
+
+    slices = allocate_blocks(state->current_write_size, 8192, &nslices,
+                             &state->current_write_data);
+    write_status =
+        grpc_endpoint_write(state->write_ep, slices, nslices,
+                            read_and_write_test_write_handler, state);
+    gpr_log(GPR_DEBUG, "write_status=%d", write_status);
+    GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
+    free(slices);
+    if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
+      return;
+    }
+  }
+  GPR_ASSERT(state->bytes_written == state->target_bytes);
+
   gpr_log(GPR_INFO, "Write handler done");
   gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-  state->write_done = 1 + success;
+  state->write_done = 1;
   grpc_pollset_kick(g_pollset, NULL);
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
 }
@@ -222,31 +234,16 @@
   state.write_done = 0;
   state.current_read_data = 0;
   state.current_write_data = 0;
-  grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
-                          &state);
-  grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
-                          &state);
-  gpr_slice_buffer_init(&state.outgoing);
-  gpr_slice_buffer_init(&state.incoming);
 
   /* Get started by pretending an initial write completed */
   /* NOTE: Sets up initial conditions so we can have the same write handler
      for the first iteration as for later iterations. It does the right thing
      even when bytes_written is unsigned. */
   state.bytes_written -= state.current_write_size;
-  read_and_write_test_write_handler(&state, 1);
+  read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
 
-  switch (
-      grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
-    case GRPC_ENDPOINT_PENDING:
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      read_and_write_test_read_handler(&state, 0);
-      break;
-    case GRPC_ENDPOINT_DONE:
-      read_and_write_test_read_handler(&state, 1);
-      break;
-  }
+  grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
+                               &state);
 
   if (shutdown) {
     gpr_log(GPR_DEBUG, "shutdown read");
@@ -266,8 +263,6 @@
 
   grpc_endpoint_destroy(state.read_ep);
   grpc_endpoint_destroy(state.write_ep);
-  gpr_slice_buffer_destroy(&state.outgoing);
-  gpr_slice_buffer_destroy(&state.incoming);
   end_test(config);
 }
 
@@ -278,40 +273,36 @@
 typedef struct {
   int done;
   grpc_endpoint *ep;
-  gpr_slice_buffer incoming;
-  grpc_iomgr_closure done_read;
 } shutdown_during_write_test_state;
 
-static void shutdown_during_write_test_read_handler(void *user_data,
-                                                    int success) {
+static void shutdown_during_write_test_read_handler(
+    void *user_data, gpr_slice *slices, size_t nslices,
+    grpc_endpoint_cb_status error) {
+  size_t i;
   shutdown_during_write_test_state *st = user_data;
 
-  if (!success) {
+  for (i = 0; i < nslices; i++) {
+    gpr_slice_unref(slices[i]);
+  }
+
+  if (error != GRPC_ENDPOINT_CB_OK) {
     grpc_endpoint_destroy(st->ep);
     gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-    st->done = 1;
+    st->done = error;
     grpc_pollset_kick(g_pollset, NULL);
     gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
   } else {
-    switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) {
-      case GRPC_ENDPOINT_PENDING:
-        break;
-      case GRPC_ENDPOINT_ERROR:
-        shutdown_during_write_test_read_handler(user_data, 0);
-        break;
-      case GRPC_ENDPOINT_DONE:
-        shutdown_during_write_test_read_handler(user_data, 1);
-        break;
-    }
+    grpc_endpoint_notify_on_read(
+        st->ep, shutdown_during_write_test_read_handler, user_data);
   }
 }
 
-static void shutdown_during_write_test_write_handler(void *user_data,
-                                                     int success) {
+static void shutdown_during_write_test_write_handler(
+    void *user_data, grpc_endpoint_cb_status error) {
   shutdown_during_write_test_state *st = user_data;
-  gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d",
-          success);
-  if (success) {
+  gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d",
+          error);
+  if (error == 0) {
     /* This happens about 0.5% of the time when run under TSAN, and is entirely
        legitimate, but means we aren't testing the path we think we are. */
     /* TODO(klempner): Change this test to retry the write in that case */
@@ -334,8 +325,6 @@
   shutdown_during_write_test_state read_st;
   shutdown_during_write_test_state write_st;
   gpr_slice *slices;
-  gpr_slice_buffer outgoing;
-  grpc_iomgr_closure done_write;
   grpc_endpoint_test_fixture f =
       begin_test(config, "shutdown_during_write_test", slice_size);
 
@@ -346,26 +335,19 @@
   read_st.done = 0;
   write_st.done = 0;
 
-  grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
-                          &write_st);
-  grpc_iomgr_closure_init(&read_st.done_read,
-                          shutdown_during_write_test_read_handler, &read_st);
-  gpr_slice_buffer_init(&read_st.incoming);
-  gpr_slice_buffer_init(&outgoing);
-
-  GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
-                                &read_st.done_read) == GRPC_ENDPOINT_PENDING);
+  grpc_endpoint_notify_on_read(
+      read_st.ep, shutdown_during_write_test_read_handler, &read_st);
   for (size = 1;; size *= 2) {
     slices = allocate_blocks(size, 1, &nblocks, &current_data);
-    gpr_slice_buffer_reset_and_unref(&outgoing);
-    gpr_slice_buffer_addn(&outgoing, slices, nblocks);
-    switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
-      case GRPC_ENDPOINT_DONE:
+    switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
+                                shutdown_during_write_test_write_handler,
+                                &write_st)) {
+      case GRPC_ENDPOINT_WRITE_DONE:
         break;
-      case GRPC_ENDPOINT_ERROR:
+      case GRPC_ENDPOINT_WRITE_ERROR:
         gpr_log(GPR_ERROR, "error writing");
         abort();
-      case GRPC_ENDPOINT_PENDING:
+      case GRPC_ENDPOINT_WRITE_PENDING:
         grpc_endpoint_shutdown(write_st.ep);
         deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
         gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -386,8 +368,6 @@
         }
         gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
         gpr_free(slices);
-        gpr_slice_buffer_destroy(&read_st.incoming);
-        gpr_slice_buffer_destroy(&outgoing);
         end_test(config);
         return;
     }
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 8acaa43..6ad8322 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -118,12 +118,10 @@
   grpc_endpoint *ep;
   ssize_t read_bytes;
   ssize_t target_read_bytes;
-  gpr_slice_buffer incoming;
-  grpc_iomgr_closure read_cb;
 };
 
-static ssize_t count_slices(gpr_slice *slices, size_t nslices,
-                            int *current_data) {
+static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
+                                      int *current_data) {
   ssize_t num_bytes = 0;
   unsigned i, j;
   unsigned char *buf;
@@ -134,41 +132,31 @@
       *current_data = (*current_data + 1) % 256;
     }
     num_bytes += GPR_SLICE_LENGTH(slices[i]);
+    gpr_slice_unref(slices[i]);
   }
   return num_bytes;
 }
 
-static void read_cb(void *user_data, int success) {
+static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
+                    grpc_endpoint_cb_status error) {
   struct read_socket_state *state = (struct read_socket_state *)user_data;
   ssize_t read_bytes;
   int current_data;
 
-  GPR_ASSERT(success);
+  GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   current_data = state->read_bytes % 256;
-  read_bytes = count_slices(state->incoming.slices, state->incoming.count,
-                            &current_data);
+  read_bytes = count_and_unref_slices(slices, nslices, &current_data);
   state->read_bytes += read_bytes;
   gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
           state->target_read_bytes);
   if (state->read_bytes >= state->target_read_bytes) {
-    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    /* empty */
   } else {
-    switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
-      case GRPC_ENDPOINT_DONE:
-        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-        read_cb(user_data, 1);
-        break;
-      case GRPC_ENDPOINT_ERROR:
-        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-        read_cb(user_data, 0);
-        break;
-      case GRPC_ENDPOINT_PENDING:
-        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-        break;
-    }
+    grpc_endpoint_notify_on_read(state->ep, read_cb, state);
   }
+  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
 
 /* Write to a socket, then read from it using the grpc_tcp API. */
@@ -193,19 +181,8 @@
   state.ep = ep;
   state.read_bytes = 0;
   state.target_read_bytes = written_bytes;
-  gpr_slice_buffer_init(&state.incoming);
-  grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
 
-  switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
-    case GRPC_ENDPOINT_DONE:
-      read_cb(&state, 1);
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      read_cb(&state, 0);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
-  }
+  grpc_endpoint_notify_on_read(ep, read_cb, &state);
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
@@ -216,7 +193,6 @@
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
-  gpr_slice_buffer_destroy(&state.incoming);
   grpc_endpoint_destroy(ep);
 }
 
@@ -243,19 +219,8 @@
   state.ep = ep;
   state.read_bytes = 0;
   state.target_read_bytes = written_bytes;
-  gpr_slice_buffer_init(&state.incoming);
-  grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
 
-  switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
-    case GRPC_ENDPOINT_DONE:
-      read_cb(&state, 1);
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      read_cb(&state, 0);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
-  }
+  grpc_endpoint_notify_on_read(ep, read_cb, &state);
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
@@ -266,7 +231,6 @@
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
-  gpr_slice_buffer_destroy(&state.incoming);
   grpc_endpoint_destroy(ep);
 }
 
@@ -298,7 +262,8 @@
   return slices;
 }
 
-static void write_done(void *user_data /* write_socket_state */, int success) {
+static void write_done(void *user_data /* write_socket_state */,
+                       grpc_endpoint_cb_status error) {
   struct write_socket_state *state = (struct write_socket_state *)user_data;
   gpr_log(GPR_INFO, "Write done callback called");
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -374,8 +339,6 @@
   size_t num_blocks;
   gpr_slice *slices;
   int current_data = 0;
-  gpr_slice_buffer outgoing;
-  grpc_iomgr_closure write_done_closure;
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
 
   gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@@ -392,21 +355,74 @@
 
   slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
 
-  gpr_slice_buffer_init(&outgoing);
-  gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
-  grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
+  if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
+      GRPC_ENDPOINT_WRITE_DONE) {
+    /* Write completed immediately */
+    read_bytes = drain_socket(sv[0]);
+    GPR_ASSERT(read_bytes == num_bytes);
+  } else {
+    drain_socket_blocking(sv[0], num_bytes, num_bytes);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+    for (;;) {
+      grpc_pollset_worker worker;
+      if (state.write_done) {
+        break;
+      }
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
+    }
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+  }
 
-  switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
-    case GRPC_ENDPOINT_DONE:
+  grpc_endpoint_destroy(ep);
+  gpr_free(slices);
+}
+
+static void read_done_for_write_error(void *ud, gpr_slice *slices,
+                                      size_t nslices,
+                                      grpc_endpoint_cb_status error) {
+  GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
+  GPR_ASSERT(nslices == 0);
+}
+
+/* Write to a socket using the grpc_tcp API after the peer socket has been
+   closed: the write either fails immediately or goes pending and completes
+   with an error delivered to the write callback. */
+static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
+  int sv[2];
+  grpc_endpoint *ep;
+  struct write_socket_state state;
+  size_t num_blocks;
+  gpr_slice *slices;
+  int current_data = 0;
+  grpc_pollset_worker worker;
+  gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+
+  gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
+          num_bytes, slice_size);
+
+  create_sockets(sv);
+
+  ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
+                       GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
+  grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
+  close(sv[0]);
+
+  state.ep = ep;
+  state.write_done = 0;
+
+  slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
+
+  switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
+    case GRPC_ENDPOINT_WRITE_DONE:
+    case GRPC_ENDPOINT_WRITE_ERROR:
       /* Write completed immediately */
-      read_bytes = drain_socket(sv[0]);
-      GPR_ASSERT(read_bytes == num_bytes);
       break;
-    case GRPC_ENDPOINT_PENDING:
-      drain_socket_blocking(sv[0], num_bytes, num_bytes);
+    case GRPC_ENDPOINT_WRITE_PENDING:
+      grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
       gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
       for (;;) {
-        grpc_pollset_worker worker;
         if (state.write_done) {
           break;
         }
@@ -415,14 +431,10 @@
       }
       gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
       break;
-    case GRPC_ENDPOINT_ERROR:
-      gpr_log(GPR_ERROR, "endpoint got error");
-      abort();
   }
 
-  gpr_slice_buffer_destroy(&outgoing);
   grpc_endpoint_destroy(ep);
-  gpr_free(slices);
+  free(slices);
 }
 
 void run_tests(void) {
@@ -442,6 +454,10 @@
   write_test(100000, 137);
 
   for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+    write_error_test(40320, i);
+  }
+
+  for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
     write_test(40320, i);
   }
 }
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index c76ddcd..a8368fc 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -135,26 +135,62 @@
      secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
 };
 
-static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
-  grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
-  gpr_slice_buffer incoming;
+static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices,
+                            grpc_endpoint_cb_status error) {
   gpr_slice s =
       gpr_slice_from_copied_string("hello world 12345678900987654321");
+
+  GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+  GPR_ASSERT(nslices == 1);
+
+  GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
+  gpr_slice_unref(slices[0]);
+  gpr_slice_unref(s);
+  *(int *)user_data = 1;
+}
+
+static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
+  grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
+  int verified = 0;
   gpr_log(GPR_INFO, "Start test left over");
 
-  gpr_slice_buffer_init(&incoming);
-  GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) ==
-             GRPC_ENDPOINT_DONE);
-  GPR_ASSERT(incoming.count == 1);
-  GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0]));
+  grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified);
+  GPR_ASSERT(verified == 1);
 
   grpc_endpoint_shutdown(f.client_ep);
   grpc_endpoint_shutdown(f.server_ep);
   grpc_endpoint_destroy(f.client_ep);
   grpc_endpoint_destroy(f.server_ep);
-  gpr_slice_unref(s);
-  gpr_slice_buffer_destroy(&incoming);
+  clean_up();
+}
 
+static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices,
+                          grpc_endpoint_cb_status error) {
+  grpc_endpoint_test_fixture *f = user_data;
+  gpr_slice s =
+      gpr_slice_from_copied_string("hello world 12345678900987654321");
+
+  GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+  GPR_ASSERT(nslices == 1);
+
+  grpc_endpoint_shutdown(f->client_ep);
+  grpc_endpoint_destroy(f->client_ep);
+
+  GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
+  gpr_slice_unref(slices[0]);
+  gpr_slice_unref(s);
+}
+
+/* test which destroys the ep before finishing reading */
+static void test_destroy_ep_early(grpc_endpoint_test_config config,
+                                  size_t slice_size) {
+  grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
+  gpr_log(GPR_INFO, "Start test destroy early");
+
+  grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f);
+
+  grpc_endpoint_shutdown(f.server_ep);
+  grpc_endpoint_destroy(f.server_ep);
   clean_up();
 }
 
@@ -167,6 +203,7 @@
   grpc_pollset_init(&g_pollset);
   grpc_endpoint_tests(configs[0], &g_pollset);
   test_leftover(configs[1], 1);
+  test_destroy_ep_early(configs[1], 1);
   grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
   grpc_iomgr_shutdown();
 
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 4781d33..836e62a 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -198,13 +198,14 @@
      races with other processes on kernels that want to reuse the same
      port numbers over and over. */
 
-  /* In alternating iterations we trial UDP ports before TCP ports UDP
+  /* In alternating iterations we try UDP ports before TCP ports UDP
      ports -- it could be the case that this machine has been using up
      UDP ports and they are scarcer. */
 
   /* Type of port to first pick in next iteration */
   int is_tcp = 1;
-  int trial = 0;
+  int try = 0;
 
   char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
   if (env) {
@@ -217,10 +218,11 @@
 
   for (;;) {
     int port;
-    trial++;
-    if (trial == 1) {
+    try++;
+    if (try == 1) {
       port = getpid() % (65536 - 30000) + 30000;
-    } else if (trial <= NUM_RANDOM_PORTS_TO_PICK) {
+    } else if (try <= NUM_RANDOM_PORTS_TO_PICK) {
       port = rand() % (65536 - 30000) + 30000;
     } else {
       port = 0;
@@ -237,7 +239,7 @@
     GPR_ASSERT(port > 0);
     /* Check that the port # is free for the other type of socket also */
     if (!is_port_available(&port, !is_tcp)) {
-      /* In the next iteration trial to bind to the other type first
+      /* In the next iteration try to bind to the other type first
          because perhaps it is more rare. */
       is_tcp = !is_tcp;
       continue;
diff --git a/test/core/util/port_windows.c b/test/core/util/port_windows.c
index 2f64626..5b072f8 100644
--- a/test/core/util/port_windows.c
+++ b/test/core/util/port_windows.c
@@ -35,6 +35,7 @@
 #include "test/core/util/test_config.h"
 #if defined(GPR_WINSOCK_SOCKET) && defined(GRPC_TEST_PICK_PORT)
 
+#include "src/core/iomgr/sockaddr_utils.h"
 #include "test/core/util/port.h"
 
 #include <process.h>
@@ -42,14 +43,8 @@
 #include <errno.h>
 #include <string.h>
 
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
-#include "src/core/support/env.h"
-#include "src/core/httpcli/httpcli.h"
-#include "src/core/iomgr/sockaddr_utils.h"
-
 #define NUM_RANDOM_PORTS_TO_PICK 100
 
 static int is_port_available(int *port, int is_tcp) {
@@ -104,67 +99,6 @@
   return 1;
 }
 
-typedef struct portreq {
-  grpc_pollset pollset;
-  int port;
-} portreq;
-
-static void got_port_from_server(void *arg,
-                                 const grpc_httpcli_response *response) {
-  size_t i;
-  int port = 0;
-  portreq *pr = arg;
-  GPR_ASSERT(response);
-  GPR_ASSERT(response->status == 200);
-  for (i = 0; i < response->body_length; i++) {
-    GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
-    port = port * 10 + response->body[i] - '0';
-  }
-  GPR_ASSERT(port > 1024);
-  gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset));
-  pr->port = port;
-  grpc_pollset_kick(&pr->pollset, NULL);
-  gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset));
-}
-
-static void destroy_pollset_and_shutdown(void *p) {
-  grpc_pollset_destroy(p);
-  grpc_shutdown();
-}
-
-static int pick_port_using_server(char *server) {
-  grpc_httpcli_context context;
-  grpc_httpcli_request req;
-  portreq pr;
-
-  grpc_init();
-
-  memset(&pr, 0, sizeof(pr));
-  memset(&req, 0, sizeof(req));
-  grpc_pollset_init(&pr.pollset);
-  pr.port = -1;
-
-  req.host = server;
-  req.path = "/get";
-
-  grpc_httpcli_context_init(&context);
-  grpc_httpcli_get(&context, &pr.pollset, &req,
-                   GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server,
-                   &pr);
-  gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
-  while (pr.port == -1) {
-    grpc_pollset_worker worker;
-    grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                      GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
-  }
-  gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
-
-  grpc_httpcli_context_destroy(&context);
-  grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset);
-
-  return pr.port;
-}
-
 int grpc_pick_unused_port(void) {
   /* We repeatedly pick a port and then see whether or not it is
      available for use both as a TCP socket and a UDP socket.  First, we
@@ -174,29 +108,22 @@
      races with other processes on kernels that want to reuse the same
      port numbers over and over. */
 
-  /* In alternating iterations we trial UDP ports before TCP ports UDP
+  /* In alternating iterations we try UDP ports before TCP ports UDP
      ports -- it could be the case that this machine has been using up
      UDP ports and they are scarcer. */
 
   /* Type of port to first pick in next iteration */
   int is_tcp = 1;
-  int trial = 0;
-
-  char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
-  if (env) {
-    int port = pick_port_using_server(env);
-    gpr_free(env);
-    if (port != 0) {
-      return port;
-    }
-  }
+  int try = 0;
 
   for (;;) {
     int port;
-    trial++;
-    if (trial == 1) {
+    try++;
+    if (try == 1) {
       port = _getpid() % (65536 - 30000) + 30000;
-    } else if (trial <= NUM_RANDOM_PORTS_TO_PICK) {
+    } else if (try <= NUM_RANDOM_PORTS_TO_PICK) {
       port = rand() % (65536 - 30000) + 30000;
     } else {
       port = 0;
@@ -209,7 +136,7 @@
     GPR_ASSERT(port > 0);
     /* Check that the port # is free for the other type of socket also */
     if (!is_port_available(&port, !is_tcp)) {
-      /* In the next iteration trial to bind to the other type first
+      /* In the next iteration try to bind to the other type first
          because perhaps it is more rare. */
       is_tcp = !is_tcp;
       continue;