Merge branch 'endpoints' into second-coming
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 09a457d..0799622 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -52,7 +52,6 @@
 
 static gpr_event g_shutdown_iocp;
 static gpr_event g_iocp_done;
-static gpr_atm g_orphans = 0;
 static gpr_atm g_custom_events = 0;
 
 static HANDLE g_iocp;
@@ -92,22 +91,13 @@
     gpr_log(GPR_ERROR, "Unknown IOCP operation");
     abort();
   }
-  GPR_ASSERT(info->outstanding);
-  if (socket->orphan) {
-    info->outstanding = 0;
-    if (!socket->read_info.outstanding && !socket->write_info.outstanding) {
-      grpc_winsocket_destroy(socket);
-      gpr_atm_full_fetch_add(&g_orphans, -1);
-    }
-    return;
-  }
   success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
                                    FALSE, &flags);
   info->bytes_transfered = bytes;
   info->wsa_error = success ? 0 : WSAGetLastError();
   GPR_ASSERT(overlapped == &info->overlapped);
-  gpr_mu_lock(&socket->state_mu);
   GPR_ASSERT(!info->has_pending_iocp);
+  gpr_mu_lock(&socket->state_mu);
   if (info->cb) {
     f = info->cb;
     opaque = info->opaque;
@@ -120,7 +110,7 @@
 }
 
 static void iocp_loop(void *p) {
-  while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) ||
+  while (gpr_atm_acq_load(&g_custom_events) || 
          !gpr_event_get(&g_shutdown_iocp)) {
     grpc_maybe_call_delayed_callbacks(NULL, 1);
     do_iocp_work();
@@ -175,12 +165,6 @@
   GPR_ASSERT(ret == g_iocp);
 }
 
-void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
-  GPR_ASSERT(!socket->orphan);
-  gpr_atm_full_fetch_add(&g_orphans, 1);
-  socket->orphan = 1;
-}
-
 /* Calling notify_on_read or write means either of two things:
    -) The IOCP already completed in the background, and we need to call
    the callback now.
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h
index ee3847a..7d2dc45 100644
--- a/src/core/iomgr/iocp_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -42,7 +42,6 @@
 void grpc_iocp_kick(void);
 void grpc_iocp_shutdown(void);
 void grpc_iocp_add_socket(grpc_winsocket *);
-void grpc_iocp_socket_orphan(grpc_winsocket *);
 
 void grpc_socket_notify_on_write(grpc_winsocket *,
                                  void (*cb)(void *, int success), void *opaque);
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 7d84213..dbc6463 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -62,46 +62,13 @@
    operations to abort them. We need to do that this way because of the
    various callsites of that function, which happens to be in various
    mutex hold states, and that'd be unsafe to call them directly. */
-int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
-  int callbacks_set = 0;
-  SOCKET socket;
-  gpr_mu_lock(&winsocket->state_mu);
-  socket = winsocket->socket;
-  if (winsocket->read_info.cb) {
-    callbacks_set++;
-    grpc_iomgr_closure_init(&winsocket->shutdown_closure,
-                            winsocket->read_info.cb,
-                            winsocket->read_info.opaque);
-    grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
-  }
-  if (winsocket->write_info.cb) {
-    callbacks_set++;
-    grpc_iomgr_closure_init(&winsocket->shutdown_closure,
-                            winsocket->write_info.cb,
-                            winsocket->write_info.opaque);
-    grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
-  }
-  gpr_mu_unlock(&winsocket->state_mu);
-  closesocket(socket);
-  return callbacks_set;
-}
-
-/* Abandons a socket. Either we're going to queue it up for garbage collecting
-   from the IO Completion Port thread, or destroy it immediately. Note that this
-   mechanisms assumes that we're either always waiting for an operation, or we
-   explicitly know that we don't. If there is a future case where we can have
-   an "idle" socket which is neither trying to read or write, we'd start leaking
-   both memory and sockets. */
-void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
-  grpc_iomgr_unregister_object(&winsocket->iomgr_object);
-  if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
-    grpc_iocp_socket_orphan(winsocket);
-  } else {
-    grpc_winsocket_destroy(winsocket);
-  }
+void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
+  shutdown(winsocket->socket, SD_BOTH); /* both directions; result ignored */
 }
 
 void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
