blob: dd7d80fa1b8043dbab86ea95bd70bde7657db3d3 [file] [log] [blame]
package com.google.net.stubby.newtransport;
import com.google.common.io.ByteStreams;
import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
* reconstructs their messages and hands them off to a receiving {@link Operation}
*/
public abstract class Deframer<F> implements Framer.Sink<F> {
/**
* Unset frame length
*/
private static final int LENGTH_NOT_SET = -1;
private final GrpcDeframer.Sink target;
private boolean inFrame;
private byte currentFlags;
private int currentLength = LENGTH_NOT_SET;
private boolean statusDelivered;
public Deframer(GrpcDeframer.Sink target) {
this.target = target;
}
@Override
public void deliverFrame(F frame, boolean endOfStream) {
int remaining = internalDeliverFrame(frame);
if (endOfStream) {
if (remaining > 0) {
writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
} else if (!statusDelivered) {
writeStatus(Status.OK);
}
}
}
/**
* Consume a frame of bytes provided by the transport. Note that transport framing is not
* aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
* across transport frame boundaries.
*
* @return the number of unconsumed bytes remaining in the buffer
*/
private int internalDeliverFrame(F frame) {
try {
frame = decompress(frame);
DataInputStream grpcStream = prefix(frame);
// Loop until no more GRPC frames can be fully decoded
while (true) {
if (!inFrame) {
// Not in frame so attempt to read flags
if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
return consolidate();
}
currentFlags = grpcStream.readByte();
inFrame = true;
}
if (currentLength == LENGTH_NOT_SET) {
// Read the frame length
if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
return consolidate();
}
currentLength = grpcStream.readInt();
}
// Ensure that the entire frame length is available to read
InputStream framedChunk = ensureMessage(grpcStream, currentLength);
if (framedChunk == null) {
// Insufficient bytes available
return consolidate();
}
if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
// Advance stream now, because target.addPayload() may not or may process the frame on
// another thread.
framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
try {
// Report payload to the receiving operation
target.messageRead(framedChunk, currentLength);
} finally {
currentLength = LENGTH_NOT_SET;
inFrame = false;
}
} else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
int status = framedChunk.read() << 8 | framedChunk.read();
Transport.Code code = Transport.Code.valueOf(status);
// TODO(user): Resolve what to do with remainder of framedChunk
try {
if (code == null) {
// Log for unknown code
writeStatus(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
} else {
writeStatus(new Status(code));
}
} finally {
currentLength = LENGTH_NOT_SET;
inFrame = false;
}
}
if (grpcStream.available() == 0) {
// We've processed all the data so consolidate the underlying buffers
return consolidate();
}
}
} catch (IOException ioe) {
Status status = new Status(Transport.Code.UNKNOWN, ioe);
writeStatus(status);
throw status.asRuntimeException();
}
}
/**
* Return a stream view over the current buffer prefixed to the input frame
*/
protected abstract DataInputStream prefix(F frame) throws IOException;
/**
* Consolidate the underlying buffers and return the number of buffered bytes remaining
*/
protected abstract int consolidate() throws IOException;
/**
* Decompress the raw frame buffer prior to prefixing it.
*/
protected abstract F decompress(F frame) throws IOException;
/**
* Ensure that {@code len} bytes are available in the buffer and frame
*/
private boolean ensure(InputStream input, int len) throws IOException {
return (input.available() >= len);
}
private void writeStatus(Status status) {
target.statusRead(status);
target.endOfStream();
statusDelivered = true;
}
/**
* Return a message of {@code len} bytes than can be read from the buffer. If sufficient
* bytes are unavailable then buffer the available bytes and return null.
*/
private InputStream ensureMessage(InputStream input, int len)
throws IOException {
if (input.available() < len) {
return null;
}
return ByteStreams.limit(input, len);
}
}