// blob: b4f3f226b4e5534f2c48d212e273fee896a3af77 [file] [log] [blame]
package com.google.net.stubby;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.ClientStream;
import com.google.net.stubby.transport.ClientStreamListener;
import com.google.net.stubby.transport.ClientTransport;
import com.google.net.stubby.transport.ClientTransportFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/** A communication channel for making outgoing RPCs. */
@ThreadSafe
public final class ChannelImpl extends AbstractService implements Channel {
private static final Logger log = Logger.getLogger(ChannelImpl.class.getName());
private final ClientTransportFactory transportFactory;
private final ExecutorService executor;
/**
* All transports that are not stopped. At the very least {@link #activeTransport} will be
* present, but previously used transports that still have streams or are stopping may also be
* present.
*/
@GuardedBy("this")
private Collection<ClientTransport> transports = new ArrayList<ClientTransport>();
/** The transport for new outgoing requests. */
@GuardedBy("this")
private ClientTransport activeTransport;
public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
this.transportFactory = transportFactory;
this.executor = executor;
}
@Override
protected void doStart() {
obtainActiveTransport(true);
}
@Override
protected synchronized void doStop() {
if (transports.isEmpty()) {
notifyStopped();
} else {
// The last TransportListener will call notifyStopped().
if (activeTransport != null) {
activeTransport.stopAsync();
activeTransport = null;
}
}
}
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return new CallImpl<ReqT, RespT>(method, new SerializingExecutor(executor));
}
private synchronized ClientTransport obtainActiveTransport(boolean notifyWhenRunning) {
if (activeTransport == null) {
State state = state();
if (state != RUNNING && state != STARTING) {
throw new IllegalStateException("Not running");
}
ClientTransport newTransport = transportFactory.newClientTransport();
activeTransport = newTransport;
transports.add(newTransport);
// activeTransport reference can be changed during calls to the transport, even if we hold the
// lock, due to reentrancy.
newTransport.addListener(new TransportListener(newTransport, notifyWhenRunning),
MoreExecutors.directExecutor());
newTransport.startAsync();
return newTransport;
}
return activeTransport;
}
private synchronized void transportFailedOrStopped(ClientTransport transport, Throwable t) {
if (transport.state() == State.FAILED) {
log.log(Level.SEVERE, "client transport failed " + transport.getClass().getName(),
transport.failureCause());
}
if (activeTransport == transport) {
activeTransport = null;
}
transports.remove(transport);
if (state() != RUNNING && transports.isEmpty()) {
if (t != null) {
notifyFailed(t);
} else {
notifyStopped();
}
}
}
private class TransportListener extends Listener {
private final ClientTransport transport;
private final boolean notifyWhenRunning;
public TransportListener(ClientTransport transport, boolean notifyWhenRunning) {
this.transport = transport;
this.notifyWhenRunning = notifyWhenRunning;
}
@Override
public void stopping(State from) {
synchronized (ChannelImpl.this) {
if (activeTransport == transport) {
activeTransport = null;
}
}
}
@Override
public void failed(State from, Throwable failure) {
transportFailedOrStopped(transport, failure);
}
@Override
public void terminated(State from) {
transportFailedOrStopped(transport, null);
}
@Override
public void running() {
if (notifyWhenRunning) {
notifyStarted();
}
}
}
private class CallImpl<ReqT, RespT> extends Call<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final SerializingExecutor callExecutor;
private ClientStream stream;
public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor) {
this.method = method;
this.callExecutor = executor;
}
@Override
public void start(Listener<RespT> observer, Metadata.Headers headers) {
Preconditions.checkState(stream == null, "Already started");
stream = obtainActiveTransport(false).newStream(method, headers,
new ClientStreamListenerImpl(observer));
}
@Override
public void cancel() {
// Cancel is called in exception handling cases, so it may be the case that the
// stream was never successfully created.
if (stream != null) {
stream.cancel();
}
}
@Override
public void halfClose() {
Preconditions.checkState(stream != null, "Not started");
stream.halfClose();
}
private int available(InputStream is) {
try {
return is.available();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
public void sendPayload(ReqT payload) {
Preconditions.checkState(stream != null, "Not started");
boolean failed = true;
try {
InputStream payloadIs = method.streamRequest(payload);
stream.writeMessage(payloadIs, available(payloadIs), null);
failed = false;
} finally {
if (failed) {
cancel();
}
}
stream.flush();
}
private class ClientStreamListenerImpl implements ClientStreamListener {
private final Listener<RespT> observer;
public ClientStreamListenerImpl(Listener<RespT> observer) {
Preconditions.checkNotNull(observer);
this.observer = observer;
}
private ListenableFuture<Void> dispatchCallable(
final Callable<ListenableFuture<Void>> callable) {
final SettableFuture<Void> ours = SettableFuture.create();
callExecutor.execute(new Runnable() {
@Override
public void run() {
try {
ListenableFuture<Void> theirs = callable.call();
if (theirs == null) {
ours.set(null);
} else {
Futures.addCallback(theirs, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
ours.set(null);
}
@Override
public void onFailure(Throwable t) {
ours.setException(t);
}
}, MoreExecutors.directExecutor());
}
} catch (Throwable t) {
ours.setException(t);
}
}
});
return ours;
}
@Override
public ListenableFuture<Void> headersRead(final Metadata.Headers headers) {
return dispatchCallable(new Callable<ListenableFuture<Void>>() {
@Override
public ListenableFuture<Void> call() throws Exception {
return observer.onHeaders(headers);
}
});
}
@Override
public ListenableFuture<Void> messageRead(final InputStream message, final int length) {
return dispatchCallable(new Callable<ListenableFuture<Void>>() {
@Override
public ListenableFuture<Void> call() {
return observer.onPayload(method.parseResponse(message));
}
});
}
@Override
public void closed(final Status status, final Metadata.Trailers trailers) {
callExecutor.execute(new Runnable() {
@Override
public void run() {
observer.onClose(status, trailers);
}
});
}
}
}
}