+  closesocket(winsocket->socket); /* handle is closed here, not in shutdown */
+  grpc_iomgr_unregister_object(&winsocket->iomgr_object);
   gpr_mu_destroy(&winsocket->state_mu);
   gpr_free(winsocket);
 }
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index ecf2530..498921e 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -68,8 +68,6 @@
   /* The results of the overlapped operation. */
   DWORD bytes_transfered;
   int wsa_error;
-  /* A boolean indicating that we started an operation. */
-  int outstanding;
 } grpc_winsocket_callback_info;
 
 /* This is a wrapper to a Windows socket. A socket can have one outstanding
@@ -92,10 +90,6 @@
   /* You can't add the same socket twice to the same IO Completion Port.
      This prevents that. */
   int added_to_iocp;
-  /* A boolean to indicate that the caller has abandoned that socket, but
-     there is a pending operation that the IO Completion Port will have to
-     wait for. The socket will be collected at that time. */
-  int orphan;
 
   grpc_iomgr_closure shutdown_closure;
 
@@ -108,14 +102,10 @@
 grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name);
 
 /* Initiate an asynchronous shutdown of the socket. Will call off any pending
-   operation to cancel them. Returns the number of callbacks that got setup. */
-int grpc_winsocket_shutdown(grpc_winsocket *socket);
+   operation to cancel them. */
+void grpc_winsocket_shutdown(grpc_winsocket *socket);
 
-/* Abandon a socket. */
-void grpc_winsocket_orphan(grpc_winsocket *socket);
-
-/* Destroy a socket. Should only be called by the IO Completion Port thread,
-   or by grpc_winsocket_orphan if there's no pending operation. */
+/* Destroy a socket. Should only be called if there's no pending operation. */
 void grpc_winsocket_destroy(grpc_winsocket *socket);
 
 #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 79a58fe..665ef28 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -102,7 +102,6 @@
     DWORD flags;
     BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
                                               &transfered_bytes, FALSE, &flags);
-    info->outstanding = 0;
     GPR_ASSERT(transfered_bytes == 0);
     if (!wsa_success) {
       char *utf8_message = gpr_format_message(WSAGetLastError());
@@ -125,12 +124,10 @@
     return;
   }
 
-  ac->socket->write_info.outstanding = 0;
-
   /* If we don't have an endpoint, it means the connection failed,
      so it doesn't matter if it aborted or failed. We need to orphan
      that socket. */
-  if (!ep || aborted) grpc_winsocket_orphan(ac->socket);
+  if (!ep || aborted) grpc_winsocket_destroy(ac->socket);
   async_connect_cleanup(ac);
   /* If the connection was aborted, the callback was already called when
      the deadline was met. */
@@ -196,7 +193,6 @@
 
   socket = grpc_winsocket_create(sock, "client");
   info = &socket->write_info;
