blob: d4c3bc4253d64f37d90e74ca41aaf9b7b4d79eec [file] [log] [blame]
package com.google.net.stubby.newtransport;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
/**
* Abstract base class for {@link Stream} implementations.
*/
public abstract class AbstractStream implements Stream {
/**
* Global to enable gRPC v2 protocol support, which may be incomplete. This is a complete hack
* and should please, please, please be temporary to ease migration.
*/
// TODO(user): remove this once v1 support is dropped.
public static boolean GRPC_V2_PROTOCOL = false;
/**
* Indicates the phase of the GRPC stream in one direction.
*/
protected enum Phase {
HEADERS, MESSAGE, STATUS
}
private final Object writeLock = new Object();
private final Framer framer;
protected Phase inboundPhase = Phase.HEADERS;
protected Phase outboundPhase = Phase.HEADERS;
/**
* Handler for Framer output.
*/
private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
internalSendFrame(frame, endOfStream);
}
};
/**
* Internal handler for deframer output. Informs stream of inbound messages.
*/
private final GrpcDeframer.Sink inboundMessageHandler = new GrpcDeframer.Sink() {
@Override
public ListenableFuture<Void> messageRead(InputStream input, int length) {
ListenableFuture<Void> future = null;
try {
future = receiveMessage(input, length);
disableWindowUpdate(future);
return future;
} finally {
closeWhenDone(future, input);
}
}
@Override
public void statusRead(Status status) {
receiveStatus(status);
}
@Override
public void endOfStream() {
remoteEndClosed();
}
};
protected AbstractStream() {
if (!GRPC_V2_PROTOCOL) {
framer = new MessageFramer(outboundFrameHandler, 4096);
} else {
framer = new MessageFramer2(outboundFrameHandler, 4096);
}
}
/**
* Free any resources associated with this stream. Subclass implementations must call this
* version.
*/
public void dispose() {
synchronized (writeLock) {
framer.dispose();
}
}
@Override
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.MESSAGE);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public final void flush() {
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.flush();
}
}
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream);
/** A message was deframed. */
protected abstract ListenableFuture<Void> receiveMessage(InputStream is, int length);
/** A status was deframed. */
protected abstract void receiveStatus(Status status);
/** Deframer reached end of stream. */
protected abstract void remoteEndClosed();
/**
* If the given future is non-{@code null}, temporarily disables window updates for inbound flow
* control for this stream until the future completes. If the given future is {@code null}, does
* nothing.
*/
protected abstract void disableWindowUpdate(@Nullable ListenableFuture<Void> processingFuture);
/**
* Gets the internal handler for inbound messages. Subclasses must use this as the target for a
* {@link com.google.net.stubby.newtransport.Deframer}.
*/
protected GrpcDeframer.Sink inboundMessageHandler() {
return inboundMessageHandler;
}
/**
* Transitions the inbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
protected final void inboundPhase(Phase nextPhase) {
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
}
/**
* Transitions the outbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
protected final void outboundPhase(Phase nextPhase) {
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
}
/**
* Closes the underlying framer.
*
* <p>No-op if the framer has already been closed.
*
* @param status if not null, will write the status to the framer before closing it
*/
protected final void closeFramer(@Nullable Status status) {
synchronized (writeLock) {
if (!framer.isClosed()) {
if (status != null) {
framer.writeStatus(status);
}
framer.close();
}
}
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
return nextPhase;
}
/**
* If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
* the {@link InputStream} is closed immediately.
*/
private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
final InputStream input) {
if (future == null) {
Closeables.closeQuietly(input);
return;
}
// Close the buffer when the future completes.
future.addListener(new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(input);
}
}, MoreExecutors.directExecutor());
}
}