blob: c08fa81d41bb9f230c5ff87fd46158b8e657e120 [file] [log] [blame]
package com.google.net.stubby.newtransport;
import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK;
import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
/**
* Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a
* {@link Decompressor}.
*/
public class GrpcDeframer implements Closeable {
  /** Which part of the current frame is being assembled. */
  private enum State {
    HEADER, BODY
  }

  /** Size in bytes of a frame header: the frame type field plus the frame length field. */
  private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;

  private final Decompressor decompressor;
  private final StreamListener listener;
  private final Executor executor;

  /** Resumes delivery once the future returned by the listener for the last frame completes. */
  private final Runnable deliveryTask;

  private State state = State.HEADER;

  /** Number of bytes that must be buffered in {@link #nextFrame} before {@link #state} is run. */
  private int requiredLength = HEADER_LENGTH;

  /** Frame type taken from the most recently processed header. */
  private int frameType;

  /** Set once {@link StreamListener#closed} has been invoked; no further frames are accepted. */
  private boolean statusNotified;

  /** Value of the {@code endOfStream} argument of the most recent {@link #deframe} call. */
  private boolean endOfStream;

  /** True while a frame is being handed to the listener or its returned future is pending. */
  private boolean deliveryOutstanding;

  /** Bytes accumulated so far for the header or body currently being read; may be null. */
  private CompositeBuffer nextFrame;

