| package com.google.net.stubby.newtransport; |
| |
| import static com.google.net.stubby.newtransport.StreamState.CLOSED; |
| import static com.google.net.stubby.newtransport.StreamState.OPEN; |
| import static com.google.net.stubby.newtransport.StreamState.WRITE_ONLY; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.net.stubby.Status; |
| import com.google.net.stubby.transport.Transport; |
| |
| import java.io.InputStream; |
| |
| /** |
| * Abstract base class for {@link ServerStream} implementations. |
| */ |
| public abstract class AbstractServerStream extends AbstractStream implements ServerStream { |
| |
| private StreamListener listener; |
| |
| private final Object stateLock = new Object(); |
| private volatile StreamState state = StreamState.OPEN; |
| |
| @Override |
| protected final StreamListener listener() { |
| return listener; |
| } |
| |
| public final void setListener(StreamListener listener) { |
| this.listener = Preconditions.checkNotNull(listener, "listener"); |
| } |
| |
| @Override |
| protected final GrpcMessageListener inboundMessageHandler() { |
| // Wraps the base handler to get status update. |
| final GrpcMessageListener delegate = super.inboundMessageHandler(); |
| return new GrpcMessageListener() { |
| @Override |
| public void onContext(String name, InputStream value, int length) { |
| delegate.onContext(name, value, length); |
| } |
| |
| @Override |
| public void onPayload(InputStream input, int length) { |
| delegate.onPayload(input, length); |
| } |
| |
| @Override |
| public void onStatus(Status status) { |
| delegate.onStatus(status); |
| listener.closed(status); |
| } |
| }; |
| } |
| |
| @Override |
| public final void close(Status status) { |
| synchronized (stateLock) { |
| Preconditions.checkState(!status.isOk() || state == WRITE_ONLY, |
| "Cannot close with OK before client half-closes"); |
| state = CLOSED; |
| } |
| outboundPhase(Phase.STATUS); |
| closeFramer(status); |
| dispose(); |
| } |
| |
| @Override |
| public StreamState state() { |
| return state; |
| } |
| |
| /** |
| * Called when the remote end half-closes the stream. |
| */ |
| public final void remoteEndClosed() { |
| StreamState previousState; |
| synchronized (stateLock) { |
| previousState = state; |
| if (previousState == OPEN) { |
| state = WRITE_ONLY; |
| } |
| } |
| if (previousState == OPEN) { |
| inboundPhase(Phase.STATUS); |
| listener.closed(Status.OK); |
| } else { |
| abortStream( |
| new Status(Transport.Code.FAILED_PRECONDITION, "Client-end of the stream already closed"), |
| true); |
| } |
| } |
| |
| /** |
| * Aborts the stream with an error status, cleans up resources and notifies the listener if |
| * necessary. |
| * |
| * <p>Unlike {@link #close(Status)}, this method is only called from the gRPC framework, so that |
| * we need to call closed() on the listener if it has not been called. |
| * |
| * @param status the error status. Must not be Status.OK. |
| * @param notifyClient true if the stream is still writable and you want to notify the client |
| * about stream closure and send the status |
| */ |
| public final void abortStream(Status status, boolean notifyClient) { |
| Preconditions.checkArgument(!status.isOk(), "status must not be OK"); |
| StreamState previousState; |
| synchronized (stateLock) { |
| previousState = state; |
| if (state == CLOSED) { |
| return; |
| } |
| state = CLOSED; |
| } |
| |
| if (previousState == OPEN) { |
| listener.closed(status); |
| } // Otherwise, previousState is WRITE_ONLY thus closed() has already been called. |
| |
| outboundPhase(Phase.STATUS); |
| if (notifyClient) { |
| closeFramer(status); |
| } |
| |
| dispose(); |
| } |
| } |