Adding inbound flow control to Netty client and server transports.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=74444530
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
index b5c24e2..c08fa81 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
@@ -8,41 +8,94 @@
 import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.Executor;
 
 /**
  * Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a
  * {@link Decompressor}.
  */
 public class GrpcDeframer implements Closeable {
+
   private enum State {
     HEADER, BODY
   }
 
   private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;
   private final Decompressor decompressor;
+  private final Executor executor;
+  private final Runnable deliveryTask;
   private State state = State.HEADER;
   private int requiredLength = HEADER_LENGTH;
   private int frameType;
   private boolean statusNotified;
-  private GrpcMessageListener listener;
+  private boolean endOfStream;
+  private boolean deliveryOutstanding;
+  private StreamListener listener;
   private CompositeBuffer nextFrame;
 
-  public GrpcDeframer(Decompressor decompressor, GrpcMessageListener listener) {
+  /**
+   * Constructs the deframer.
+   *
+   * @param decompressor the object used for de-framing GRPC compression frames.
+   * @param listener the listener for fully read GRPC messages.
+   * @param executor the executor to be used for delivery. All calls to
+   *        {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
+   *        executor must not allow concurrent access to this class, so it must be either a single
+   *        thread or have sequential processing of events.
+   */
+  public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) {
     this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
     this.listener = Preconditions.checkNotNull(listener, "listener");
+    this.executor = Preconditions.checkNotNull(executor, "executor");
+    deliveryTask = new Runnable() {
+      @Override
+      public void run() {
+        deliveryOutstanding = false;
+        deliver();
+      }
+    };
   }
 
+  /**
+   * Adds the given data to this deframer and attempts delivery to the listener.
+   */
   public void deframe(Buffer data, boolean endOfStream) {
     Preconditions.checkNotNull(data, "data");
 
     // Add the data to the decompression buffer.
     decompressor.decompress(data);
 
+    // Indicate that all of the data for this stream has been received.
+    this.endOfStream = endOfStream;
+
+    // Deliver the next message if not already delivering.
+    deliver();
+  }
+
+  @Override
+  public void close() {
+    decompressor.close();
+    if (nextFrame != null) {
+      nextFrame.close();
+    }
+  }
+
+  /**
+   * If there is no outstanding delivery, attempts to read and deliver as many messages to the
+   * listener as possible. Only one outstanding delivery is allowed at a time.
+   */
+  private void deliver() {
+    if (deliveryOutstanding) {
+      // Only allow one outstanding delivery at a time.
+      return;
+    }
+
     // Process the uncompressed bytes.
     while (readRequiredBytes()) {
       if (statusNotified) {
@@ -54,7 +107,18 @@
           processHeader();
           break;
         case BODY:
-          processBody();
+          // Read the body and deliver the message to the listener.
+          deliveryOutstanding = true;
+          ListenableFuture<Void> processingFuture = processBody();
+          if (processingFuture != null) {
+            // A listener was returned for the completion of processing the delivered
+            // message. Once it's done, try to deliver the next message.
+            processingFuture.addListener(deliveryTask, executor);
+            return;
+          }
+
+          // No future was returned, so assume processing is complete for the delivery.
+          deliveryOutstanding = false;
           break;
         default:
           throw new AssertionError("Invalid state: " + state);
@@ -68,15 +132,6 @@
     }
   }
 
-
-  @Override
-  public void close() {
-    decompressor.close();
-    if (nextFrame != null) {
-      nextFrame.close();
-    }
-  }
-
   /**
    * Attempts to read the required bytes into nextFrame.
    *
@@ -122,13 +177,14 @@
    * Processes the body of the GRPC compression frame. A single compression frame may contain
    * several GRPC messages within it.
    */
-  private void processBody() {
+  private ListenableFuture<Void> processBody() {
+    ListenableFuture<Void> future = null;
     switch (frameType) {
       case CONTEXT_VALUE_FRAME:
-        processContext();
+        future = processContext();
         break;
       case PAYLOAD_FRAME:
-        processMessage();
+        future = processMessage();
         break;
       case STATUS_FRAME:
         processStatus();
@@ -140,12 +196,13 @@
     // Done with this frame, begin processing the next header.
     state = State.HEADER;
     requiredLength = HEADER_LENGTH;
+    return future;
   }
 
   /**
    * Processes the payload of a context frame.
    */
-  private void processContext() {
+  private ListenableFuture<Void> processContext() {
     Transport.ContextValue ctx;
     try {
       // Not clear if using proto encoding here is of any benefit.
@@ -162,16 +219,16 @@
 
     // Call the handler.
     Buffer ctxBuffer = Buffers.wrap(ctx.getValue());
-    listener.onContext(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
+    return listener.contextRead(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
         ctxBuffer.readableBytes());
   }
 
   /**
    * Processes the payload of a message frame.
    */
-  private void processMessage() {
+  private ListenableFuture<Void> processMessage() {
     try {
-      listener.onPayload(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
+      return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
     } finally {
       // Don't close the frame, since the listener is now responsible for the life-cycle.
       nextFrame = null;
@@ -198,6 +255,6 @@
    */
   private void notifyStatus(Status status) {
     statusNotified = true;
-    listener.onStatus(status);
+    listener.closed(status);
   }
 }