Handle call ref-counting when a cancel or bind op gets stuck in the waiting queue
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 311f4f0..9eec816 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -211,9 +211,3 @@
   op.cancel_with_status = GRPC_STATUS_CANCELLED;
   grpc_call_next_op(cur_elem, &op);
 }
-
-void grpc_call_element_recv_status(grpc_call_element *cur_elem,
-                                   grpc_status_code status,
-                                   const char *message) {
-  abort();
-}
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 1eda23b..31b1fc3 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -171,6 +171,9 @@
     *op->recv_state = GRPC_STREAM_CLOSED;
     op->on_done_recv(op->recv_user_data, 1);
   }
+  if (op->on_consumed) {
+    op->on_consumed(op->on_consumed_user_data, 0);
+  }
 }
 
 static void cc_start_transport_op(grpc_call_element *elem,
@@ -264,6 +267,9 @@
           calld->s.waiting_op.recv_user_data = op->recv_user_data;
         }
         gpr_mu_unlock(&chand->mu);
+        if (op->on_consumed) {
+          op->on_consumed(op->on_consumed_user_data, 0);
+        }
       }
       break;
     case CALL_CANCELLED:
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 1e066ea..df3adf8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -156,6 +156,8 @@
   /* flags with bits corresponding to write states allowing us to determine
      what was sent */
   gpr_uint16 last_send_contains;
+  /* cancel with this status on the next outgoing transport op */
+  grpc_status_code cancel_with_status;
 
   /* Active ioreqs.
      request_set and request_data contain one element per active ioreq
@@ -247,8 +249,7 @@
 static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
 static void finish_read_ops(grpc_call *call);
 static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
-                                          const char *description,
-                                          gpr_uint8 locked);
+                                          const char *description);
 
 static void lock(grpc_call *call);
 static void unlock(grpc_call *call);
@@ -415,6 +416,7 @@
 
 static int need_more_data(grpc_call *call) {
   if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
+  /* TODO(ctiller): this needs some serious cleanup */
   return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
          (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
           grpc_bbq_empty(&call->incoming_queue)) ||
@@ -423,7 +425,8 @@
          is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
          (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
           grpc_bbq_empty(&call->incoming_queue)) ||
-         (call->write_state == WRITE_STATE_INITIAL && !call->is_client);
+         (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
+         (call->cancel_with_status != GRPC_STATUS_OK);
 }
 
 static void unlock(grpc_call *call) {
@@ -435,11 +438,9 @@
 
   memset(&op, 0, sizeof(op));
 
-  if (!call->bound_pollset && call->cq) {
-    call->bound_pollset = 1;
-    op.bind_pollset = grpc_cq_pollset(call->cq);
-    start_op = 1;
-  }
+  op.cancel_with_status = call->cancel_with_status;
+  start_op = op.cancel_with_status != GRPC_STATUS_OK;
+  call->cancel_with_status = GRPC_STATUS_OK; /* reset */
 
   if (!call->receiving && need_more_data(call)) {
     op.recv_ops = &call->recv_ops;
@@ -459,6 +460,12 @@
     }
   }
 
+  if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
+    call->bound_pollset = 1;
+    op.bind_pollset = grpc_cq_pollset(call->cq);
+    start_op = 1;
+  }
+
   if (!call->completing && call->num_completed_requests != 0) {
     completing_requests = call->num_completed_requests;
     memcpy(completed_requests, call->completed_requests,
@@ -665,7 +672,7 @@
     gpr_asprintf(
         &message, "Message terminated early; read %d bytes, expected %d",
         (int)call->incoming_message.length, (int)call->incoming_message_length);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
+    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
     gpr_free(message);
     return 0;
   }
@@ -676,7 +683,7 @@
         &message,
         "Maximum message length of %d exceeded by a message of length %d",
         grpc_channel_get_max_message_length(call->channel), msg.length);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
+    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
     gpr_free(message);
     return 0;
   } else if (msg.length > 0) {
@@ -697,7 +704,7 @@
   /* we have to be reading a message to know what to do here */
   if (!call->reading_message) {
     cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
-                       "Received payload data while not reading a message", 1);
+                       "Received payload data while not reading a message");
     return 0;
   }
   /* append the slice to the incoming buffer */
