blob: 4ed5eee267d4b170f190b34fe14801e55773065c [file] [log] [blame]
package com.google.net.stubby.newtransport;
import static com.google.net.stubby.newtransport.StreamState.CLOSED;
import static com.google.net.stubby.newtransport.StreamState.OPEN;
import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
import com.google.net.stubby.Status;
import java.io.InputStream;
/**
 * The abstract base class for {@link ClientStream} implementations.
 *
 * <p>Tracks the {@link StreamState} of the stream together with the {@link Status} it was closed
 * with. Writes to both are performed while holding an internal lock; the state itself is kept in
 * a {@code volatile} field so that {@link #state()} can read it without locking.
 */
public abstract class AbstractClientStream extends AbstractStream implements ClientStream {

  private final StreamListener listener;

  /** Held while {@code status} and {@code state} are being updated. */
  private final Object stateLock = new Object();

  /** The status this stream was closed with; {@code null} until {@link #setStatus} succeeds. */
  private Status status;

  private volatile StreamState state = StreamState.OPEN;

  /**
   * Creates a new client stream.
   *
   * @param listener the listener to notify when the stream is closed; must not be {@code null}
   * @throws NullPointerException if {@code listener} is {@code null}
   */
  protected AbstractClientStream(StreamListener listener) {
    this.listener = Preconditions.checkNotNull(listener);
  }

  @Override
  protected final StreamListener listener() {
    return listener;
  }

  /**
   * Returns a handler that forwards every event to the base class handler and, once a status has
   * been forwarded, also records it on this stream via {@link #setStatus}.
   */
  @Override
  protected final GrpcMessageListener inboundMessageHandler() {
    // Wraps the base handler to get status update.
    final GrpcMessageListener delegate = super.inboundMessageHandler();
    return new GrpcMessageListener() {
      @Override
      public void onContext(String name, InputStream value, int length) {
        delegate.onContext(name, value, length);
      }

      @Override
      public void onPayload(InputStream input, int length) {
        delegate.onPayload(input, length);
      }

      @Override
      public void onStatus(Status receivedStatus) {
        // Let the base handler see the status first, then close this stream with it.
        delegate.onStatus(receivedStatus);
        setStatus(receivedStatus);
      }
    };
  }

  /**
   * Sets the status if not already set and notifies the stream listener that the stream was closed.
   * This method must be called from the transport thread.
   *
   * <p>Resources are released through {@code dispose()} even when the listener callback throws;
   * in that case the exception is propagated to the caller after disposal.
   *
   * @param newStatus the new status to set
   * @return {@code true} if the status was not already set.
   * @throws NullPointerException if {@code newStatus} is {@code null}
   */
  public boolean setStatus(final Status newStatus) {
    Preconditions.checkNotNull(newStatus, "newStatus");
    synchronized (stateLock) {
      if (status != null) {
        // Disallow override of current status.
        return false;
      }
      status = newStatus;
      state = CLOSED;
    }
    try {
      // Invoke the observer callback outside the lock.
      listener.closed(newStatus);
    } finally {
      // Free any resources, even if the listener callback failed.
      dispose();
    }
    return true;
  }

  /**
   * Closes the outbound side of the stream.
   *
   * <p>Advances the outbound phase to {@code Phase.STATUS}, moves the state from {@code OPEN} to
   * {@code READ_ONLY} (any other current state becomes {@code CLOSED}), and then closes the framer.
   */
  @Override
  public final void halfClose() {
    outboundPhase(Phase.STATUS);
    synchronized (stateLock) {
      state = state == OPEN ? READ_ONLY : CLOSED;
    }
    // NOTE(review): null presumably means "no status to send" on the client side; confirm
    // against closeFramer in the base class.
    closeFramer(null);
  }

  /** Returns the current state of this stream. */
  @Override
  public StreamState state() {
    return state;
  }
}