blob: 8959408007c0001829932a7b15f7405639334c51 [file] [log] [blame]
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
/**
* Abstract base class for {@link Stream} implementations.
*/
public abstract class AbstractStream<IdT> implements Stream {
/**
* Indicates the phase of the GRPC stream in one direction.
*/
protected enum Phase {
HEADERS, MESSAGE, STATUS
}
private volatile IdT id;
private final MessageFramer framer;
final MessageDeframer deframer;
/**
* Inbound phase is exclusively written to by the transport thread.
*/
private Phase inboundPhase = Phase.HEADERS;
/**
* Outbound phase is exclusively written to by the application thread.
*/
private Phase outboundPhase = Phase.HEADERS;
AbstractStream() {
MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() {
@Override
public void bytesRead(int numBytes) {
returnProcessedBytes(numBytes);
}
@Override
public void messageRead(InputStream input, final int length) {
receiveMessage(input, length);
}
@Override
public void deliveryStalled() {
inboundDeliveryPaused();
}
@Override
public void endOfStream() {
remoteEndClosed();
}
};
MessageFramer.Sink<ByteBuffer> outboundFrameHandler = new MessageFramer.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
internalSendFrame(frame, endOfStream);
}
};
framer = new MessageFramer(outboundFrameHandler, 4096);
this.deframer = new MessageDeframer(inboundMessageHandler);
}
/**
* Returns the internal id for this stream. Note that Id can be {@code null} for client streams
* as the transport may defer creating the stream to the remote side until is has payload or
* metadata to send.
*/
@Nullable
public IdT id() {
return id;
}
/**
* Set the internal id for this stream
*/
public void id(IdT id) {
Preconditions.checkState(id != null, "Can only set id once");
this.id = id;
}
@Override
public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.MESSAGE);
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
// TODO(nathanmittler): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public final void flush() {
if (!framer.isClosed()) {
framer.flush();
}
}
/**
* Closes the underlying framer.
*
* <p>No-op if the framer has already been closed.
*/
final void closeFramer() {
if (!framer.isClosed()) {
framer.close();
}
}
/**
* Free any resources associated with this stream. Subclass implementations must call this
* version.
* <p>
* NOTE. Can be called by both the transport thread and the application thread. Transport
* threads need to dispose when the remote side has terminated the stream. Application threads
* will dispose when the application decides to close the stream as part of normal processing.
* </p>
*/
public void dispose() {
framer.dispose();
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream);
/** A message was deframed. */
protected abstract void receiveMessage(InputStream is, int length);
/** Deframer has no pending deliveries. */
protected abstract void inboundDeliveryPaused();
/** Deframer reached end of stream. */
protected abstract void remoteEndClosed();
/**
* Returns the given number of processed bytes back to inbound flow control to enable receipt of
* more data.
*/
protected abstract void returnProcessedBytes(int processedBytes);
/**
* Called when a {@link #deframe(Buffer, boolean)} operation failed.
*/
protected abstract void deframeFailed(Throwable cause);
/**
* Called to parse a received frame and attempt delivery of any completed
* messages. Must be called from the transport thread.
*/
protected final void deframe(Buffer frame, boolean endOfStream) {
try {
deframer.deframe(frame, endOfStream);
} catch (Throwable t) {
deframeFailed(t);
}
}
/**
* Called to request the given number of messages from the deframer. Must be called
* from the transport thread.
*/
protected final void requestMessagesFromDeframer(int numMessages) {
try {
deframer.request(numMessages);
} catch (Throwable t) {
deframeFailed(t);
}
}
final Phase inboundPhase() {
return inboundPhase;
}
/**
* Transitions the inbound phase to the given phase and returns the previous phase.
* If the transition is disallowed, throws an {@link IllegalStateException}.
*/
final Phase inboundPhase(Phase nextPhase) {
Phase tmp = inboundPhase;
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
return tmp;
}
final Phase outboundPhase() {
return outboundPhase;
}
/**
* Transitions the outbound phase to the given phase and returns the previous phase.
* If the transition is disallowed, throws an {@link IllegalStateException}.
*/
final Phase outboundPhase(Phase nextPhase) {
Phase tmp = outboundPhase;
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
return tmp;
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal()) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
return nextPhase;
}
/**
* Can the stream receive data from its remote peer.
*/
public boolean canReceive() {
return inboundPhase() != Phase.STATUS;
}
/**
* Can the stream send data to its remote peer.
*/
public boolean canSend() {
return outboundPhase() != Phase.STATUS;
}
/**
* Is the stream fully closed. Note that this method is not thread-safe as inboundPhase and
* outboundPhase are mutated in different threads. Tests must account for thread coordination
* when calling.
*/
@VisibleForTesting
public boolean isClosed() {
return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS;
}
@Override
public String toString() {
return toStringHelper().toString();
}
protected MoreObjects.ToStringHelper toStringHelper() {
return MoreObjects.toStringHelper(this)
.add("id", id())
.add("inboundPhase", inboundPhase().name())
.add("outboundPhase", outboundPhase().name());
}
}