blob: bd6849e1b2dbd5515b8d3b0688edb4ebe9715d0b [file] [log] [blame]
package com.google.net.stubby.stub;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.net.stubby.Call;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Utility functions for processing different call idioms. We have one-to-one correspondence
* between utilities in this class and the potential signatures in a generated stub class so
* that the runtime can vary behavior without requiring regeneration of the stub.
*/
public class Calls {
/**
* Execute a unary call and return a {@link ListenableFuture} to the response.
* @return a future for the single response message.
*/
public static <ReqT, RespT> ListenableFuture<RespT> unaryFutureCall(
Call<ReqT, RespT> call,
ReqT param) {
SettableFuture<RespT> responseFuture = SettableFuture.create();
asyncServerStreamingCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture));
return responseFuture;
}
/**
* Returns the result of calling {@link Future#get()} interruptably on a task known not to throw a
* checked exception.
*
* <p>If interrupted, the interrupt is restored before throwing a {@code RuntimeException}.
*
* @throws RuntimeException if {@code get} is interrupted
* @throws CancellationException if {@code get} throws a {@code CancellationException}
* @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an
* {@code Exception} as its cause
* @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code
* Error} as its cause
*/
private static <V> V getUnchecked(Future<V> future) {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
if (e.getCause() == null) {
// Strange...
throw new UncheckedExecutionException(e);
} else {
if (e.getCause() instanceof Error) {
throw new ExecutionError((Error) e.getCause());
} else {
throw new UncheckedExecutionException(e.getCause());
}
}
}
}
/**
* Execute a unary call and block on the response.
* @return the single response message.
*/
public static <ReqT, RespT> RespT blockingUnaryCall(Call<ReqT, RespT> call, ReqT param) {
try {
return getUnchecked(unaryFutureCall(call, param));
} catch (Throwable t) {
call.cancel();
throw Throwables.propagate(t);
}
}
/**
* Execute a unary call with a response {@link StreamObserver}.
*/
public static <ReqT, RespT> void asyncUnaryCall(
Call<ReqT, RespT> call,
ReqT param,
StreamObserver<RespT> observer) {
asyncServerStreamingCall(call, param, observer);
}
/**
* Execute a server-streaming call returning a blocking {@link Iterator} over the
* response stream.
* @return an iterator over the response stream.
*/
// TODO(user): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Call<ReqT, RespT> call, ReqT param) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>();
asyncServerStreamingCall(call, param, result.listener());
return result;
}
/**
* Execute a server-streaming call with a response {@link StreamObserver}.
*/
public static <ReqT, RespT> void asyncServerStreamingCall(
Call<ReqT, RespT> call,
ReqT param,
StreamObserver<RespT> responseObserver) {
asyncServerStreamingCall(call, param,
new StreamObserverToCallListenerAdapter<RespT>(responseObserver));
}
private static <ReqT, RespT> void asyncServerStreamingCall(
Call<ReqT, RespT> call,
ReqT param,
Call.Listener<RespT> responseListener) {
call.start(responseListener);
try {
call.sendPayload(param);
call.halfClose();
} catch (Throwable t) {
call.cancel();
throw Throwables.propagate(t);
}
}
/**
* Execute a client-streaming call with a blocking {@link Iterator} of request messages.
* @return the single response value.
*/
public static <ReqT, RespT> RespT blockingClientStreamingCall(
Call<ReqT, RespT> call,
Iterator<ReqT> clientStream) {
SettableFuture<RespT> responseFuture = SettableFuture.create();
call.start(new UnaryStreamToFuture<RespT>(responseFuture));
try {
while (clientStream.hasNext()) {
call.sendPayload(clientStream.next());
}
call.halfClose();
} catch (Throwable t) {
call.cancel();
throw Throwables.propagate(t);
}
try {
return getUnchecked(responseFuture);
} catch (Throwable t) {
call.cancel();
throw Throwables.propagate(t);
}
}
/**
* Execute a client-streaming call returning a {@link StreamObserver} for the request messages.
* @return request stream observer.
*/
public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
Call<ReqT, RespT> call,
StreamObserver<RespT> responseObserver) {
return duplexStreamingCall(call, responseObserver);
}
/**
* Execute a duplex-streaming call.
* @return request stream observer.
*/
public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(
Call<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
call.start(new StreamObserverToCallListenerAdapter<RespT>(responseObserver));
return new CallToStreamObserverAdapter<ReqT>(call);
}
private static class CallToStreamObserverAdapter<T> implements StreamObserver<T> {
private final Call<T, ?> call;
public CallToStreamObserverAdapter(Call<T, ?> call) {
this.call = call;
}
@Override
public void onValue(T value) {
call.sendPayload(value);
}
@Override
public void onError(Throwable t) {
// TODO(user): log?
call.cancel();
}
@Override
public void onCompleted() {
call.halfClose();
}
}
private static class StreamObserverToCallListenerAdapter<T> extends Call.Listener<T> {
private final StreamObserver<T> observer;
public StreamObserverToCallListenerAdapter(StreamObserver<T> observer) {
this.observer = observer;
}
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// StreamObservers don't receive contexts.
return null;
}
@Override
public ListenableFuture<Void> onPayload(T payload) {
observer.onValue(payload);
return null;
}
@Override
public void onClose(Status status) {
if (status.isOk()) {
observer.onCompleted();
} else {
observer.onError(status.asRuntimeException());
}
}
}
/**
* Complete a SettableFuture using {@link StreamObserver} events.
*/
private static class UnaryStreamToFuture<RespT> extends Call.Listener<RespT> {
private final SettableFuture<RespT> responseFuture;
private RespT value;
public UnaryStreamToFuture(SettableFuture<RespT> responseFuture) {
this.responseFuture = responseFuture;
}
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// Don't care about contexts.
return null;
}
@Override
public ListenableFuture<Void> onPayload(RespT value) {
if (this.value != null) {
throw new Status(Transport.Code.INTERNAL, "More than one value received for unary call")
.asRuntimeException();
}
this.value = value;
return null;
}
@Override
public void onClose(Status status) {
if (status.isOk()) {
if (value == null) {
// No value received so mark the future as an error
responseFuture.setException(
new Status(Transport.Code.INTERNAL, "No value received for unary call")
.asRuntimeException().fillInStackTrace());
}
responseFuture.set(value);
} else {
responseFuture.setException(status.asRuntimeException());
}
}
}
/**
* Convert events on a {@link Call.Listener} into a blocking {@link Iterator}.
*
* <p>The class is not thread-safe, but it does permit Call.Listener calls in a separate thread
* from Iterator calls.
*/
// TODO(user): determine how to allow Call.cancel() in case of application error.
private static class BlockingResponseStream<T> implements Iterator<T> {
private final LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<Object>();
private final Call.Listener<T> listener = new QueuingListener();
// Only accessed when iterating.
private Object last;
Call.Listener<T> listener() {
return listener;
}
@Override
public boolean hasNext() {
try {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = (last == null) ? buffer.take() : last;
} catch (InterruptedException ie) {
Thread.interrupted();
throw new RuntimeException(ie);
}
if (last instanceof Throwable) {
throw Throwables.propagate((Throwable) last);
}
return last != this;
}
@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
@SuppressWarnings("unchecked")
Payload<T> tmp = (Payload<T>) last;
last = null;
tmp.processed.set(null);
return tmp.value;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
private class QueuingListener extends Call.Listener<T> {
private boolean done = false;
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// Don't care about contexts.
return null;
}
@Override
public ListenableFuture<Void> onPayload(T value) {
Preconditions.checkState(!done, "Call already closed");
SettableFuture<Void> future = SettableFuture.create();
buffer.add(new Payload<T>(value, future));
return future;
}
@Override
public void onClose(Status status) {
Preconditions.checkState(!done, "Call already closed");
if (status.isOk()) {
buffer.add(this);
} else {
buffer.add(status.asRuntimeException());
}
done = true;
}
}
}
private static class Payload<T> {
public final T value;
public final SettableFuture<Void> processed;
public Payload(T value, SettableFuture<Void> processed) {
this.value = value;
this.processed = processed;
}
}
}