// blob: ffe2c54207f12e08a6fb8251f9480a27b357fe85 [file] [log] [blame]
package com.google.net.stubby.newtransport;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executor;
import java.util.zip.GZIPInputStream;
import javax.annotation.concurrent.NotThreadSafe;
/**
 * Deframer for GRPC frames.
 *
 * <p>Incoming bytes are accumulated via {@link #deframe(Buffer, boolean)}. Each frame consists of
 * a {@value #HEADER_LENGTH}-byte header (one flags byte followed by a 4-byte body length) and a
 * body of that length. Every complete body is handed to the {@link Sink} as one message.
 *
 * <p>This class is not thread-safe. All calls to this class must be made in the context of the
 * executor provided during creation. That executor must not allow concurrent execution of tasks.
 */
@NotThreadSafe
public class MessageDeframer2 implements Closeable {
  /** Size of the frame header: one flags byte plus a 4-byte body length. */
  private static final int HEADER_LENGTH = 5;
  /** Bit of the flags byte indicating that the frame body is compressed. */
  private static final int COMPRESSED_FLAG_MASK = 1;
  /** Bits of the flags byte that must be zero in a well-formed header. */
  private static final int RESERVED_MASK = 0xFE;

  /** Compression schemes that can be applied to a frame body. */
  public enum Compression {
    NONE, GZIP;
  }

  /** Receiver of the messages extracted by the deframer. */
  public interface Sink {
    /**
     * Called once per fully read message.
     *
     * @param is stream over the message bytes
     * @param length number of bytes available from {@code is}
     * @return a future that completes when the sink is ready for the next message, or
     *     {@code null} if the next message may be delivered immediately
     */
    public ListenableFuture<Void> messageRead(InputStream is, int length);

    /** Called after the end of the stream has been reached and no partial frame remains. */
    public void endOfStream();
  }

  /** Which part of a frame is expected next. */
  private enum State {
    HEADER, BODY
  }

  private final Sink sink;
  private final Executor executor;
  private final Compression compression;

  /** Resumes delivery once a pending future (from the sink or a delay) has completed. */
  private final Runnable deliveryTask = new Runnable() {
    @Override
    public void run() {
      deliveryOutstanding = false;
      deliver();
    }
  };

  private State state = State.HEADER;
  /** Number of bytes that must be present in {@link #nextFrame} before the state can advance. */
  private int requiredLength = HEADER_LENGTH;
  private boolean compressedFlag;
  private boolean endOfStream;
  /** While {@code true}, {@link #deliver()} returns immediately without reading anything. */
  private boolean deliveryOutstanding;
  /** Bytes collected for the header or body currently being assembled; lazily created. */
  private CompositeBuffer nextFrame;
  /** Received bytes that have not yet been moved into {@link #nextFrame}. */
  private CompositeBuffer unprocessed = new CompositeBuffer();

  /**
   * Create a deframer. All calls to this class must be made in the context of the provided
   * executor, which also must not allow concurrent processing of Runnables. Compression will not
   * be supported.
   *
   * @param sink callback for fully read GRPC messages
   * @param executor used for internal event processing
   */
  public MessageDeframer2(Sink sink, Executor executor) {
    this(sink, executor, Compression.NONE);
  }

  /**
   * Create a deframer. All calls to this class must be made in the context of the provided
   * executor, which also must not allow concurrent processing of Runnables.
   *
   * @param sink callback for fully read GRPC messages
   * @param executor used for internal event processing
   * @param compression the compression used if a compressed frame is encountered, with NONE meaning
   *     unsupported
   */
  public MessageDeframer2(Sink sink, Executor executor, Compression compression) {
    this.sink = Preconditions.checkNotNull(sink, "sink");
    this.executor = Preconditions.checkNotNull(executor, "executor");
    this.compression = Preconditions.checkNotNull(compression, "compression");
  }

  /**
   * Adds the given data to this deframer and attempts delivery to the sink.
   *
   * @param data the newly received bytes
   * @param endOfStream {@code true} if no further data will follow for this stream
   * @throws NullPointerException if {@code data} is null
   * @throws IllegalStateException if a previous call already signalled the end of the stream
   */
  public void deframe(Buffer data, boolean endOfStream) {
    Preconditions.checkNotNull(data, "data");
    Preconditions.checkState(!this.endOfStream, "Past end of stream");
    unprocessed.addBuffer(data);

    // Indicate that all of the data for this stream has been received.
    this.endOfStream = endOfStream;

    // Deliver the next message if not already delivering.
    deliver();
  }

  /** Closes the buffers still held by this deframer. */
  @Override
  public void close() {
    unprocessed.close();
    if (nextFrame != null) {
      nextFrame.close();
    }
  }

  /**
   * Suspends delivery of messages until the given future completes, at which point delivery is
   * attempted again on the executor. A {@code null} future is ignored.
   *
   * @param future the future to wait for, or {@code null} for no delay
   * @throws IllegalStateException if a delivery or another delay is already outstanding
   */
  public void delayProcessing(ListenableFuture<Void> future) {
    Preconditions.checkState(!deliveryOutstanding, "Only one delay allowed concurrently");
    if (future != null) {
      deliveryOutstanding = true;
      // Once future completes, try to deliver the next message.
      future.addListener(deliveryTask, executor);
    }
  }