  /**
   * Constructs the deframer.
   *
   * @param decompressor the object used for de-framing GRPC compression frames.
   * @param listener the listener for fully read GRPC messages.
   * @param executor the executor to be used for delivery. All calls to
   *        {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
   *        executor must not allow concurrent access to this class, so it must be either a single
   *        thread or have sequential processing of events.
   */
  public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) {
    this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
    this.listener = Preconditions.checkNotNull(listener, "listener");
    this.executor = Preconditions.checkNotNull(executor, "executor");
    deliveryTask = new Runnable() {
      @Override
      public void run() {
        // The future for the previously delivered frame is done; allow delivery to continue.
        deliveryOutstanding = false;
        deliver();
      }
    };
  }

  /**
   * Adds the given data to this deframer and attempts delivery to the listener.
   *
   * @param data the newly received bytes; must not be null.
   * @param endOfStream whether this is the last data that will be received for the stream.
   * @throws IllegalStateException if data arrives after a status frame has been delivered, or if
   *         a frame header carries a negative length.
   */
  public void deframe(Buffer data, boolean endOfStream) {
    Preconditions.checkNotNull(data, "data");

    // Add the data to the decompression buffer.
    decompressor.decompress(data);

    // Indicate that all of the data for this stream has been received.
    this.endOfStream = endOfStream;

    // Deliver the next message if not already delivering.
    deliver();
  }

  /**
   * Closes the decompressor and releases any partially read frame.
   */
  @Override
  public void close() {
    decompressor.close();
    releaseFrame();
  }

  /**
   * If there is no outstanding delivery, attempts to read and deliver as many messages to the
   * listener as possible. Only one outstanding delivery is allowed at a time.
   */
  private void deliver() {
    if (deliveryOutstanding) {
      // Only allow one outstanding delivery at a time.
      return;
    }

    // Process the uncompressed bytes.
    while (readRequiredBytes()) {
      if (statusNotified) {
        throw new IllegalStateException("Inbound data after receiving status frame");
      }
      switch (state) {
        case HEADER:
          processHeader();
          break;
        case BODY:
          if (!deliverBody()) {
            // The listener returned a future; deliveryTask resumes delivery when it completes.
            return;
          }
          break;
        default:
          throw new AssertionError("Invalid state: " + state);
      }
    }

    // If reached the end of stream without reading a status frame, fabricate one
    // and deliver to the target.
    if (!statusNotified && endOfStream) {
      // readRequiredBytes() always leaves nextFrame non-null. Being in the BODY state, or holding
      // a partial header, means the stream ended in the middle of a frame.
      boolean truncated = state == State.BODY || nextFrame.readableBytes() > 0;
      if (truncated) {
        // Do not report success when buffered data can never be delivered.
        notifyStatus(
            new Status(Transport.Code.UNKNOWN, "Encountered end-of-stream mid-frame"));
      } else {
        notifyStatus(Status.OK);
      }
    }
  }

  /**
   * Hands the fully buffered frame body to the listener.
   *
   * @return {@code true} if processing finished synchronously and delivery may continue;
   *         {@code false} if the listener returned a future, in which case
   *         {@link #deliveryTask} has been registered to resume delivery.
   */
  private boolean deliverBody() {
    // Set before invoking the listener so that a nested deliver() call becomes a no-op.
    deliveryOutstanding = true;
    boolean awaitingFuture = false;
    try {
      ListenableFuture<Void> processingFuture = processBody();
      if (processingFuture != null) {
        // A future was returned for the completion of processing the delivered
        // message. Once it's done, try to deliver the next message.
        processingFuture.addListener(deliveryTask, executor);
        awaitingFuture = true;
      }
    } finally {
      if (!awaitingFuture) {
        // Either no future was returned (processing is complete) or processing threw. In both
        // cases nothing will run deliveryTask, so clear the flag here; otherwise every later
        // deliver() call would return immediately.
        deliveryOutstanding = false;
      }
    }
    return !awaitingFuture;
  }

  /**
   * Attempts to read the required bytes into nextFrame.
   *
   * @return {@code true} if all of the required bytes have been read.
   */
  private boolean readRequiredBytes() {
    if (nextFrame == null) {
      nextFrame = new CompositeBuffer();
    }

    // Read until the buffer contains all the required bytes.
    int missingBytes;
    while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
      Buffer buffer = decompressor.readBytes(missingBytes);
      if (buffer == null) {
        // No more data is available.
        break;
      }
      // Add it to the composite buffer for the next frame.
      nextFrame.addBuffer(buffer);
    }

    // Return whether or not all of the required bytes are now in the frame.
    return nextFrame.readableBytes() == requiredLength;
  }

  /**
   * Processes the frame header, which is composed of the frame type and the frame length, and
   * switches to reading the frame body.
   *
   * @throws IllegalStateException if the header carries a negative frame length.
   */
  private void processHeader() {
    // Consume the frame type from the header.
    frameType = nextFrame.readUnsignedByte() & FRAME_TYPE_MASK;

    // Consume the body length. A negative value could never be satisfied by
    // readRequiredBytes(), which would silently stall the stream, so reject it up front.
    int frameLength = nextFrame.readInt();
    if (frameLength < 0) {
      throw new IllegalStateException("Invalid frame length: " + frameLength);
    }
    requiredLength = frameLength;

    // Continue reading the frame body.
    state = State.BODY;
  }

  /**
   * Processes the body of the current frame according to the frame type read from its header.
   *
   * @return the future returned by the listener for the delivered frame, or {@code null} if the
   *         frame was processed synchronously.
   */
  private ListenableFuture<Void> processBody() {
    try {
      switch (frameType) {
        case CONTEXT_VALUE_FRAME:
          return processContext();
        case PAYLOAD_FRAME:
          return processMessage();
        case STATUS_FRAME:
          processStatus();
          return null;
        default:
          throw new AssertionError("Invalid frameType: " + frameType);
      }
    } finally {
      // Done with this frame (successfully or not), begin processing the next header. Doing this
      // in a finally block keeps a failure from leaving the deframer pointed at a body that has
      // already been consumed or discarded.
      releaseFrame();
      state = State.HEADER;
      requiredLength = HEADER_LENGTH;
    }
  }

  /**
   * Processes the payload of a context frame.
   */
  private ListenableFuture<Void> processContext() {
    Transport.ContextValue ctx;
    try {
      // Not clear if using proto encoding here is of any benefit.
      // Using ContextValue.parseFrom requires copying out of the framed chunk
      // Writing a custom parser would have to do varint handling and potentially
      // deal with out-of-order tags etc.
      ctx = Transport.ContextValue.parseFrom(Buffers.openStream(nextFrame, false));
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      // The parsed value no longer needs the frame, so release it before calling the listener.
      releaseFrame();
    }

    // Call the handler.
    Buffer ctxBuffer = Buffers.wrap(ctx.getValue());
    return listener.contextRead(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
        ctxBuffer.readableBytes());
  }

  /**
   * Processes the payload of a message frame.
   */
  private ListenableFuture<Void> processMessage() {
    try {
      return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
    } finally {
      // Don't close the frame, since the listener is now responsible for the life-cycle.
      nextFrame = null;
    }
  }

  /**
   * Processes the payload of a status frame.
   */
  private void processStatus() {
    try {
      int statusCode = nextFrame.readUnsignedShort();
      Transport.Code code = Transport.Code.valueOf(statusCode);
      notifyStatus(code != null ? new Status(code)
          : new Status(Transport.Code.UNKNOWN, "Unknown status code " + statusCode));
    } finally {
      releaseFrame();
    }
  }

  /**
   * Delivers the status notification to the listener.
   */
  private void notifyStatus(Status status) {
    statusNotified = true;
    listener.closed(status);
  }

  /**
   * Closes and forgets the frame currently held by this deframer, if any. Clearing the reference
   * keeps the same buffer from being closed more than once.
   */
  private void releaseFrame() {
    if (nextFrame != null) {
      nextFrame.close();
      nextFrame = null;
    }
  }
}