Adding inbound flow control to Netty client and server transports.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=74444530
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
index b5c24e2..c08fa81 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
@@ -8,41 +8,94 @@
import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.Executor;
/**
* Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a
* {@link Decompressor}.
*/
public class GrpcDeframer implements Closeable {
+
private enum State {
HEADER, BODY
}
private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;
private final Decompressor decompressor;
+ private final Executor executor;
+ private final Runnable deliveryTask;
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
private int frameType;
private boolean statusNotified;
- private GrpcMessageListener listener;
+ private boolean endOfStream;
+ private boolean deliveryOutstanding;
+ private StreamListener listener;
private CompositeBuffer nextFrame;
- public GrpcDeframer(Decompressor decompressor, GrpcMessageListener listener) {
+ /**
+ * Constructs the deframer.
+ *
+ * @param decompressor the object used for de-framing GRPC compression frames.
+ * @param listener the listener for fully read GRPC messages.
+ * @param executor the executor to be used for delivery. All calls to
+ * {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
+ * executor must not allow concurrent access to this class, so it must be either a single
+ * thread or have sequential processing of events.
+ */
+ public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) {
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
this.listener = Preconditions.checkNotNull(listener, "listener");
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+ deliveryTask = new Runnable() {
+ @Override
+ public void run() {
+ deliveryOutstanding = false;
+ deliver();
+ }
+ };
}
+ /**
+ * Adds the given data to this deframer and attempts delivery to the listener.
+ */
public void deframe(Buffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data");
// Add the data to the decompression buffer.
decompressor.decompress(data);
+ // Indicate that all of the data for this stream has been received.
+ this.endOfStream = endOfStream;
+
+ // Deliver the next message if not already delivering.
+ deliver();
+ }
+
+ @Override
+ public void close() {
+ decompressor.close();
+ if (nextFrame != null) {
+ nextFrame.close();
+ }
+ }
+
+ /**
+ * If there is no outstanding delivery, attempts to read and deliver as many messages to the
+ * listener as possible. Only one outstanding delivery is allowed at a time.
+ */
+ private void deliver() {
+ if (deliveryOutstanding) {
+ // Only allow one outstanding delivery at a time.
+ return;
+ }
+
// Process the uncompressed bytes.
while (readRequiredBytes()) {
if (statusNotified) {
@@ -54,7 +107,18 @@
processHeader();
break;
case BODY:
- processBody();
+ // Read the body and deliver the message to the listener.
+ deliveryOutstanding = true;
+ ListenableFuture<Void> processingFuture = processBody();
+ if (processingFuture != null) {
+ // A listener was returned for the completion of processing the delivered
+ // message. Once it's done, try to deliver the next message.
+ processingFuture.addListener(deliveryTask, executor);
+ return;
+ }
+
+ // No future was returned, so assume processing is complete for the delivery.
+ deliveryOutstanding = false;
break;
default:
throw new AssertionError("Invalid state: " + state);
@@ -68,15 +132,6 @@
}
}
-
- @Override
- public void close() {
- decompressor.close();
- if (nextFrame != null) {
- nextFrame.close();
- }
- }
-
/**
* Attempts to read the required bytes into nextFrame.
*
@@ -122,13 +177,14 @@
* Processes the body of the GRPC compression frame. A single compression frame may contain
* several GRPC messages within it.
*/
- private void processBody() {
+ private ListenableFuture<Void> processBody() {
+ ListenableFuture<Void> future = null;
switch (frameType) {
case CONTEXT_VALUE_FRAME:
- processContext();
+ future = processContext();
break;
case PAYLOAD_FRAME:
- processMessage();
+ future = processMessage();
break;
case STATUS_FRAME:
processStatus();
@@ -140,12 +196,13 @@
// Done with this frame, begin processing the next header.
state = State.HEADER;
requiredLength = HEADER_LENGTH;
+ return future;
}
/**
* Processes the payload of a context frame.
*/
- private void processContext() {
+ private ListenableFuture<Void> processContext() {
Transport.ContextValue ctx;
try {
// Not clear if using proto encoding here is of any benefit.
@@ -162,16 +219,16 @@
// Call the handler.
Buffer ctxBuffer = Buffers.wrap(ctx.getValue());
- listener.onContext(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
+ return listener.contextRead(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
ctxBuffer.readableBytes());
}
/**
* Processes the payload of a message frame.
*/
- private void processMessage() {
+ private ListenableFuture<Void> processMessage() {
try {
- listener.onPayload(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
+ return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
} finally {
// Don't close the frame, since the listener is now responsible for the life-cycle.
nextFrame = null;
@@ -198,6 +255,6 @@
*/
private void notifyStatus(Status status) {
statusNotified = true;
- listener.onStatus(status);
+ listener.closed(status);
}
}