-  info->outstanding = 1;
   success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
 
   /* It wouldn't be unusual to get a success immediately. But we'll still get
@@ -220,7 +216,6 @@
 
   grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
                   gpr_now(GPR_CLOCK_MONOTONIC));
-  socket->write_info.outstanding = 1;
   grpc_socket_notify_on_write(socket, on_connect, ac);
   return;
 
@@ -229,7 +224,7 @@
   gpr_log(GPR_ERROR, message, utf8_message);
   gpr_free(utf8_message);
   if (socket) {
-    grpc_winsocket_orphan(socket);
+    grpc_winsocket_destroy(socket);
   } else if (sock != INVALID_SOCKET) {
     closesocket(sock);
   }
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d0478d3..79ea223 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -116,7 +116,7 @@
   for (i = 0; i < s->nports; i++) {
     server_port *sp = &s->ports[i];
     sp->shutting_down = 1;
-    s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket);
+    grpc_winsocket_shutdown(sp->socket);
   }
   /* This happens asynchronously. Wait while that happens. */
   while (s->active_ports || s->iomgr_callbacks_pending) {
@@ -129,7 +129,7 @@
      closed by the system. */
   for (i = 0; i < s->nports; i++) {
     server_port *sp = &s->ports[i];
-    grpc_winsocket_orphan(sp->socket);
+    grpc_winsocket_destroy(sp->socket);
   }
   gpr_free(s->ports);
   gpr_free(s);
@@ -189,7 +189,6 @@
 
 static void decrement_active_ports_and_notify(server_port *sp) {
   sp->shutting_down = 0;
-  sp->socket->read_info.outstanding = 0;
   gpr_mu_lock(&sp->server->mu);
   GPR_ASSERT(sp->server->active_ports > 0);
   if (0 == --sp->server->active_ports) {
@@ -462,7 +461,6 @@
   s->cb = cb;
   s->cb_arg = cb_arg;
   for (i = 0; i < s->nports; i++) {
-    s->ports[i].socket->read_info.outstanding = 1;
     start_accept(s->ports + i);
     s->active_ports++;
   }
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 58f9160..fe3673c 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -97,7 +97,7 @@
 } grpc_tcp;
 
 static void tcp_free(grpc_tcp *tcp) {
-  grpc_winsocket_orphan(tcp->socket);
+  grpc_winsocket_destroy(tcp->socket);
   gpr_mu_destroy(&tcp->mu);
   gpr_free(tcp->peer_string);
   gpr_free(tcp);
@@ -135,55 +135,35 @@
 #endif
 
 /* Asynchronous callback from the IOCP, or the background thread. */
-static int on_read(grpc_tcp *tcp, int from_iocp) {
+static int on_read(grpc_tcp *tcp, int success) {
   grpc_winsocket *socket = tcp->socket;
   gpr_slice sub;
   gpr_slice *slice = NULL;
   size_t nslices = 0;
-  int success;
   grpc_winsocket_callback_info *info = &socket->read_info;
   int do_abort = 0;
 
-  gpr_mu_lock(&tcp->mu);
-  if (!from_iocp || tcp->shutting_down) {
-    /* If we are here with from_iocp set to true, it means we got raced to
-    shutting down the endpoint. No actual abort callback will happen
-    though, so we're going to do it from here. */
-    do_abort = 1;
-  }
-  gpr_mu_unlock(&tcp->mu);
-
-  if (do_abort) {
-    if (from_iocp) {
-      tcp->socket->read_info.outstanding = 0;
-      gpr_slice_unref(tcp->read_slice);
-    }
-    return 0;
-  }
-
-  GPR_ASSERT(tcp->socket->read_info.outstanding);
-
-  if (socket->read_info.wsa_error != 0) {
-    if (socket->read_info.wsa_error != WSAECONNRESET) {
-      char *utf8_message = gpr_format_message(info->wsa_error);
-      gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
-      gpr_free(utf8_message);
-    }
-    success = 0;
-    gpr_slice_unref(tcp->read_slice);
-  } else {
-    if (info->bytes_transfered != 0) {
-      sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
-      gpr_slice_buffer_add(tcp->read_slices, sub);
-      success = 1;
-    } else {
-      gpr_slice_unref(tcp->read_slice);
+  if (success) {
+    if (socket->read_info.wsa_error != 0) {
+      if (socket->read_info.wsa_error != WSAECONNRESET) {
+        char *utf8_message = gpr_format_message(info->wsa_error);
+        gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
+        gpr_free(utf8_message);
+      }
       success = 0;
+      gpr_slice_unref(tcp->read_slice);
+    } else {
+      if (info->bytes_transfered != 0) {
+        sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
+        gpr_slice_buffer_add(tcp->read_slices, sub);
+        success = 1;
+      } else {
+        gpr_slice_unref(tcp->read_slice);
+        success = 0;
+      }
     }
   }
 
-  tcp->socket->read_info.outstanding = 0;
-
   return success;
 }
 
@@ -209,14 +189,10 @@
   DWORD flags = 0;
   WSABUF buffer;
 
-  GPR_ASSERT(!tcp->socket->read_info.outstanding);
   if (tcp->shutting_down) {
     return GRPC_ENDPOINT_ERROR;
   }
 
-  TCP_REF(tcp, "read");
-
-  tcp->socket->read_info.outstanding = 1;
   tcp->read_cb = cb;
   tcp->read_slices = read_slices;
   gpr_slice_buffer_reset_and_unref(read_slices);
@@ -236,10 +212,11 @@
     int ok;
     info->bytes_transfered = bytes_read;
     ok = on_read(tcp, 1);
-    TCP_UNREF(tcp, "read");
     return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
   }
 
+  TCP_REF(tcp, "read");
+
   /* Otherwise, let's retry, by queuing a read. */
   memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
   status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
@@ -260,52 +237,31 @@
 }
 
 /* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(void *tcpp, int from_iocp) {
+static void on_write(void *tcpp, int success) {
   grpc_tcp *tcp = (grpc_tcp *)tcpp;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->write_info;
   grpc_iomgr_closure *cb;
-  int success;
   int do_abort = 0;
 
   gpr_mu_lock(&tcp->mu);
   cb = tcp->write_cb;
   tcp->write_cb = NULL;
-  if (!from_iocp || tcp->shutting_down) {
-    /* If we are here with from_iocp set to true, it means we got raced to
-        shutting down the endpoint. No actual abort callback will happen
-        though, so we're going to do it from here. */
-    do_abort = 1;
-  }
   gpr_mu_unlock(&tcp->mu);
 
-  if (do_abort) {
-    if (from_iocp) {
-      tcp->socket->write_info.outstanding = 0;
+  if (success) {
+    if (info->wsa_error != 0) {
+      if (info->wsa_error != WSAECONNRESET) {
+        char *utf8_message = gpr_format_message(info->wsa_error);
+        gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
+        gpr_free(utf8_message);
+      }
+      success = 0;
+    } else {
+      GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
     }
-    TCP_UNREF(tcp, "write");
-    if (cb) {
-      cb->cb(cb->cb_arg, 0);
-    }
-    return;
   }
 
-  GPR_ASSERT(tcp->socket->write_info.outstanding);
-
-  if (info->wsa_error != 0) {
-    if (info->wsa_error != WSAECONNRESET) {
-      char *utf8_message = gpr_format_message(info->wsa_error);
-      gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
-      gpr_free(utf8_message);
-    }
-    success = 0;
-  } else {
-    GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
-    success = 1;
-  }
-
-  tcp->socket->write_info.outstanding = 0;
-
   TCP_UNREF(tcp, "write");
   cb->cb(cb->cb_arg, success);
 }
@@ -324,13 +280,10 @@
   WSABUF *allocated = NULL;
   WSABUF *buffers = local_buffers;
 
-  GPR_ASSERT(!tcp->socket->write_info.outstanding);
   if (tcp->shutting_down) {
     return GRPC_ENDPOINT_ERROR;
   }
-  TCP_REF(tcp, "write");
 
-  tcp->socket->write_info.outstanding = 1;
   tcp->write_cb = cb;
   tcp->write_slices = slices;
 
@@ -365,11 +318,11 @@
       }
     }
     if (allocated) gpr_free(allocated);
-    tcp->socket->write_info.outstanding = 0;
-    TCP_UNREF(tcp, "write");
     return ret;
   }
 
+  TCP_REF(tcp, "write");
+
   /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
      operation, this time asynchronously. */
   memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
@@ -380,7 +333,6 @@
   if (status != 0) {
     int wsa_error = WSAGetLastError();
     if (wsa_error != WSA_IO_PENDING) {
-      tcp->socket->write_info.outstanding = 0;
       TCP_UNREF(tcp, "write");
       return GRPC_ENDPOINT_ERROR;
     }
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a1b773b..7a42de9 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -293,6 +293,9 @@
   gpr_refcount refs;
   char *peer_string;
 
+  /** when this drops to zero it's safe to shut down the endpoint */
+  gpr_refcount shutdown_ep_refs;
+
   gpr_mu mu;
 
   /** is the transport destroying itself? */
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8caa10c..aa7a7c9 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -222,6 +222,8 @@
   t->ep = ep;
   /* one ref is for destroy, the other for when ep becomes NULL */
   gpr_ref_init(&t->refs, 2);
+  /* initial ref is dropped by close_transport_locked() */
+  gpr_ref_init(&t->shutdown_ep_refs, 1);
   gpr_mu_init(&t->mu);
   grpc_mdctx_ref(mdctx);
   t->peer_string = grpc_endpoint_get_peer(ep);
@@ -336,13 +338,26 @@
   UNREF_TRANSPORT(t, "destroy");
 }
 
+/** block grpc_endpoint_shutdown from being called until a paired
+    allow_endpoint_shutdown call is made */
+static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
+  GPR_ASSERT(t->shutdown_ep_refs.count); /* count must not already be zero */
+  gpr_ref(&t->shutdown_ep_refs);
+}
+
+static void allow_endpoint_shutdown(grpc_chttp2_transport *t) {
+  if (gpr_unref(&t->shutdown_ep_refs)) { /* last holder released */
+    grpc_endpoint_shutdown(t->ep);
+  }
+}
+
 static void close_transport_locked(grpc_chttp2_transport *t) {
   if (!t->closed) {
     t->closed = 1;
     connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE,
                            "close_transport");
     if (t->ep) {
-      grpc_endpoint_shutdown(t->ep);
+      allow_endpoint_shutdown(t);
     }
   }
 }