@@ -708,7 +715,7 @@
     gpr_asprintf(
         &message, "Receiving message overflow; read %d bytes, expected %d",
         (int)call->incoming_message.length, (int)call->incoming_message_length);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
+    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
     gpr_free(message);
     return 0;
   } else if (call->incoming_message.length == call->incoming_message_length) {
@@ -1040,35 +1047,43 @@
 grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
                                              grpc_status_code status,
                                              const char *description) {
-  return cancel_with_status(c, status, description, 0);
+  grpc_call_error r;
+  lock(c);
+  r = cancel_with_status(c, status, description);
+  unlock(c);
+  return r;
 }
 
 static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
-                                          const char *description,
-                                          gpr_uint8 locked) {
-  grpc_transport_op op;
+                                          const char *description) {
   grpc_mdstr *details =
       description ? grpc_mdstr_from_string(c->metadata_context, description)
                   : NULL;
-  memset(&op, 0, sizeof(op));
-  op.cancel_with_status = status;
 
-  if (locked == 0) {
-    lock(c);
-  }
+  GPR_ASSERT(status != GRPC_STATUS_OK);
+
   set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
   set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
-  if (locked == 0) {
-    unlock(c);
-  }
 
-  execute_op(c, &op);
+  c->cancel_with_status = status;
 
   return GRPC_CALL_OK;
 }
 
+static void finished_loose_op(void *call, int success_ignored) {
+  GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
+}
+
 static void execute_op(grpc_call *call, grpc_transport_op *op) {
   grpc_call_element *elem;
+
+  GPR_ASSERT(op->on_consumed == NULL);
+  if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
+    GRPC_CALL_INTERNAL_REF(call, "loose-op");
+    op->on_consumed = finished_loose_op;
+    op->on_consumed_user_data = call;
+  }
+
   elem = CALL_ELEM_FROM_CALL(call, 0);
   op->context = call->context;
   elem->filter->start_transport_op(elem, op);
@@ -1081,12 +1096,10 @@
 static void call_alarm(void *arg, int success) {
   grpc_call *call = arg;
   if (success) {
-    if (call->is_client) {
-      cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
-                         "Deadline Exceeded", 0);
-    } else {
-      grpc_call_cancel(call);
-    }
+    lock(call);
+    cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
+                       "Deadline Exceeded");
+    unlock(call);
   }
   GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
 }
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index a3b0b26..6c07b01 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -77,6 +77,9 @@
     *op->recv_state = GRPC_STREAM_CLOSED;
     op->on_done_recv(op->recv_user_data, 1);
   }
+  if (op->on_consumed) {
+    op->on_consumed(op->on_consumed_user_data, 0);
+  }
 }
 
 static void channel_op(grpc_channel_element *elem,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 07c5b0b..d9c712c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1162,6 +1162,13 @@
   if (op->bind_pollset) {
     add_to_pollset_locked(t, op->bind_pollset);
   }
+
+  if (op->on_consumed) {
+    op_closure c;
+    c.cb = op->on_consumed;
+    c.user_data = op->on_consumed_user_data;
+    schedule_cb(t, c, 1);
+  }
 }
 
 static void perform_op(grpc_transport *gt, grpc_stream *gs,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index e0dca22..a9948cd 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -103,6 +103,9 @@
   if (op->recv_ops) {
     op->on_done_recv(op->recv_user_data, 0);
   }
+  if (op->on_consumed) {
+    op->on_consumed(op->on_consumed_user_data, 0);
+  }
 }
 
 void grpc_transport_op_add_cancellation(grpc_transport_op *op,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 521d74c..7f60fdc 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -64,6 +64,9 @@
 
 /* Transport op: a set of operations to perform on a transport */
 typedef struct grpc_transport_op {
+  void (*on_consumed)(void *user_data, int success);
+  void *on_consumed_user_data;
+
   grpc_stream_op_buffer *send_ops;
   int is_last_send;
   void (*on_done_send)(void *user_data, int success);