Transport/call flow control interface

Allow call objects to advertise how many bytes they are currently
willing to receive.

Have the transport use this information to replenish its per-stream
flow control windows.
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe..e47dc4f 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@
 }
 
 void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+  q->bytes += grpc_byte_buffer_length(buffer);
   bba_push(&q->filling, buffer);
 }
 
@@ -72,8 +73,11 @@
   }
 }
 
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
 grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
   grpc_bbq_array temp_array;
+  grpc_byte_buffer *out;
 
   if (q->drain_pos == q->draining.count) {
     if (q->filling.count == 0) {
@@ -87,5 +91,7 @@
     q->draining = temp_array;
   }
 
-  return q->draining.data[q->drain_pos++];
+  out = q->draining.data[q->drain_pos++];
+  q->bytes -= grpc_byte_buffer_length(out);
+  return out;
 }
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8..f019589 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@
   size_t drain_pos;
   grpc_bbq_array filling;
   grpc_bbq_array draining;
+  size_t bytes;
 } grpc_byte_buffer_queue;
 
 void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@
 void grpc_bbq_flush(grpc_byte_buffer_queue *q);
 int grpc_bbq_empty(grpc_byte_buffer_queue *q);
 void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
 
 #endif  /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2d651be..eea0221 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -444,6 +444,8 @@
   int completing_requests = 0;
   int start_op = 0;
   int i;
+  const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+  size_t buffered_bytes;
 
   memset(&op, 0, sizeof(op));
 
@@ -456,6 +458,17 @@
     op.recv_state = &call->recv_state;
     op.on_done_recv = call_on_done_recv;
     op.recv_user_data = call;
+    if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+      op.max_recv_bytes = call->incoming_message_length -
+                          call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+    } else {
+      buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+      if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+        op.max_recv_bytes = 0;
+      } else {
+        op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+      }
+    }
     call->receiving = 1;
     GRPC_CALL_INTERNAL_REF(call, "receiving");
     start_op = 1;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index d9c712c..c85eb96 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -314,6 +314,18 @@
 struct stream {
   gpr_uint32 id;
 
+  /** The number of bytes the upper layers have offered to receive.
+      As the upper layer offers more bytes, this value increases.
+      As bytes are read, this value decreases. */
+  gpr_uint32 max_recv_bytes;
+  /** The number of bytes the upper layer has offered to read but we have
+      not yet announced to HTTP2 flow control.
+      As the upper layers offer to read more bytes, this value increases.
+      As we advertise incoming flow control window, this value decreases. */
+  gpr_uint32 unannounced_incoming_window;
+  /** The number of bytes of HTTP2 flow control we have advertised.
+      As we advertise incoming flow control window, this value increases.
+      As bytes are read, this value decreases. */
   gpr_uint32 incoming_window;
   gpr_int64 outgoing_window;
   /* when the application requests writes be closed, the write_closed is
@@ -659,7 +671,7 @@
     s->id = (gpr_uint32)(gpr_uintptr)server_data;
     s->outgoing_window =
         t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
-    s->incoming_window =
+    s->max_recv_bytes = s->incoming_window =
         t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     t->incoming_stream = s;
     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
@@ -970,14 +982,13 @@
 
   /* for each stream that wants to update its window, add that window here */
   while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
-    window_delta =
-        t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
-        s->incoming_window;
-    if (!s->read_closed && window_delta) {
-      gpr_slice_buffer_add(
-          &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
-      FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
-      s->incoming_window += window_delta;
+    if (!s->read_closed && s->unannounced_incoming_window > 0) {
+      gpr_slice_buffer_add(&t->outbuf,
+                           grpc_chttp2_window_update_create(
+                               s->id, s->unannounced_incoming_window));
+      FLOWCTL_TRACE(t, s, incoming, s->id, s->unannounced_incoming_window);
+      s->incoming_window += s->unannounced_incoming_window;
+      s->unannounced_incoming_window = 0;
     }
   }
 
@@ -1101,8 +1112,10 @@
         t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     s->incoming_window =
         t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+    s->max_recv_bytes = GPR_MAX(s->incoming_window, s->max_recv_bytes);
     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
     stream_list_join(t, s, WRITABLE);
+    maybe_join_window_updates(t, s);
   }
   /* cancel out streams that will never be started */
   while (t->next_stream_id > MAX_CLIENT_STREAM_ID) {
@@ -1153,6 +1166,10 @@
     s->incoming_sopb = op->recv_ops;
     s->incoming_sopb->nops = 0;
     s->publish_state = op->recv_state;
+    if (s->max_recv_bytes < op->max_recv_bytes) {
+      s->unannounced_incoming_window += op->max_recv_bytes - s->max_recv_bytes;
+      s->max_recv_bytes = op->max_recv_bytes;
+    }
     gpr_free(s->old_incoming_metadata);
     s->old_incoming_metadata = NULL;
     maybe_finish_read(t, s);
@@ -1337,10 +1354,10 @@
 
 static void maybe_join_window_updates(transport *t, stream *s) {
   if (s->incoming_sopb != NULL &&
-      s->incoming_window <
+      s->unannounced_incoming_window >
           t->settings[LOCAL_SETTINGS]
-                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
-              3 / 4) {
+                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
+              4) {
     stream_list_join(t, s, WINDOW_UPDATE);
   }
 }
@@ -1362,6 +1379,9 @@
   FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
   t->incoming_window -= t->incoming_frame_size;
   s->incoming_window -= t->incoming_frame_size;
+  /* a frame may use up the whole offer exactly, so equality must be allowed */
+  GPR_ASSERT(s->max_recv_bytes >= t->incoming_frame_size);
+  s->max_recv_bytes -= t->incoming_frame_size;
 
   /* if the stream incoming window is getting low, schedule an update */
   maybe_join_window_updates(t, s);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7f60fdc..98fcbe7 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -74,6 +74,12 @@
 
   grpc_stream_op_buffer *recv_ops;
   grpc_stream_state *recv_state;
+  /** The number of bytes this peer is currently prepared to receive.
+
+      Bytes offered are used to replenish per-stream flow control windows.
+      Offers are not retractable: if 5 bytes are offered and no bytes are read,
+        a later offer of 3 bytes still implies that 5 have been offered. */
+  gpr_uint32 max_recv_bytes;
   void (*on_done_recv)(void *user_data, int success);
   void *recv_user_data;
 
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 7bbe827..11d5a9d 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -129,7 +129,8 @@
   if (op->recv_ops) {
     if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
     first = 0;
-    gpr_strvec_add(&b, gpr_strdup("RECV"));
+    gpr_asprintf(&tmp, "RECV:max_recv_bytes=%u", op->max_recv_bytes);
+    gpr_strvec_add(&b, tmp);
   }
 
   if (op->bind_pollset) {