blob: 158e99e684e256a4c17b45285f91ed8d56523bd5 [file] [log] [blame]
package com.google.net.stubby;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SerializingExecutor;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.ClientTransportFactory;
import com.google.net.stubby.newtransport.StreamListener;
import java.io.InputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/**
 * A communication channel for making outgoing RPCs.
 *
 * <p>Transports are created lazily from the {@link ClientTransportFactory} the first time a call
 * is started while no transport is active. Every transport is tracked until its listener reports
 * {@code terminated} or {@code failed}. Once the channel has been asked to stop, it reports itself
 * stopped only after the last tracked transport is gone.
 */
@ThreadSafe
public final class ChannelImpl extends AbstractService implements Channel {
  private final ClientTransportFactory transportFactory;
  private final ExecutorService executor;

  /**
   * All transports that are not stopped. At the very least {@link #activeTransport} will be
   * present, but previously used transports that still have streams or are stopping may also be
   * present.
   */
  @GuardedBy("this")
  private final Collection<ClientTransport> transports = new ArrayList<ClientTransport>();

  /** The transport for new outgoing requests, or {@code null} if none has been created yet. */
  @GuardedBy("this")
  private ClientTransport activeTransport;

  /**
   * Creates a channel.
   *
   * @param transportFactory source of new transports, consulted when no transport is active
   * @param executor executor on which call listener callbacks are run (serialized per call)
   */
  public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
    this.transportFactory = transportFactory;
    this.executor = executor;
  }

  /** Nothing to set up eagerly: transports are created on demand, so report started at once. */
  @Override
  protected void doStart() {
    notifyStarted();
  }

  /**
   * Stops the active transport, if any, and reports the channel stopped once no tracked
   * transport remains.
   */
  @Override
  protected synchronized void doStop() {
    if (activeTransport != null) {
      activeTransport.stopAsync();
      activeTransport = null;
      // The last TransportListener will call notifyStopped().
    } else if (transports.isEmpty()) {
      notifyStopped();
    }
    // Otherwise there is no active transport but older transports are still shutting down
    // (TransportListener.stopping() clears activeTransport before the transport is removed from
    // the collection). Their listener calls notifyStopped() when the last one goes away;
    // notifying here as well would report termination early and make that later call fail.
  }

  /** Creates a call for {@code method} whose callbacks are serialized on the channel executor. */
  @Override
  public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
    return new CallImpl<ReqT, RespT>(method, new SerializingExecutor(executor));
  }

  /**
   * Returns the transport to use for a new stream, creating, registering and starting one if
   * there is currently none.
   *
   * @throws IllegalStateException if a new transport is needed but the channel is not running
   */
  private synchronized ClientTransport obtainActiveTransport() {
    if (activeTransport == null) {
      if (state() != State.RUNNING) {
        throw new IllegalStateException("Not running");
      }
      activeTransport = transportFactory.newClientTransport();
      // Register the listener and track the transport before starting it so that no state
      // transition can be missed.
      activeTransport.addListener(
          new TransportListener(activeTransport), MoreExecutors.directExecutor());
      transports.add(activeTransport);
      activeTransport.startAsync();
    }
    return activeTransport;
  }

  /**
   * Stops tracking {@code transport} and, if the channel is no longer running and this was the
   * last tracked transport, reports the channel as stopped.
   */
  private synchronized void transportFailedOrStopped(ClientTransport transport) {
    if (activeTransport == transport) {
      activeTransport = null;
    }
    transports.remove(transport);
    // NOTE(review): notifyStopped() is invoked while holding this channel's lock, whereas
    // doStop() acquires the lock after being entered by AbstractService. Confirm against
    // AbstractService's internal locking that this ordering cannot deadlock.
    if (state() != State.RUNNING && transports.isEmpty()) {
      notifyStopped();
    }
  }

  /** Tracks the lifecycle of a single transport on behalf of the channel. */
  private class TransportListener extends Listener {
    private final ClientTransport transport;

    public TransportListener(ClientTransport transport) {
      this.transport = transport;
    }

    /**
     * Retires the transport from use for new streams as soon as it starts stopping. It remains
     * in {@code transports} until it terminates or fails.
     */
    @Override
    public void stopping(State from) {
      synchronized (ChannelImpl.this) {
        if (activeTransport == transport) {
          activeTransport = null;
        }
      }
    }

    @Override
    public void failed(State from, Throwable failure) {
      transportFailedOrStopped(transport);
    }

    @Override
    public void terminated(State from) {
      transportFailedOrStopped(transport);
    }
  }

  /** A single call, bound to a stream on the channel's active transport once started. */
  private class CallImpl<ReqT, RespT> extends Call<ReqT, RespT> {
    private final MethodDescriptor<ReqT, RespT> method;
    private final SerializingExecutor callExecutor;

    // TODO(user): Consider moving flow control notification/management to Call itself.
    /**
     * Caller-supplied "accepted" futures for writes that have been handed to the stream but not
     * yet acknowledged. They are cancelled when the stream closes.
     */
    private final Collection<SettableFuture<Void>> inProcessFutures =
        Collections.synchronizedSet(new HashSet<SettableFuture<Void>>());

    /** The underlying stream; {@code null} until {@link #start} has succeeded. */
    private ClientStream stream;

    public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor) {
      this.method = method;
      this.callExecutor = executor;
    }

    /**
     * Opens a stream for this call on the channel's active transport.
     *
     * @throws IllegalStateException if the call was already started, or if a transport has to
     *     be created while the channel is not running
     */
    @Override
    public void start(Listener<RespT> observer) {
      Preconditions.checkState(stream == null, "Already started");
      stream = obtainActiveTransport().newStream(method, new StreamListenerImpl(observer));
    }

    @Override
    public void cancel() {
      // Cancel is called in exception handling cases, so it may be the case that the
      // stream was never successfully created.
      if (stream != null) {
        stream.cancel();
      }
    }

    @Override
    public void halfClose() {
      Preconditions.checkState(stream != null, "Not started");
      stream.halfClose();
    }

    /** Returns {@code is.available()}, rethrowing any {@link IOException} unchecked. */
    private int available(InputStream is) {
      try {
        return is.available();
      } catch (IOException ex) {
        throw Throwables.propagate(ex);
      }
    }

    /**
     * Writes a context value to the stream.
     *
     * @param accepted if non-null, completed once the stream runs the accepted callback, or
     *     cancelled if the stream closes first
     */
    @Override
    public void sendContext(String name, InputStream value, SettableFuture<Void> accepted) {
      Preconditions.checkState(stream != null, "Not started");
      if (accepted == null) {
        stream.writeContext(name, value, available(value), null);
      } else {
        inProcessFutures.add(accepted);
        stream.writeContext(name, value, available(value), new AcceptedRunnable(accepted));
      }
    }

    /**
     * Serializes {@code payload} via the method descriptor and writes it to the stream.
     *
     * @param accepted if non-null, completed once the stream runs the accepted callback, or
     *     cancelled if the stream closes first
     */
    @Override
    public void sendPayload(ReqT payload, SettableFuture<Void> accepted) {
      Preconditions.checkState(stream != null, "Not started");
      InputStream payloadIs = method.streamRequest(payload);
      if (accepted == null) {
        stream.writeMessage(payloadIs, available(payloadIs), null);
      } else {
        inProcessFutures.add(accepted);
        stream.writeMessage(payloadIs, available(payloadIs), new AcceptedRunnable(accepted));
      }
    }

    /** Callback handed to the stream; completes the caller's future and stops tracking it. */
    private class AcceptedRunnable implements Runnable {
      private final SettableFuture<Void> future;

      public AcceptedRunnable(SettableFuture<Void> future) {
        this.future = future;
      }

      @Override
      public void run() {
        inProcessFutures.remove(future);
        future.set(null);
      }
    }

    /** Forwards stream events to the call's listener on the call's serializing executor. */
    private class StreamListenerImpl implements StreamListener {
      private final Listener<RespT> observer;

      public StreamListenerImpl(Listener<RespT> observer) {
        Preconditions.checkNotNull(observer);
        this.observer = observer;
      }

      /**
       * Runs {@code callable} on the call executor and returns a future that completes (always
       * with {@code null}) once the callback has been processed: immediately if the callable
       * returns {@code null}, otherwise when the future it returned completes. If the callable
       * throws, the returned future is completed, the call is cancelled and the throwable is
       * rethrown on the executor thread.
       */
      private ListenableFuture<Void> dispatchCallable(
          final Callable<ListenableFuture<Void>> callable) {
        final SettableFuture<Void> ours = SettableFuture.create();
        callExecutor.execute(new Runnable() {
          @Override
          public void run() {
            try {
              ListenableFuture<Void> theirs = callable.call();
              if (theirs == null) {
                ours.set(null);
              } else {
                theirs.addListener(new Runnable() {
                  @Override
                  public void run() {
                    // TODO(user): If their Future fails, should we Call.cancel()?
                    ours.set(null);
                  }
                }, MoreExecutors.directExecutor());
              }
            } catch (Throwable t) {
              ours.set(null);
              CallImpl.this.cancel();
              throw Throwables.propagate(t);
            }
          }
        });
        return ours;
      }

      /** Delivers a context value to the observer; {@code length} is not forwarded. */
      @Override
      public ListenableFuture<Void> contextRead(final String name, final InputStream value,
          final int length) {
        return dispatchCallable(new Callable<ListenableFuture<Void>>() {
          @Override
          public ListenableFuture<Void> call() {
            return observer.onContext(name, value);
          }
        });
      }

      /** Parses a response message and delivers it to the observer; {@code length} is unused. */
      @Override
      public ListenableFuture<Void> messageRead(final InputStream message, final int length) {
        return dispatchCallable(new Callable<ListenableFuture<Void>>() {
          @Override
          public ListenableFuture<Void> call() {
            return observer.onPayload(method.parseResponse(message));
          }
        });
      }

      /**
       * Cancels every write future still awaiting acceptance, then notifies the observer of the
       * final status on the call executor.
       */
      @Override
      public void closed(final Status status) {
        // Iterating a synchronized collection requires holding its monitor. Snapshot and clear
        // atomically so that a concurrent add/remove can neither break the iteration nor slip
        // in between the cancel pass and the clear, then cancel outside the lock so future
        // listeners never run while the monitor is held.
        final Collection<SettableFuture<Void>> pending;
        synchronized (inProcessFutures) {
          pending = new ArrayList<SettableFuture<Void>>(inProcessFutures);
          inProcessFutures.clear();
        }
        for (SettableFuture<Void> future : pending) {
          future.cancel(false);
        }
        callExecutor.execute(new Runnable() {
          @Override
          public void run() {
            observer.onClose(status);
          }
        });
      }
    }
  }
}