| /* |
| * Copyright 2014, Google Inc. All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| package com.google.net.stubby; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.Service.Listener; |
| import com.google.common.util.concurrent.Service.State; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.net.stubby.transport.ClientStream; |
| import com.google.net.stubby.transport.ClientStreamListener; |
| import com.google.net.stubby.transport.ClientTransport; |
| import com.google.net.stubby.transport.ClientTransportFactory; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import javax.annotation.concurrent.GuardedBy; |
| import javax.annotation.concurrent.ThreadSafe; |
| |
| /** A communication channel for making outgoing RPCs. */ |
| @ThreadSafe |
| public final class ChannelImpl implements Channel { |
| |
| private static final Logger log = Logger.getLogger(ChannelImpl.class.getName()); |
| |
| private static class NoopClientStream implements ClientStream { |
| @Override public void writeMessage(InputStream message, int length, Runnable accepted) {} |
| @Override public void flush() {} |
| @Override public void cancel() {} |
| @Override public void halfClose() {} |
| } |
| |
| private final ClientTransportFactory transportFactory; |
| private final ExecutorService executor; |
| /** |
| * All transports that are not stopped. At the very least {@link #activeTransport} will be |
| * present, but previously used transports that still have streams or are stopping may also be |
| * present. |
| */ |
| @GuardedBy("this") |
| private Collection<ClientTransport> transports = new ArrayList<ClientTransport>(); |
| /** The transport for new outgoing requests. */ |
| @GuardedBy("this") |
| private ClientTransport activeTransport; |
| @GuardedBy("this") |
| private boolean shutdown; |
| @GuardedBy("this") |
| private boolean terminated; |
| private Runnable terminationRunnable; |
| |
| public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) { |
| this.transportFactory = transportFactory; |
| this.executor = executor; |
| } |
| |
| /** Hack to allow executors to auto-shutdown. Not for general use. */ |
| // TODO(ejona): Replace with a real API. |
| void setTerminationRunnable(Runnable runnable) { |
| this.terminationRunnable = runnable; |
| } |
| |
| /** |
| * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately |
| * cancelled. |
| */ |
| public synchronized ChannelImpl shutdown() { |
| shutdown = true; |
| if (activeTransport != null) { |
| activeTransport.stopAsync(); |
| activeTransport = null; |
| } else if (transports.isEmpty()) { |
| terminated = true; |
| notifyAll(); |
| if (terminationRunnable != null) { |
| terminationRunnable.run(); |
| } |
| } |
| return this; |
| } |
| |
| /** |
| * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although |
| * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely |
| * return {@code false} immediately after this method returns. |
| * |
| * <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown(). |
| */ |
| // TODO(ejona): cancel preexisting calls. |
| public synchronized ChannelImpl shutdownNow() { |
| shutdown(); |
| return this; |
| } |
| |
| /** |
| * Returns whether the channel is shutdown. Shutdown channels immediately cancel any new calls, |
| * but may still have some calls being processed. |
| * |
| * @see #shutdown() |
| * @see #isTerminated() |
| */ |
| public synchronized boolean isShutdown() { |
| return shutdown; |
| } |
| |
| /** |
| * Waits for the channel to become terminated, giving up if the timeout is reached. |
| * |
| * @return whether the channel is terminated, as would be done by {@link #isTerminated()}. |
| */ |
| public synchronized boolean awaitTerminated(long timeout, TimeUnit unit) |
| throws InterruptedException { |
| long timeoutNanos = unit.toNanos(timeout); |
| long endTimeNanos = System.nanoTime() + timeoutNanos; |
| while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { |
| TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos); |
| } |
| return terminated; |
| } |
| |
| /** |
| * Returns whether the channel is terminated. Terminated channels have no running calls and |
| * relevant resources released (like TCP connections). |
| * |
| * @see #isShutdown() |
| */ |
| public synchronized boolean isTerminated() { |
| return terminated; |
| } |
| |
| /** |
| * Creates a new outgoing call on the channel. |
| */ |
| @Override |
| public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) { |
| return new CallImpl<ReqT, RespT>(method, new SerializingExecutor(executor)); |
| } |
| |
| private synchronized ClientTransport obtainActiveTransport() { |
| if (shutdown) { |
| return null; |
| } |
| if (activeTransport != null) { |
| return activeTransport; |
| } |
| ClientTransport newTransport = transportFactory.newClientTransport(); |
| activeTransport = newTransport; |
| transports.add(newTransport); |
| // activeTransport reference can be changed during calls to the transport, even if we hold the |
| // lock, due to reentrancy. |
| newTransport.addListener(new TransportListener(newTransport), |
| MoreExecutors.directExecutor()); |
| newTransport.startAsync(); |
| return newTransport; |
| } |
| |
| private synchronized void transportFailedOrTerminated(ClientTransport transport) { |
| if (activeTransport == transport) { |
| activeTransport = null; |
| } |
| transports.remove(transport); |
| if (shutdown && transports.isEmpty()) { |
| terminated = true; |
| notifyAll(); |
| if (terminationRunnable != null) { |
| terminationRunnable.run(); |
| } |
| } |
| } |
| |
| private class TransportListener extends Listener { |
| private final ClientTransport transport; |
| |
| public TransportListener(ClientTransport transport) { |
| this.transport = transport; |
| } |
| |
| @Override |
| public void stopping(State from) { |
| synchronized (ChannelImpl.this) { |
| if (activeTransport == transport) { |
| activeTransport = null; |
| } |
| } |
| } |
| |
| @Override |
| public void failed(State from, Throwable failure) { |
| log.log(Level.SEVERE, "Client transport failed", failure); |
| transportFailedOrTerminated(transport); |
| } |
| |
| @Override |
| public void terminated(State from) { |
| transportFailedOrTerminated(transport); |
| } |
| |
| @Override |
| public void running() {} |
| } |
| |
| private class CallImpl<ReqT, RespT> extends Call<ReqT, RespT> { |
| private final MethodDescriptor<ReqT, RespT> method; |
| private final SerializingExecutor callExecutor; |
| private ClientStream stream; |
| |
| public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor) { |
| this.method = method; |
| this.callExecutor = executor; |
| } |
| |
| @Override |
| public void start(Listener<RespT> observer, Metadata.Headers headers) { |
| Preconditions.checkState(stream == null, "Already started"); |
| ClientStreamListener listener = new ClientStreamListenerImpl(observer); |
| ClientTransport transport = obtainActiveTransport(); |
| if (transport == null) { |
| stream = new NoopClientStream(); |
| listener.closed(Status.CANCELLED.withDescription("Channel is shutdown"), |
| new Metadata.Trailers()); |
| return; |
| } |
| try { |
| stream = transport.newStream(method, headers, listener); |
| } catch (IllegalStateException ex) { |
| // We can race with the transport and end up trying to use a terminated transport. |
| // TODO(ejona): Improve the API to remove the possibility of the race. |
| stream = new NoopClientStream(); |
| listener.closed(Status.fromThrowable(ex), new Metadata.Trailers()); |
| return; |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| // Cancel is called in exception handling cases, so it may be the case that the |
| // stream was never successfully created. |
| if (stream != null) { |
| stream.cancel(); |
| } |
| } |
| |
| @Override |
| public void halfClose() { |
| Preconditions.checkState(stream != null, "Not started"); |
| stream.halfClose(); |
| } |
| |
| private int available(InputStream is) { |
| try { |
| return is.available(); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public void sendPayload(ReqT payload) { |
| Preconditions.checkState(stream != null, "Not started"); |
| boolean failed = true; |
| try { |
| InputStream payloadIs = method.streamRequest(payload); |
| stream.writeMessage(payloadIs, available(payloadIs), null); |
| failed = false; |
| } finally { |
| if (failed) { |
| cancel(); |
| } |
| } |
| stream.flush(); |
| } |
| |
| private class ClientStreamListenerImpl implements ClientStreamListener { |
| private final Listener<RespT> observer; |
| |
| public ClientStreamListenerImpl(Listener<RespT> observer) { |
| Preconditions.checkNotNull(observer); |
| this.observer = observer; |
| } |
| |
| private ListenableFuture<Void> dispatchCallable( |
| final Callable<ListenableFuture<Void>> callable) { |
| final SettableFuture<Void> ours = SettableFuture.create(); |
| callExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| ListenableFuture<Void> theirs = callable.call(); |
| if (theirs == null) { |
| ours.set(null); |
| } else { |
| Futures.addCallback(theirs, new FutureCallback<Void>() { |
| @Override |
| public void onSuccess(Void result) { |
| ours.set(null); |
| } |
| @Override |
| public void onFailure(Throwable t) { |
| ours.setException(t); |
| } |
| }, MoreExecutors.directExecutor()); |
| } |
| } catch (Throwable t) { |
| ours.setException(t); |
| } |
| } |
| }); |
| return ours; |
| } |
| |
| @Override |
| public ListenableFuture<Void> headersRead(final Metadata.Headers headers) { |
| return dispatchCallable(new Callable<ListenableFuture<Void>>() { |
| @Override |
| public ListenableFuture<Void> call() throws Exception { |
| return observer.onHeaders(headers); |
| } |
| }); |
| } |
| |
| @Override |
| public ListenableFuture<Void> messageRead(final InputStream message, final int length) { |
| return dispatchCallable(new Callable<ListenableFuture<Void>>() { |
| @Override |
| public ListenableFuture<Void> call() { |
| return observer.onPayload(method.parseResponse(message)); |
| } |
| }); |
| } |
| |
| @Override |
| public void closed(final Status status, final Metadata.Trailers trailers) { |
| callExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| observer.onClose(status, trailers); |
| } |
| }); |
| } |
| } |
| } |
| } |