blob: 3cf17ef71fe37e65b0ac540d6dbc6e45d090b98d [file] [log] [blame]
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* The abstract base class for {@link ClientStream} implementations.
*/
public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
implements ClientStream {
private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
private final ClientStreamListener listener;
private boolean listenerClosed;
// Stored status & trailers to report when deframer completes or
// transportReportStatus is directly called.
private Status status;
private Metadata.Trailers trailers;
private Runnable closeListenerTask;
protected AbstractClientStream(ClientStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener);
}
@Override
protected void receiveMessage(InputStream is, int length) {
if (!listenerClosed) {
listener.messageRead(is, length);
}
}
@Override
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
super.writeMessage(message, length, accepted);
}
/**
* The transport implementation has detected a protocol error on the stream. Transports are
* responsible for properly closing streams when protocol errors occur.
*
* @param errorStatus the error to report
*/
protected void inboundTransportError(Status errorStatus) {
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
new Object[]{id(), errorStatus});
return;
}
// For transport errors we immediately report status to the application layer
// and do not wait for additional payloads.
transportReportStatus(errorStatus, false, new Metadata.Trailers());
}
/**
* Called by transport implementations when they receive headers. When receiving headers
* a transport may determine that there is an error in the protocol at this phase which is
* why this method takes an error {@link Status}. If a transport reports an
* {@link com.google.net.stubby.Status.Code#INTERNAL} error
*
* @param headers the parsed headers
*/
protected void inboundHeadersReceived(Metadata.Headers headers) {
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received headers on closed stream {0} {1}",
new Object[]{id(), headers});
}
inboundPhase(Phase.MESSAGE);
listener.headersRead(headers);
}
/**
* Process the contents of a received data frame from the server.
*/
protected void inboundDataReceived(Buffer frame) {
Preconditions.checkNotNull(frame, "frame");
if (inboundPhase() == Phase.STATUS) {
frame.close();
return;
}
if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error
inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload"));
frame.close();
return;
}
inboundPhase(Phase.MESSAGE);
deframe(frame, false);
}
@Override
protected void inboundDeliveryPaused() {
runCloseListenerTask();
}
@Override
protected final void deframeFailed(Throwable cause) {
log.log(Level.WARNING, "Exception processing message", cause);
cancel();
}
/**
* Called by transport implementations when they receive trailers.
*/
protected void inboundTrailersReceived(Metadata.Trailers trailers, Status status) {
Preconditions.checkNotNull(trailers, "trailers");
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received trailers on closed stream {0}\n {1}\n {3}",
new Object[]{id(), status, trailers});
}
// Stash the status & trailers so they can be delivered by the deframer calls
// remoteEndClosed
this.status = status;
this.trailers = trailers;
deframe(Buffers.empty(), true);
}
@Override
protected void remoteEndClosed() {
transportReportStatus(status, true, trailers);
}
@Override
protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream);
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Report stream closure with status to the application layer if not already reported. This method
* must be called from the transport thread.
*
* @param newStatus the new status to set
* @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
* may already be queued up in the deframer. If {@code false}, the listener will be
* notified immediately after all currently completed messages in the deframer have been
* delivered to the application.
*/
public void transportReportStatus(final Status newStatus, boolean stopDelivery,
final Metadata.Trailers trailers) {
Preconditions.checkNotNull(newStatus, "newStatus");
boolean closingLater = closeListenerTask != null && !stopDelivery;
if (listenerClosed || closingLater) {
// We already closed (or are about to close) the listener.
return;
}
inboundPhase(Phase.STATUS);
status = newStatus;
closeListenerTask = null;
// Determine if the deframer is stalled (i.e. currently has no complete messages to deliver).
boolean deliveryStalled = deframer.isStalled();
if (stopDelivery || deliveryStalled) {
// Close the listener immediately.
listenerClosed = true;
listener.closed(newStatus, trailers);
} else {
// Delay close until inboundDeliveryStalled()
closeListenerTask = newCloseListenerTask(newStatus, trailers);
}
}
/**
* Creates a new {@link Runnable} to close the listener with the given status/trailers.
*/
private Runnable newCloseListenerTask(final Status status, final Metadata.Trailers trailers) {
return new Runnable() {
@Override
public void run() {
if (!listenerClosed) {
// Status has not been reported to the application layer
listenerClosed = true;
listener.closed(status, trailers);
}
}
};
}
/**
* Executes the pending listener close task, if one exists.
*/
private void runCloseListenerTask() {
if (closeListenerTask != null) {
closeListenerTask.run();
closeListenerTask = null;
}
}
@Override
public final void halfClose() {
if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
closeFramer();
}
}
/**
* Cancel the stream. Called by the application layer, never called by the transport.
*/
@Override
public void cancel() {
outboundPhase(Phase.STATUS);
if (id() != null) {
// Only send a cancellation to remote side if we have actually been allocated
// a stream id and we are not already closed. i.e. the server side is aware of the stream.
sendCancel();
}
dispose();
}
/**
* Send a stream cancellation message to the remote server. Can be called by either the
* application or transport layers.
*/
protected abstract void sendCancel();
@Override
protected MoreObjects.ToStringHelper toStringHelper() {
MoreObjects.ToStringHelper toStringHelper = super.toStringHelper();
if (status != null) {
toStringHelper.add("status", status);
}
return toStringHelper;
}
@Override
public boolean isClosed() {
return super.isClosed() || listenerClosed;
}
}