  /**
   * If there is no outstanding delivery, attempts to read and deliver as many messages to the
   * sink as possible. Only one outstanding delivery is allowed at a time.
   */
  private void deliver() {
    if (deliveryOutstanding) {
      // Only allow one outstanding delivery at a time.
      return;
    }

    // Process the uncompressed bytes.
    while (readRequiredBytes()) {
      switch (state) {
        case HEADER:
          processHeader();
          break;
        case BODY:
          // Read the body and deliver the message to the sink. The flag is raised first so that
          // any nested call to deliver() made while the sink runs returns immediately.
          deliveryOutstanding = true;
          ListenableFuture<Void> processingFuture = processBody();
          if (processingFuture != null) {
            // A future was returned for the completion of processing the delivered
            // message. Once it's done, try to deliver the next message.
            processingFuture.addListener(deliveryTask, executor);
            return;
          }
          // No future was returned, so assume processing is complete for the delivery.
          deliveryOutstanding = false;
          break;
        default:
          throw new AssertionError("Invalid state: " + state);
      }
    }

    if (endOfStream) {
      // The loop above only exits in BODY state when body bytes are still missing, so BODY
      // means a truncated frame even if none of its body bytes have arrived yet (in which case
      // nextFrame is empty because the header bytes were already consumed).
      if (state == State.BODY || nextFrame.readableBytes() != 0) {
        // TODO(user): Investigate how this should be propagated, so that stream is aborted and
        // application is properly notified of abortion.
        throw new RuntimeException("Encountered end-of-stream mid-frame");
      }
      sink.endOfStream();
    }
  }

  /**
   * Attempts to read the required bytes into nextFrame.
   *
   * @return {@code true} if all of the required bytes have been read.
   */
  private boolean readRequiredBytes() {
    if (nextFrame == null) {
      nextFrame = new CompositeBuffer();
    }

    // Read until the buffer contains all the required bytes.
    int missingBytes;
    while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
      if (unprocessed.readableBytes() == 0) {
        // No more data is available.
        return false;
      }
      int toRead = Math.min(missingBytes, unprocessed.readableBytes());
      nextFrame.addBuffer(unprocessed.readBytes(toRead));
    }
    return true;
  }

  /**
   * Processes the GRPC compression header which is composed of the compression flag and the outer
   * frame length.
   *
   * @throws RuntimeException if reserved flag bits are set or the frame length is negative
   */
  private void processHeader() {
    int type = nextFrame.readUnsignedByte();
    if ((type & RESERVED_MASK) != 0) {
      throw new RuntimeException("Frame header malformed: reserved bits not zero");
    }
    compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;

    // The length is read as a signed int. A negative value can never be satisfied and would
    // otherwise make readRequiredBytes() report success without any body bytes being present.
    int frameLength = nextFrame.readInt();
    if (frameLength < 0) {
      throw new RuntimeException("Frame header malformed: negative frame length " + frameLength);
    }

    // Update the required length to include the length of the frame.
    requiredLength = frameLength;

    // Continue reading the frame body.
    state = State.BODY;
  }

  /**
   * Processes the body of the current frame by delivering it to the sink as a single message and
   * then resets the state so that the next header is read.
   *
   * @return the future returned by the sink, possibly {@code null}
   * @throws IllegalStateException if the frame is compressed but compression is NONE
   * @throws RuntimeException if decompressing the frame fails
   */
  private ListenableFuture<Void> processBody() {
    ListenableFuture<Void> future;
    if (compressedFlag) {
      if (compression == Compression.NONE) {
        throw new IllegalStateException("Can't decode compressed frame with NONE compression");
      } else if (compression == Compression.GZIP) {
        byte[] bytes = readGzipBody();
        future = sink.messageRead(new ByteArrayInputStream(bytes), bytes.length);
      } else {
        throw new AssertionError("Unknown compression type");
      }
    } else {
      // Don't close the frame, since the sink is now responsible for the life-cycle.
      future = sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
      nextFrame = null;
    }

    // Done with this frame, begin processing the next header.
    state = State.HEADER;
    requiredLength = HEADER_LENGTH;
    return future;
  }

  /**
   * Fully decompresses the GZIP body held in nextFrame into a byte array, then closes and discards
   * the frame so that none of its bytes can be mistaken for part of the next header.
   */
  private byte[] readGzipBody() {
    try {
      // The stream is opened with the same 'false' argument as before; presumably this makes it
      // a non-owning view, so the frame itself is released explicitly in the outer finally.
      GZIPInputStream gzip = new GZIPInputStream(Buffers.openStream(nextFrame, false));
      try {
        return ByteStreams.toByteArray(gzip);
      } finally {
        // Closing the GZIP stream releases the inflater it allocated.
        gzip.close();
      }
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    } finally {
      nextFrame.close();
      nextFrame = null;
    }
  }
}