@@ -471,6 +486,7 @@
     t->writing_active = 1;
     REF_TRANSPORT(t, "writing");
     grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
+    prevent_endpoint_shutdown(t);
   }
 
   run_closures = t->global.pending_closures_head;
@@ -536,6 +552,7 @@
 static void writing_action(void *gt, int iomgr_success_ignored) {
   grpc_chttp2_transport *t = gt;
   grpc_chttp2_perform_writes(&t->writing, t->ep);
+  allow_endpoint_shutdown(t); /* pairs with prevent_ call made at scheduling */
 }
 
 void grpc_chttp2_add_incoming_goaway(
@@ -1104,21 +1121,28 @@
     read_error_locked(t);
   } else {
     keep_reading = 1;
+    prevent_endpoint_shutdown(t);
   }
   gpr_slice_buffer_reset_and_unref(&t->read_buffer);
   unlock(t);
 
   if (keep_reading) {
+    int ret = -1;
     switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
       case GRPC_ENDPOINT_DONE:
         *success = 1;
-        return 1;
+        ret = 1;
+        break;
       case GRPC_ENDPOINT_ERROR:
         *success = 0;
-        return 1;
+        ret = 1;
+        break;
       case GRPC_ENDPOINT_PENDING:
-        return 0;
+        ret = 0;
+        break;
     }
