blob: 121e063183109616c004ba110bda833f9b627394 [file] [log] [blame]
package com.google.net.stubby.newtransport;
import static com.google.net.stubby.newtransport.StreamState.CLOSED;
import static com.google.net.stubby.newtransport.StreamState.OPEN;
import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
/**
* Abstract base class for {@link Stream} implementations.
*/
public abstract class AbstractStream implements Stream {
/**
* Indicates the phase of the GRPC stream in one direction.
*/
protected enum Phase {
CONTEXT, MESSAGE, STATUS
}
private volatile StreamState state = StreamState.OPEN;
private Status status;
private final Object stateLock = new Object();
private final Object writeLock = new Object();
private final MessageFramer framer;
private final StreamListener listener;
protected Phase inboundPhase = Phase.CONTEXT;
protected Phase outboundPhase = Phase.CONTEXT;
/**
* Handler for Framer output.
*/
private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream);
}
};
/**
* Handler for Deframer output.
*/
private final Framer inboundMessageHandler = new Framer() {
@Override
public void writeContext(String name, InputStream value, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.CONTEXT);
future = listener.contextRead(name, value, length);
} finally {
closeWhenDone(future, value);
}
}
@Override
public void writePayload(InputStream input, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.MESSAGE);
future = listener.messageRead(input, length);
} finally {
closeWhenDone(future, input);
}
}
@Override
public void writeStatus(Status status) {
inboundPhase(Phase.STATUS);
setStatus(status);
}
@Override
public void flush() {}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {}
@Override
public void dispose() {}
};
protected AbstractStream(StreamListener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
framer = new MessageFramer(outboundFrameHandler, 4096);
// No compression at the moment.
framer.setAllowCompression(false);
}
@Override
public StreamState state() {
return state;
}
@Override
public final void close() {
outboundPhase(Phase.STATUS);
synchronized (stateLock) {
state = state == OPEN ? READ_ONLY : CLOSED;
}
synchronized (writeLock) {
framer.close();
}
}
/**
* Free any resources associated with this stream. Subclass implementations must call this
* version.
*/
public void dispose() {
synchronized (writeLock) {
framer.dispose();
}
}
@Override
public final void writeContext(String name, InputStream value, int length,
@Nullable Runnable accepted) {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(value, "value");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.CONTEXT);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writeContext(name, value, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.MESSAGE);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public final void flush() {
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.flush();
}
}
}
/**
* Sets the status if not already set and notifies the stream listener that the stream was closed.
* This method must be called from the transport thread.
*
* @param newStatus the new status to set
* @return {@code} true if the status was not already set.
*/
public boolean setStatus(final Status newStatus) {
Preconditions.checkNotNull(newStatus, "newStatus");
synchronized (stateLock) {
if (status != null) {
// Disallow override of current status.
return false;
}
status = newStatus;
state = CLOSED;
}
// Invoke the observer callback.
listener.closed(newStatus);
// Free any resources.
dispose();
return true;
}
/**
* Sends an outbound frame to the server.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Gets the handler for inbound messages. Subclasses must use this as the target for a
* {@link com.google.net.stubby.newtransport.Deframer}.
*/
protected final Framer inboundMessageHandler() {
return inboundMessageHandler;
}
/**
* Transitions the inbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
protected final void inboundPhase(Phase nextPhase) {
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
}
/**
* Transitions the outbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
protected final void outboundPhase(Phase nextPhase) {
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
return nextPhase;
}
/**
* If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
* the {@link InputStream} is closed immediately.
*/
private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
final InputStream input) {
if (future == null) {
Closeables.closeQuietly(input);
return;
}
// Close the buffer when the future completes.
future.addListener(new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(input);
}
}, MoreExecutors.sameThreadExecutor());
}
}