+    allow_endpoint_shutdown(t);
+    return ret;
   } else {
     UNREF_TRANSPORT(t, "recv_data");
     return 0;
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 276fe75..3abde5a 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -33,13 +33,6 @@
 
 #include "src/core/iomgr/tcp_posix.h"
 
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index ef67374..148aa9e 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -39,6 +39,7 @@
 #include <grpc/support/slice.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
+#include <grpc/support/useful.h>
 #include "test/core/util/test_config.h"
 
 /*
@@ -128,6 +129,7 @@
 static void read_and_write_test_read_handler(void *data, int success) {
   struct read_and_write_test_state *state = data;
 
+loop:
   state->bytes_read += count_slices(
       state->incoming.slices, state->incoming.count, &state->current_read_data);
   if (state->bytes_read == state->target_bytes || !success) {
@@ -140,11 +142,11 @@
     switch (grpc_endpoint_read(state->read_ep, &state->incoming,
                                &state->done_read)) {
       case GRPC_ENDPOINT_ERROR:
-        read_and_write_test_read_handler(data, 0);
-        break;
+        success = 0;
+        goto loop;
       case GRPC_ENDPOINT_DONE:
-        read_and_write_test_read_handler(data, 1);
-        break;
+        success = 1;
+        goto loop;
       case GRPC_ENDPOINT_PENDING:
         break;
     }
@@ -176,16 +178,17 @@
       gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
       write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
                                          &state->done_write);
-      gpr_log(GPR_DEBUG, "write_status=%d", write_status);
-      GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
       free(slices);
       if (write_status == GRPC_ENDPOINT_PENDING) {
         return;
+      } else if (write_status == GRPC_ENDPOINT_ERROR) {
+        goto cleanup;
       }
     }
     GPR_ASSERT(state->bytes_written == state->target_bytes);
   }
 
+cleanup:
   gpr_log(GPR_INFO, "Write handler done");
   gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
   state->write_done = 1 + success;
@@ -204,6 +207,8 @@
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
   grpc_endpoint_test_fixture f =
       begin_test(config, "read_and_write_test", slice_size);
+  gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
+          num_bytes, write_size, slice_size, shutdown);
 
   if (shutdown) {
     gpr_log(GPR_INFO, "Start read and write shutdown test");
@@ -264,11 +269,11 @@
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
 
-  grpc_endpoint_destroy(state.read_ep);
-  grpc_endpoint_destroy(state.write_ep);
+  end_test(config);
   gpr_slice_buffer_destroy(&state.outgoing);
   gpr_slice_buffer_destroy(&state.incoming);
-  end_test(config);
+  grpc_endpoint_destroy(state.read_ep);
+  grpc_endpoint_destroy(state.write_ep);
 }
 
 struct timeout_test_state {
@@ -286,6 +291,7 @@
                                                     int success) {
   shutdown_during_write_test_state *st = user_data;
 
+loop:
   if (!success) {
     grpc_endpoint_destroy(st->ep);
     gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -297,11 +303,11 @@
       case GRPC_ENDPOINT_PENDING:
         break;
       case GRPC_ENDPOINT_ERROR:
-        shutdown_during_write_test_read_handler(user_data, 0);
-        break;
+        success = 0;
+        goto loop;
       case GRPC_ENDPOINT_DONE:
-        shutdown_during_write_test_read_handler(user_data, 1);
-        break;
+        success = 1;
+        goto loop;
     }
   }
 }
@@ -324,86 +330,15 @@
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
 }
 
-static void shutdown_during_write_test(grpc_endpoint_test_config config,
-                                       size_t slice_size) {
-  /* test that shutdown with a pending write creates no leaks */
-  gpr_timespec deadline;
-  size_t size;
-  size_t nblocks;
-  int current_data = 1;
-  shutdown_during_write_test_state read_st;
-  shutdown_during_write_test_state write_st;
-  gpr_slice *slices;
-  gpr_slice_buffer outgoing;
-  grpc_iomgr_closure done_write;
-  grpc_endpoint_test_fixture f =
-      begin_test(config, "shutdown_during_write_test", slice_size);
-
-  gpr_log(GPR_INFO, "testing shutdown during a write");
-
-  read_st.ep = f.client_ep;
-  write_st.ep = f.server_ep;
-  read_st.done = 0;
-  write_st.done = 0;
-
-  grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
-                          &write_st);
-  grpc_iomgr_closure_init(&read_st.done_read,
-                          shutdown_during_write_test_read_handler, &read_st);
-  gpr_slice_buffer_init(&read_st.incoming);
-  gpr_slice_buffer_init(&outgoing);
-
-  GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
-                                &read_st.done_read) == GRPC_ENDPOINT_PENDING);
-  for (size = 1;; size *= 2) {
-    slices = allocate_blocks(size, 1, &nblocks, &current_data);
-    gpr_slice_buffer_reset_and_unref(&outgoing);
-    gpr_slice_buffer_addn(&outgoing, slices, nblocks);
-    switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
-      case GRPC_ENDPOINT_DONE:
-        break;
-      case GRPC_ENDPOINT_ERROR:
-        gpr_log(GPR_ERROR, "error writing");
-        abort();
-      case GRPC_ENDPOINT_PENDING:
-        grpc_endpoint_shutdown(write_st.ep);
-        deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
-        gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-        while (!write_st.done) {
-          grpc_pollset_worker worker;
-          GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-          grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                            deadline);
-        }
-        gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-        grpc_endpoint_destroy(write_st.ep);
-        gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
-        while (!read_st.done) {
-          grpc_pollset_worker worker;
-          GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-          grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                            deadline);
-        }
-        gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-        gpr_free(slices);
-        gpr_slice_buffer_destroy(&read_st.incoming);
-        gpr_slice_buffer_destroy(&outgoing);
-        end_test(config);
-        return;
-    }
-    gpr_free(slices);
-  }
-
-  gpr_log(GPR_ERROR, "should never reach here");
-  abort();
-}
-
 void grpc_endpoint_tests(grpc_endpoint_test_config config,
                          grpc_pollset *pollset) {
+  size_t i;
   g_pollset = pollset;
   read_and_write_test(config, 10000000, 100000, 8192, 0);
   read_and_write_test(config, 1000000, 100000, 1, 0);
   read_and_write_test(config, 100000000, 100000, 1, 1);
-  shutdown_during_write_test(config, 1000);
+  for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+    read_and_write_test(config, 40320, i, i, 0);
+  }
   g_pollset = NULL;
 }