core: delete the old channel impl and LoadBalancer.
diff --git a/core/src/main/java/io/grpc/Internal.java b/core/src/main/java/io/grpc/Internal.java
index 479c1b1..e4cb1a4 100644
--- a/core/src/main/java/io/grpc/Internal.java
+++ b/core/src/main/java/io/grpc/Internal.java
@@ -41,7 +41,7 @@
* Annotates a program element (class, method, package etc) which is internal to gRPC, not part of
* the public API, and should not be used by users of gRPC.
*
- * <p>However, if you want to implement a custom {@link LoadBalancer}, an alternative transport, or
+ * <p>However, if you want to implement a custom {@link LoadBalancer2}, an alternative transport, or
* anything else that will be wired into gRPC library, you may use the internal parts. Please
* consult the gRPC team first, because internal APIs don't have the same API stability guarantee as
* the public APIs do.
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
deleted file mode 100644
index 550e24c..0000000
--- a/core/src/main/java/io/grpc/LoadBalancer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc;
-
-import java.util.List;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A pluggable component that receives resolved addresses from {@link NameResolver} and provides the
- * channel a usable transport when asked.
- *
- * <p>Note to implementations: all methods are expected to return quickly. Any work that may block
- * should be done asynchronously.
- *
- * @param <T> the transport type to balance
- */
-// TODO(zhangkun83): since it's also used for non-loadbalancing cases like pick-first,
-// "RequestRouter" might be a better name.
-@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
-@ThreadSafe
-public abstract class LoadBalancer<T> {
- /**
- * Pick a transport that Channel will use for next RPC.
- *
- * <p>If called after {@link #shutdown} has been called, this method will return
- * a transport that would fail all requests.
- *
- * @param affinity for affinity-based routing
- */
- @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1766")
- public abstract T pickTransport(Attributes affinity);
-
- /**
- * Shuts down this {@code LoadBalancer}.
- */
- public void shutdown() { }
-
- /**
- * Handles newly resolved server groups and metadata attributes from name resolution system.
- * {@code servers} contained in {@link ResolvedServerInfoGroup} should be considered equivalent
- * but may be flattened into a single list if needed.
- *
- * <p>Implementations should not modify the given {@code servers}.
- *
- * @param servers the resolved server addresses, never empty.
- * @param attributes extra metadata from naming system.
- */
- public void handleResolvedAddresses(List<ResolvedServerInfoGroup> servers,
- Attributes attributes) { }
-
- /**
- * Handles an error from the name resolution system.
- *
- * @param error a non-OK status
- */
- public void handleNameResolutionError(Status error) { }
-
- /**
- * Called when a transport is fully connected and ready to accept traffic.
- */
- public void handleTransportReady(EquivalentAddressGroup addressGroup) { }
-
- /**
- * Called when a transport is shutting down.
- */
- public void handleTransportShutdown(EquivalentAddressGroup addressGroup, Status s) { }
-
- public abstract static class Factory {
- /**
- * Creates a {@link LoadBalancer} that will be used inside a channel.
- *
- * @param serviceName the DNS-style service name, which is also the authority
- * @param tm the interface where an {@code LoadBalancer} implementation gets connected
- * transports from
- */
- public abstract <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm);
- }
-}
diff --git a/core/src/main/java/io/grpc/LoadBalancer2.java b/core/src/main/java/io/grpc/LoadBalancer2.java
index 2230106..a4772df 100644
--- a/core/src/main/java/io/grpc/LoadBalancer2.java
+++ b/core/src/main/java/io/grpc/LoadBalancer2.java
@@ -41,7 +41,7 @@
/**
* A pluggable component that receives resolved addresses from {@link NameResolver} and provides the
* channel a usable subchannel when asked. This is the new interface that will replace {@link
- * LoadBalancer}.
+ * LoadBalancer2}.
*
* <p><strong>IMPORTANT NOTICE FOR IMPLEMENTORS: </strong>The name of this class is temporary. It
* will be renamed to {@code LoadBalancer} eventually. Make sure you have read through <a
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index d24012d..eb6734c 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -155,26 +155,11 @@
public abstract T nameResolverFactory(NameResolver.Factory resolverFactory);
/**
- * Provides a custom {@link LoadBalancer.Factory} for the channel.
- *
- * <p>If this method is not called, the builder will use {@link PickFirstBalancerFactory}
- * for the channel.
- *
- * <p>This overrides what was set by {@link #loadBalancerFactory(LoadBalancer2.Factory)}.
- *
- * <p>Calling this will revert the channel back to the original (LBv1) implementation.
- */
- @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
- public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
-
- /**
* Provides a custom {@link LoadBalancer2.Factory} for the channel.
*
- * <p>If this method is not called, the builder will use {@link PickFirstBalancerFactory}
+ * <p>If this method is not called, the builder will use {@link PickFirstBalancerFactory2}
* for the channel.
*
- * <p>This overrides what was set by {@link #loadBalancerFactory(LoadBalancer.Factory)}.
- *
* <p>Calling this will make the channel to run the LBv2 code path. See <a
* href="https://github.com/grpc/grpc-java/issues/2656" target="_blank">#2656</a> for more
* information.
diff --git a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java
deleted file mode 100644
index b125b03..0000000
--- a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc;
-
-import com.google.common.base.Supplier;
-import io.grpc.TransportManager.InterimTransport;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * A {@link LoadBalancer} that provides no load balancing mechanism over the
- * addresses from the {@link NameResolver}. The channel's default behavior
- * (currently pick-first) is used for all addresses found.
- */
-@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
-public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
-
- private static final PickFirstBalancerFactory instance = new PickFirstBalancerFactory();
-
- private PickFirstBalancerFactory() {
- }
-
- public static PickFirstBalancerFactory getInstance() {
- return instance;
- }
-
- @Override
- public <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm) {
- return new PickFirstBalancer<T>(tm);
- }
-
- private static class PickFirstBalancer<T> extends LoadBalancer<T> {
- private static final Status SHUTDOWN_STATUS =
- Status.UNAVAILABLE.augmentDescription("PickFirstBalancer has shut down");
-
- private final Object lock = new Object();
-
- /** "lock" must be held when mutating. */
- private volatile EquivalentAddressGroup addresses;
- @GuardedBy("lock")
- private InterimTransport<T> interimTransport;
- @GuardedBy("lock")
- private Status nameResolutionError;
- @GuardedBy("lock")
- private boolean closed;
-
- private final TransportManager<T> tm;
-
- private PickFirstBalancer(TransportManager<T> tm) {
- this.tm = tm;
- }
-
- @Override
- public T pickTransport(Attributes affinity) {
- EquivalentAddressGroup addressesCopy = addresses;
- if (addressesCopy != null) {
- return tm.getTransport(addressesCopy);
- }
- synchronized (lock) {
- if (closed) {
- return tm.createFailingTransport(SHUTDOWN_STATUS);
- }
- addressesCopy = addresses;
- if (addressesCopy == null) {
- if (nameResolutionError != null) {
- return tm.createFailingTransport(nameResolutionError);
- }
- if (interimTransport == null) {
- interimTransport = tm.createInterimTransport();
- }
- return interimTransport.transport();
- }
- }
- return tm.getTransport(addressesCopy);
- }
-
- @Override
- public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
- Attributes attributes) {
- InterimTransport<T> savedInterimTransport;
- final EquivalentAddressGroup newAddresses;
- synchronized (lock) {
- if (closed) {
- return;
- }
- newAddresses = resolvedServerInfoGroupsToEquivalentAddressGroup(updatedServers);
- if (newAddresses.equals(addresses)) {
- return;
- }
- addresses = newAddresses;
- nameResolutionError = null;
- savedInterimTransport = interimTransport;
- interimTransport = null;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithRealTransports(new Supplier<T>() {
- @Override public T get() {
- return tm.getTransport(newAddresses);
- }
- });
- }
- }
-
- @Override
- public void handleNameResolutionError(Status error) {
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- if (closed) {
- return;
- }
- error = error.augmentDescription("Name resolution failed");
- savedInterimTransport = interimTransport;
- interimTransport = null;
- nameResolutionError = error;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithError(error);
- }
- }
-
- @Override
- public void shutdown() {
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- if (closed) {
- return;
- }
- closed = true;
- addresses = null;
- savedInterimTransport = interimTransport;
- interimTransport = null;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithError(SHUTDOWN_STATUS);
- }
- }
-
- /**
- * Converts list of ResolvedServerInfoGroup objects into one EquivalentAddressGroup object.
- */
- private static EquivalentAddressGroup resolvedServerInfoGroupsToEquivalentAddressGroup(
- List<ResolvedServerInfoGroup> groupList) {
- List<SocketAddress> addrs = new ArrayList<SocketAddress>(groupList.size());
- for (ResolvedServerInfoGroup group : groupList) {
- for (ResolvedServerInfo srv : group.getResolvedServerInfoList()) {
- addrs.add(srv.getAddress());
- }
- }
- return new EquivalentAddressGroup(addrs);
- }
- }
-}
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 5fa6449..7c3ba42 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -43,7 +43,6 @@
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
-import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer2;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -112,9 +111,6 @@
private LoadBalancer2.Factory loadBalancerFactory;
@Nullable
- private ChannelFactory channelFactory;
-
- @Nullable
private DecompressorRegistry decompressorRegistry;
@Nullable
@@ -202,34 +198,6 @@
"directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
directServerAddress);
this.loadBalancerFactory = loadBalancerFactory;
- this.channelFactory = null;
- return thisT();
- }
-
- @Override
- public final T loadBalancerFactory(final LoadBalancer.Factory loadBalancerFactory) {
- Preconditions.checkState(directServerAddress == null,
- "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
- directServerAddress);
- this.channelFactory = new ChannelFactory() {
- @Override
- public ManagedChannel create(ClientTransportFactory transportFactory) {
- return new ManagedChannelImpl(
- target,
- // TODO(carl-mastrangelo): Allow clients to pass this in
- new ExponentialBackoffPolicy.Provider(),
- nameResolverFactory,
- getNameResolverParams(),
- loadBalancerFactory,
- transportFactory,
- firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
- firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
- GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
- executor, userAgent, interceptors,
- firstNonNull(statsFactory,
- firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
- }
- };
return thisT();
}
@@ -263,7 +231,7 @@
// We convert to the largest unit to avoid overflow
if (unit.toDays(value) >= IDLE_MODE_MAX_TIMEOUT_DAYS) {
// This disables idle mode
- this.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
+ this.idleTimeoutMillis = ManagedChannelImpl2.IDLE_TIMEOUT_MILLIS_DISABLE;
} else {
this.idleTimeoutMillis = Math.max(unit.toMillis(value), IDLE_MODE_MIN_TIMEOUT_MILLIS);
}
@@ -307,26 +275,22 @@
// getResource(), then this shouldn't be a problem unless called on the UI thread.
nameResolverFactory = NameResolverProvider.asFactory();
}
- if (channelFactory != null) {
- return channelFactory.create(transportFactory);
- } else {
- return new ManagedChannelImpl2(
- target,
- // TODO(carl-mastrangelo): Allow clients to pass this in
- new ExponentialBackoffPolicy.Provider(),
- nameResolverFactory,
- getNameResolverParams(),
- firstNonNull(loadBalancerFactory, PickFirstBalancerFactory2.getInstance()),
- transportFactory,
- firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
- firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
- SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
- getExecutorPool(executor),
- SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
- GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
- userAgent, interceptors, firstNonNull(statsFactory,
- firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
- }
+ return new ManagedChannelImpl2(
+ target,
+ // TODO(carl-mastrangelo): Allow clients to pass this in
+ new ExponentialBackoffPolicy.Provider(),
+ nameResolverFactory,
+ getNameResolverParams(),
+ firstNonNull(loadBalancerFactory, PickFirstBalancerFactory2.getInstance()),
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
+ getExecutorPool(executor),
+ SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
+ GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+ userAgent, interceptors, firstNonNull(statsFactory,
+ firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
}
/**
@@ -429,13 +393,4 @@
T thisT = (T) this;
return thisT;
}
-
- /**
- * A temporary solution to contain the reference to ManagedChannelImpl in the v1 LoadBalancer
- * setter, so that on Android ManagedChannelImpl can be stripped out by ProGuard since the v1
- * setter will not be called. This should be deleted along with v1 ManagedChannelImpl.
- */
- private interface ChannelFactory {
- ManagedChannel create(ClientTransportFactory transportFactory);
- }
}
diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
deleted file mode 100644
index 803c3c5..0000000
--- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import io.grpc.CallOptions;
-import io.grpc.Context;
-import io.grpc.LoadBalancer2;
-import io.grpc.LoadBalancer2.PickResult;
-import io.grpc.LoadBalancer2.Subchannel;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.Status;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * A client transport that queues requests before a real transport is available. When a backing
- * transport supplier is later provided, this class delegates to the transports from it.
- *
- * <p>This transport owns the streams that it has created before {@link #setTransport} is
- * called. When {@link #setTransport} is called, the ownership of pending streams and subsequent new
- * streams are transferred to the given transport, thus this transport won't own any stream.
- */
-class DelayedClientTransport implements ManagedClientTransport {
-
- private final LogId lodId = LogId.allocate(getClass().getName());
-
- private final Object lock = new Object();
-
- private final Executor streamCreationExecutor;
-
- private Listener listener;
- /** 'lock' must be held when assigning to transportSupplier. */
- private volatile Supplier<ClientTransport> transportSupplier;
-
- @GuardedBy("lock")
- private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>();
-
- // TODO(zhangkun83): remove it once LBv2 refactor is done. In practice ping() is only called on
- // real transports.
- @GuardedBy("lock")
- private Collection<PendingPing> pendingPings = new ArrayList<PendingPing>();
-
- /**
- * When shutdown == true and pendingStreams == null, then the transport is considered terminated.
- */
- @GuardedBy("lock")
- private boolean shutdown;
-
- /**
- * The delayed client transport will come into a back-off interval if it fails to establish a real
- * transport for all addresses, namely the channel is in TRANSIENT_FAILURE. When in a back-off
- * interval, {@code backoffStatus != null}.
- *
- * <p>If the transport is in a back-off interval, then all fail fast streams (including the
- * pending as well as new ones) will fail immediately. New non-fail fast streams can be created as
- * {@link PendingStream} and will keep pending during this back-off period.
- */
- @GuardedBy("lock")
- @Nullable
- private Status backoffStatus;
-
- DelayedClientTransport(Executor streamCreationExecutor) {
- this.streamCreationExecutor = streamCreationExecutor;
- }
-
- @Override
- public final Runnable start(Listener listener) {
- this.listener = Preconditions.checkNotNull(listener, "listener");
- return null;
- }
-
- /**
- * If the transport has acquired a transport {@link Supplier}, then returned stream is delegated
- * from its supplier.
- *
- * <p>If the new stream to be created is with fail fast call option and the delayed transport is
- * in a back-off interval, then a {@link FailingClientStream} is returned.
- *
- * <p>If it is not the above cases and the delayed transport is not shutdown, then a
- * {@link PendingStream} is returned; if the transport is shutdown, then a
- * {@link FailingClientStream} is returned.
- */
- @Override
- public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
- CallOptions callOptions, StatsTraceContext statsTraceCtx) {
- Supplier<ClientTransport> supplier = transportSupplier;
- if (supplier == null) {
- synchronized (lock) {
- // Check again, since it may have changed while waiting for lock
- supplier = transportSupplier;
- if (supplier == null && !shutdown) {
- if (backoffStatus != null && !callOptions.isWaitForReady()) {
- return new FailingClientStream(backoffStatus);
- }
- PendingStream pendingStream = new PendingStream(method, headers, callOptions,
- statsTraceCtx);
- pendingStreams.add(pendingStream);
- if (pendingStreams.size() == 1) {
- listener.transportInUse(true);
- }
- return pendingStream;
- }
- }
- }
- if (supplier != null) {
- return supplier.get().newStream(method, headers, callOptions, statsTraceCtx);
- }
- return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown"));
- }
-
- @Override
- public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
- return newStream(method, headers, CallOptions.DEFAULT, StatsTraceContext.NOOP);
- }
-
- @Override
- public final void ping(final PingCallback callback, Executor executor) {
- Supplier<ClientTransport> supplier = transportSupplier;
- if (supplier == null) {
- synchronized (lock) {
- // Check again, since it may have changed while waiting for lock
- supplier = transportSupplier;
- if (supplier == null && !shutdown) {
- PendingPing pendingPing = new PendingPing(callback, executor);
- pendingPings.add(pendingPing);
- return;
- }
- }
- }
- if (supplier != null) {
- supplier.get().ping(callback, executor);
- return;
- }
- executor.execute(new Runnable() {
- @Override public void run() {
- callback.onFailure(
- Status.UNAVAILABLE.withDescription("transport shutdown").asException());
- }
- });
- }
-
- /**
- * Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are
- * not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called,
- * you still need to call {@link #setTransport} to make this transport terminated.
- */
- @Override
- public final void shutdown() {
- synchronized (lock) {
- if (shutdown) {
- return;
- }
- shutdown = true;
- listener.transportShutdown(
- Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
- if (pendingStreams == null || pendingStreams.isEmpty()) {
- pendingStreams = null;
- listener.transportTerminated();
- }
- }
- }
-
- /**
- * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
- * this transport.
- */
- @Override
- public final void shutdownNow(Status status) {
- shutdown();
- Collection<PendingStream> savedPendingStreams = null;
- synchronized (lock) {
- if (pendingStreams != null) {
- savedPendingStreams = pendingStreams;
- pendingStreams = null;
- }
- }
- if (savedPendingStreams != null) {
- for (PendingStream stream : savedPendingStreams) {
- stream.cancel(status);
- }
- listener.transportTerminated();
- }
- // If savedPendingStreams == null, transportTerminated() has already been called in shutdown().
- }
-
- /**
- * Transfers all the pending and future streams and pings to the given transport.
- *
- * <p>May only be called after {@link #start(Listener)}.
- *
- * <p>{@code transport} will be used for all future calls to {@link #newStream}, even if this
- * transport is {@link #shutdown}.
- */
- public final void setTransport(ClientTransport transport) {
- Preconditions.checkArgument(this != transport,
- "delayed transport calling setTransport on itself");
- setTransportSupplier(Suppliers.ofInstance(transport));
- }
-
- /**
- * Transfers all the pending and future streams and pings to the transports from the given {@link
- * Supplier}.
- *
- * <p>May only be called after {@link #start}. No effect if already called.
- *
- * <p>Each stream or ping will result in an invocation to {@link Supplier#get} once. The supplier
- * will be used for all future calls to {@link #newStream}, even if this transport is {@link
- * #shutdown}.
- */
- public final void setTransportSupplier(final Supplier<ClientTransport> supplier) {
- synchronized (lock) {
- if (transportSupplier != null) {
- return;
- }
- Preconditions.checkState(listener != null, "start() not called");
- transportSupplier = Preconditions.checkNotNull(supplier, "supplier");
- for (PendingPing ping : pendingPings) {
- ping.createRealPing(supplier.get());
- }
- pendingPings = null;
- if (shutdown && pendingStreams != null) {
- listener.transportTerminated();
- }
- if (pendingStreams != null && !pendingStreams.isEmpty()) {
- final Collection<PendingStream> savedPendingStreams = pendingStreams;
- // createRealStream may be expensive. It will start real streams on the transport. If there
- // are pending requests, they will be serialized too, which may be expensive. Since we are
- // now on transport thread, we need to offload the work to an executor.
- streamCreationExecutor.execute(new Runnable() {
- @Override public void run() {
- for (final PendingStream stream : savedPendingStreams) {
- stream.createRealStream(supplier.get());
- }
- // TODO(zhangkun83): some transports (e.g., netty) may have a short delay between
- // stream.start() and transportInUse(true). If netty's transportInUse(true) is called
- // after the delayed transport's transportInUse(false), the channel may have a brief
- // period where all transports are not in-use, and may go to IDLE mode unexpectedly if
- // the IDLE timeout is shorter (e.g., 0) than that brief period. Maybe we should
- // have a minimum IDLE timeout?
- synchronized (lock) {
- listener.transportInUse(false);
- }
- }
- });
- }
- pendingStreams = null;
- if (!shutdown) {
- listener.transportReady();
- }
- }
- }
-
- public final boolean hasPendingStreams() {
- synchronized (lock) {
- return pendingStreams != null && !pendingStreams.isEmpty();
- }
- }
-
- @VisibleForTesting
- final int getPendingStreamsCount() {
- synchronized (lock) {
- return pendingStreams == null ? 0 : pendingStreams.size();
- }
- }
-
- /**
- * True return value indicates that the delayed transport is in a back-off interval (in
- * TRANSIENT_FAILURE), that all fail fast streams (including pending as well as new ones) should
- * fail immediately, and that non-fail fast streams can be created as {@link PendingStream} and
- * should keep pending during this back-off period.
- */
- // TODO(zhangkun83): remove it once the LBv2 refactor is done.
- @VisibleForTesting
- final boolean isInBackoffPeriod() {
- synchronized (lock) {
- return backoffStatus != null;
- }
- }
-
- /**
- * Is only called at the beginning of {@link TransportSet#scheduleBackoff}.
- *
- * <p>Does jobs at the beginning of the back-off:
- *
- * <p>sets {@link #backoffStatus};
- *
- * <p>sets all pending streams with a fail fast call option of the delayed transport as
- * {@link FailingClientStream}s, and removes them from the list of pending streams of the
- * transport.
- *
- * @param status the causal status for triggering back-off.
- */
- // TODO(zhangkun83): remove it once the LBv2 refactor is done.
- final void startBackoff(final Status status) {
- synchronized (lock) {
- if (shutdown) {
- return;
- }
- Preconditions.checkState(backoffStatus == null,
- "Error when calling startBackoff: transport is already in backoff period");
- backoffStatus = Status.UNAVAILABLE.withDescription("Channel in TRANSIENT_FAILURE state")
- .withCause(status.asRuntimeException());
- final ArrayList<PendingStream> failFastPendingStreams = new ArrayList<PendingStream>();
- if (pendingStreams != null && !pendingStreams.isEmpty()) {
- final Iterator<PendingStream> it = pendingStreams.iterator();
- while (it.hasNext()) {
- PendingStream stream = it.next();
- if (!stream.callOptions.isWaitForReady()) {
- failFastPendingStreams.add(stream);
- it.remove();
- }
- }
-
- class FailTheFailFastPendingStreams implements Runnable {
- @Override
- public void run() {
- for (PendingStream stream : failFastPendingStreams) {
- stream.setStream(new FailingClientStream(status));
- }
- }
- }
-
- streamCreationExecutor.execute(new FailTheFailFastPendingStreams());
- }
- }
- }
-
- /**
- * Is only called at the beginning of the callback function of {@code endOfCurrentBackoff} in the
- * {@link TransportSet#scheduleBackoff} method.
- */
- // TODO(zhangkun83): remove it once the LBv2 refactor is done.
- final void endBackoff() {
- synchronized (lock) {
- Preconditions.checkState(backoffStatus != null,
- "Error when calling endBackoff: transport is not in backoff period");
- backoffStatus = null;
- }
- }
-
- /**
- * Use the picker to try picking a transport for every pending stream and pending ping, proceed
- * the stream or ping if the pick is successful, otherwise keep it pending.
- *
- * <p>If new pending streams are created while reprocess() is in progress, there is no guarantee
- * that the pending streams will or will not be processed by this picker.
- *
- * <p>This method <strong>must not</strong> be called concurrently, either with itself or with
- * {@link #setTransportSupplier}/{@link #setTransport}.
- */
- final void reprocess(LoadBalancer2.SubchannelPicker picker) {
- ArrayList<PendingStream> toProcess;
- ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>();
- synchronized (lock) {
- if (pendingStreams == null || pendingStreams.isEmpty()) {
- return;
- }
- toProcess = new ArrayList<PendingStream>(pendingStreams);
- }
-
- for (final PendingStream stream : toProcess) {
- PickResult pickResult = picker.pickSubchannel(
- stream.callOptions.getAffinity(), stream.headers);
- final ClientTransport realTransport;
- Subchannel subchannel = pickResult.getSubchannel();
- if (subchannel != null) {
- realTransport = ((SubchannelImpl) subchannel).obtainActiveTransport();
- } else {
- realTransport = null;
- }
- if (realTransport != null) {
- Executor executor = streamCreationExecutor;
- // createRealStream may be expensive. It will start real streams on the transport. If
- // there are pending requests, they will be serialized too, which may be expensive. Since
- // we are now on transport thread, we need to offload the work to an executor.
- if (stream.callOptions.getExecutor() != null) {
- executor = stream.callOptions.getExecutor();
- }
- executor.execute(new Runnable() {
- @Override
- public void run() {
- stream.createRealStream(realTransport);
- }
- });
- toRemove.add(stream);
- } else if (!pickResult.getStatus().isOk() && !stream.callOptions.isWaitForReady()) {
- stream.setStream(new FailingClientStream(pickResult.getStatus()));
- toRemove.add(stream);
- } // other cases: stay pending
- }
-
- synchronized (lock) {
- // Between this synchronized and the previous one:
- // - Streams may have been cancelled, which may turn pendingStreams into emptiness.
- // - shutdown() may be called, which may turn pendingStreams into null.
- if (pendingStreams == null || pendingStreams.isEmpty()) {
- return;
- }
- pendingStreams.removeAll(toRemove);
- if (pendingStreams.isEmpty()) {
- // There may be a brief gap between delayed transport clearing in-use state, and first real
- // transport starting streams and setting in-use state. During the gap the whole channel's
- // in-use state may be false. However, it shouldn't cause spurious switching to idleness
- // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
- // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
- listener.transportInUse(false);
- if (shutdown) {
- pendingStreams = null;
- listener.transportTerminated();
- } else {
- // Because delayed transport is long-lived, we take this opportunity to down-size the
- // hashmap.
- pendingStreams = new LinkedHashSet<PendingStream>();
- }
- }
- }
- }
-
- // TODO(carl-mastrangelo): remove this once the Subchannel change is in.
- @Override
- public LogId getLogId() {
- return lodId;
- }
-
- @VisibleForTesting
- @Nullable
- final Supplier<ClientTransport> getTransportSupplier() {
- return transportSupplier;
- }
-
- private class PendingStream extends DelayedStream {
- private final MethodDescriptor<?, ?> method;
- private final Metadata headers;
- private final CallOptions callOptions;
- private final Context context;
- private final StatsTraceContext statsTraceCtx;
-
- private PendingStream(MethodDescriptor<?, ?> method, Metadata headers,
- CallOptions callOptions, StatsTraceContext statsTraceCtx) {
- this.method = method;
- this.headers = headers;
- this.callOptions = callOptions;
- this.context = Context.current();
- this.statsTraceCtx = statsTraceCtx;
- }
-
- private void createRealStream(ClientTransport transport) {
- ClientStream realStream;
- Context origContext = context.attach();
- try {
- realStream = transport.newStream(method, headers, callOptions, statsTraceCtx);
- } finally {
- context.detach(origContext);
- }
- setStream(realStream);
- }
-
- @Override
- public void cancel(Status reason) {
- super.cancel(reason);
- synchronized (lock) {
- if (pendingStreams != null) {
- boolean justRemovedAnElement = pendingStreams.remove(this);
- if (pendingStreams.isEmpty() && justRemovedAnElement) {
- listener.transportInUse(false);
- if (shutdown) {
- pendingStreams = null;
- listener.transportTerminated();
- }
- }
- }
- }
- }
- }
-
- private static class PendingPing {
- private final PingCallback callback;
- private final Executor executor;
-
- public PendingPing(PingCallback callback, Executor executor) {
- this.callback = callback;
- this.executor = executor;
- }
-
- public void createRealPing(ClientTransport transport) {
- try {
- transport.ping(callback, executor);
- } catch (final UnsupportedOperationException ex) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- callback.onFailure(ex);
- }
- });
- }
- }
- }
-}
diff --git a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java
deleted file mode 100644
index dc8a927..0000000
--- a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2016, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import java.util.HashSet;
-import javax.annotation.CheckReturnValue;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * Aggregates the in-use state of a set of objects.
- */
-abstract class InUseStateAggregator<T> {
-
- @GuardedBy("getLock()")
- private final HashSet<T> inUseObjects = new HashSet<T>();
-
- /**
- * Update the in-use state of an object. Initially no object is in use. When the return value is
- * non-{@code null}, the caller should execute the runnable after releasing locks.
- */
- @CheckReturnValue
- final Runnable updateObjectInUse(T object, boolean inUse) {
- Runnable runnable = null;
- synchronized (getLock()) {
- int origSize = inUseObjects.size();
- if (inUse) {
- inUseObjects.add(object);
- if (origSize == 0) {
- runnable = handleInUse();
- }
- } else {
- boolean removed = inUseObjects.remove(object);
- if (removed && origSize == 1) {
- handleNotInUse();
- }
- }
- }
- return runnable;
- }
-
- @CheckReturnValue
- final boolean isInUse() {
- synchronized (getLock()) {
- return !inUseObjects.isEmpty();
- }
- }
-
- abstract Object getLock();
-
- /**
- * Called when the aggregated in-use state has changed to true, which means at least one object is
- * in use. When the return value is non-{@code null}, then the runnable will be executed by the
- * caller of {@link #updateObjectInUse} after releasing locks.
- *
- * <p>This method is called under the lock returned by {@link #getLock}.
- */
- @GuardedBy("getLock()")
- abstract Runnable handleInUse();
-
- /**
- * Called when the aggregated in-use state has changed to false, which means no object is in use.
- *
- * <p>This method is called under the lock returned by {@link #getLock}.
- */
- @GuardedBy("getLock()")
- abstract void handleNotInUse();
-}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
deleted file mode 100644
index 80f7a0c..0000000
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/*
- * Copyright 2014, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
-import com.google.instrumentation.stats.StatsContextFactory;
-import io.grpc.Attributes;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.ClientInterceptors;
-import io.grpc.CompressorRegistry;
-import io.grpc.DecompressorRegistry;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.LoadBalancer;
-import io.grpc.ManagedChannel;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.NameResolver;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.Status;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.InterimTransport;
-import io.grpc.TransportManager.OobTransportProvider;
-import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-
-/** A communication channel for making outgoing RPCs. */
-@ThreadSafe
-public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
- private static final Logger log = Logger.getLogger(ManagedChannelImpl.class.getName());
-
- // Matching this pattern means the target string is a URI target or at least intended to be one.
- // A URI target must be an absolute hierarchical URI.
- // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
- @VisibleForTesting
- static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
-
- static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
-
- /**
- * The time after idleTimeoutMillis expires before idleness takes effect. The time before
- * idleTimeoutMillis expires is part of a fast path for acquiring the load balancer. After
- * idleTimeoutMillis expires a slow path takes effect with extra synchronization.
- *
- * <p>Transports having open streams prevents entering idle mode. However, this creates an
- * inherent race between acquiring a transport, which can't be done in idle mode, and the RPC
- * actually being created on that transport, which inhibits idle mode. Thus we reset the idle
- * timer when acquiring a transport, and impose a minimum idle time (IDLE_MODE_MIN_TIMEOUT_MILLIS)
- * to make the chances of racing very small. If we do race, then the RPC will spuriously fail
- * because the transport chosen was shut down.
- *
- * <p>For heavy users, resetting the idle timer each RPC becomes highly contended. We instead only
- * need to reset the timer when it is close to expiring. We do the equivalent by having two
- * periods: a reduced regular idle time period and the extra time as a grace period. We ignore the
- * race during the regular idle time period, but any acquisition during the grace period must
- * reset the timer.
- */
- @VisibleForTesting
- static final long IDLE_GRACE_PERIOD_MILLIS = TimeUnit.SECONDS.toMillis(1);
-
- private static final ClientTransport SHUTDOWN_TRANSPORT =
- new FailingClientTransport(Status.UNAVAILABLE.withDescription("Channel is shutdown"));
-
- @VisibleForTesting
- static final ClientTransport IDLE_MODE_TRANSPORT =
- new FailingClientTransport(Status.INTERNAL.withDescription("Channel is in idle mode"));
-
- private final String target;
- private final NameResolver.Factory nameResolverFactory;
- private final Attributes nameResolverParams;
- private final LoadBalancer.Factory loadBalancerFactory;
- private final ClientTransportFactory transportFactory;
- private final Executor executor;
- private final boolean usingSharedExecutor;
- private final Object lock = new Object();
- private final LogId logId = LogId.allocate(getClass().getName());
-
- private final DecompressorRegistry decompressorRegistry;
- private final CompressorRegistry compressorRegistry;
-
- private final SharedResourceHolder.Resource<ScheduledExecutorService> timerService;
- private final Supplier<Stopwatch> stopwatchSupplier;
- /** The timout before entering idle mode, less {@link #IDLE_GRACE_PERIOD_MILLIS}. */
- private final long idleTimeoutMillis;
- private final StatsContextFactory statsFactory;
-
- /**
- * Executor that runs deadline timers for requests.
- */
- private ScheduledExecutorService scheduledExecutor;
-
- private final BackoffPolicy.Provider backoffPolicyProvider;
-
- /**
- * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
- * any interceptors this will just be {@link RealChannel}.
- */
- private final Channel interceptorChannel;
- @Nullable private final String userAgent;
-
- // Never be null. Must be modified under lock.
- private NameResolver nameResolver;
-
- /** {@code null} when idle or when in grace idle period. "lock" must be held when modifying. */
- @Nullable
- private volatile LoadBalancer<ClientTransport> loadBalancer;
-
- /** non-{code null} iff channel is in grace idle period. */
- @GuardedBy("lock")
- @Nullable
- private LoadBalancer<ClientTransport> graceLoadBalancer;
-
- /**
- * Maps EquivalentAddressGroups to transports for that server. "lock" must be held when mutating.
- */
- // Even though we set a concurrency level of 1, this is better than Collections.synchronizedMap
- // because it doesn't need to acquire a lock for reads.
- private final ConcurrentMap<EquivalentAddressGroup, TransportSet> transports =
- new ConcurrentHashMap<EquivalentAddressGroup, TransportSet>(16, .75f, 1);
-
- /**
- * TransportSets that are shutdown (but not yet terminated) due to channel idleness or channel
- * shut down.
- */
- @GuardedBy("lock")
- private final HashSet<TransportSet> decommissionedTransports = new HashSet<TransportSet>();
-
- @GuardedBy("lock")
- private final HashSet<DelayedClientTransport> delayedTransports =
- new HashSet<DelayedClientTransport>();
-
- @VisibleForTesting
- final InUseStateAggregator<Object> inUseStateAggregator =
- new InUseStateAggregator<Object>() {
- @Override
- Object getLock() {
- return lock;
- }
-
- @Override
- @GuardedBy("lock")
- Runnable handleInUse() {
- return exitIdleMode();
- }
-
- @GuardedBy("lock")
- @Override
- void handleNotInUse() {
- if (shutdown) {
- return;
- }
- rescheduleIdleTimer();
- }
- };
-
- private class IdleModeTimer implements Runnable {
- @GuardedBy("lock")
- boolean cancelled;
-
- @Override
- public void run() {
- ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
- LoadBalancer<ClientTransport> savedBalancer;
- NameResolver oldResolver;
- synchronized (lock) {
- if (cancelled) {
- // Race detected: this task started before cancelIdleTimer() could cancel it.
- return;
- }
- if (loadBalancer != null) {
- // Enter grace period.
- graceLoadBalancer = loadBalancer;
- loadBalancer = null;
- assert idleModeTimer == this;
- idleModeTimerFuture = scheduledExecutor.schedule(new LogExceptionRunnable(idleModeTimer),
- IDLE_GRACE_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
- return;
- }
- log.log(Level.FINE, "[{0}] Entering idle mode", getLogId());
- // Enter idle mode
- savedBalancer = graceLoadBalancer;
- graceLoadBalancer = null;
- oldResolver = nameResolver;
- nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
- transportsCopy.addAll(transports.values());
- transports.clear();
- decommissionedTransports.addAll(transportsCopy);
- }
- for (TransportSet ts : transportsCopy) {
- ts.shutdown();
- }
- savedBalancer.shutdown();
- oldResolver.shutdown();
- }
- }
-
- @GuardedBy("lock")
- @Nullable
- private ScheduledFuture<?> idleModeTimerFuture;
- @GuardedBy("lock")
- @Nullable
- private IdleModeTimer idleModeTimer;
-
- /**
- * Make the channel exit idle mode, if it's in it. Return a LoadBalancer that can be used for
- * making new requests. Return null if the channel is shutdown.
- */
- @VisibleForTesting
- LoadBalancer<ClientTransport> exitIdleModeAndGetLb() {
- Runnable runnable;
- LoadBalancer<ClientTransport> balancer;
- synchronized (lock) {
- runnable = exitIdleMode();
- balancer = loadBalancer;
- }
- if (runnable != null) {
- runnable.run();
- }
- return balancer;
- }
-
- /**
- * Make the channel exit idle mode, if it's in it. If the returned runnable is non-{@code null},
- * then it should be executed by the caller after releasing {@code lock}.
- */
- @GuardedBy("lock")
- private Runnable exitIdleMode() {
- final LoadBalancer<ClientTransport> balancer;
- final NameResolver resolver;
- if (shutdown) {
- return null;
- }
- if (inUseStateAggregator.isInUse()) {
- cancelIdleTimer();
- } else {
- // exitIdleMode() may be called outside of inUseStateAggregator, which may still in
- // "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if
- // the aggregator receives actual uses.
- rescheduleIdleTimer();
- }
- if (graceLoadBalancer != null) {
- // Exit grace period; timer already rescheduled above.
- loadBalancer = graceLoadBalancer;
- graceLoadBalancer = null;
- }
- if (loadBalancer != null) {
- return null;
- }
- log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
- balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
- this.loadBalancer = balancer;
- resolver = this.nameResolver;
- class NameResolverStartTask implements Runnable {
- @Override
- public void run() {
- NameResolverListenerImpl listener = new NameResolverListenerImpl(balancer);
- // This may trigger quite a few non-trivial work in LoadBalancer and NameResolver,
- // we don't want to do it in the lock.
- try {
- resolver.start(listener);
- } catch (Throwable t) {
- listener.onError(Status.fromThrowable(t));
- }
- }
- }
-
- return new NameResolverStartTask();
- }
-
- @VisibleForTesting
- boolean isInIdleGracePeriod() {
- synchronized (lock) {
- return graceLoadBalancer != null;
- }
- }
-
- // ErrorProne's GuardedByChecker can't figure out that the idleModeTimer is a nested instance of
- // this particular instance. It is worried about something like:
- // ManagedChannelImpl a = ...;
- // ManagedChannelImpl b = ...;
- // a.idleModeTimer = b.idleModeTimer;
- // a.cancelIdleTimer(); // access of b.idleModeTimer is guarded by a.lock, not b.lock
- //
- // _We_ know that isn't happening, so we suppress the warning.
- @SuppressWarnings("GuardedByChecker")
- @GuardedBy("lock")
- private void cancelIdleTimer() {
- if (idleModeTimerFuture != null) {
- idleModeTimerFuture.cancel(false);
- idleModeTimer.cancelled = true;
- idleModeTimerFuture = null;
- idleModeTimer = null;
- }
- }
-
- @GuardedBy("lock")
- private void rescheduleIdleTimer() {
- if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
- return;
- }
- cancelIdleTimer();
- idleModeTimer = new IdleModeTimer();
- idleModeTimerFuture = scheduledExecutor.schedule(new LogExceptionRunnable(idleModeTimer),
- idleTimeoutMillis, TimeUnit.MILLISECONDS);
- }
-
- @GuardedBy("lock")
- private final HashSet<OobTransportProviderImpl> oobTransports =
- new HashSet<OobTransportProviderImpl>();
-
- @GuardedBy("lock")
- private boolean shutdown;
- @GuardedBy("lock")
- private boolean shutdownNowed;
- @GuardedBy("lock")
- private boolean terminated;
-
- private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
- @Override
- public ClientTransport get(CallOptions callOptions, Metadata headers) {
- LoadBalancer<ClientTransport> balancer = loadBalancer;
- if (balancer == null) {
- // Current state is either idle or in grace period
- balancer = exitIdleModeAndGetLb();
- }
- if (balancer == null) {
- return SHUTDOWN_TRANSPORT;
- }
- return balancer.pickTransport(callOptions.getAffinity());
- }
- };
-
- ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider,
- NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,
- LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory,
- DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
- SharedResourceHolder.Resource<ScheduledExecutorService> timerService,
- Supplier<Stopwatch> stopwatchSupplier, long idleTimeoutMillis,
- @Nullable Executor executor, @Nullable String userAgent,
- List<ClientInterceptor> interceptors, StatsContextFactory statsFactory) {
- this.target = checkNotNull(target, "target");
- this.nameResolverFactory = checkNotNull(nameResolverFactory, "nameResolverFactory");
- this.nameResolverParams = checkNotNull(nameResolverParams, "nameResolverParams");
- this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
- this.loadBalancerFactory = checkNotNull(loadBalancerFactory, "loadBalancerFactory");
- if (executor == null) {
- usingSharedExecutor = true;
- this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
- } else {
- usingSharedExecutor = false;
- this.executor = executor;
- }
- this.backoffPolicyProvider = backoffPolicyProvider;
- this.transportFactory =
- new CallCredentialsApplyingTransportFactory(transportFactory, this.executor);
- this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
- this.timerService = timerService;
- this.scheduledExecutor = SharedResourceHolder.get(timerService);
- this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
- if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
- this.idleTimeoutMillis = idleTimeoutMillis;
- } else {
- assert IDLE_GRACE_PERIOD_MILLIS
- <= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS;
- checkArgument(idleTimeoutMillis >= IDLE_GRACE_PERIOD_MILLIS,
- "invalid idleTimeoutMillis %s", idleTimeoutMillis);
- this.idleTimeoutMillis = idleTimeoutMillis - IDLE_GRACE_PERIOD_MILLIS;
- }
- this.decompressorRegistry = decompressorRegistry;
- this.compressorRegistry = compressorRegistry;
- this.userAgent = userAgent;
- this.statsFactory = checkNotNull(statsFactory, "statsFactory");
-
- if (log.isLoggable(Level.INFO)) {
- log.log(Level.INFO, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
- }
- }
-
- @VisibleForTesting
- static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
- Attributes nameResolverParams) {
- // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
- // "dns:///".
- URI targetUri = null;
- StringBuilder uriSyntaxErrors = new StringBuilder();
- try {
- targetUri = new URI(target);
- // For "localhost:8080" this would likely cause newNameResolver to return null, because
- // "localhost" is parsed as the scheme. Will fall into the next branch and try
- // "dns:///localhost:8080".
- } catch (URISyntaxException e) {
- // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
- uriSyntaxErrors.append(e.getMessage());
- }
- if (targetUri != null) {
- NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
- if (resolver != null) {
- return resolver;
- }
- // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
- // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
- }
-
- // If we reached here, the targetUri couldn't be used.
- if (!URI_PATTERN.matcher(target).matches()) {
- // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
- // scheme from the factory.
- try {
- targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
- } catch (URISyntaxException e) {
- // Should not be possible.
- throw new IllegalArgumentException(e);
- }
- if (targetUri != null) {
- NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
- if (resolver != null) {
- return resolver;
- }
- }
- }
- throw new IllegalArgumentException(String.format(
- "cannot find a NameResolver for %s%s",
- target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
- }
-
- /**
- * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
- * cancelled.
- */
- @Override
- public ManagedChannelImpl shutdown() {
- log.log(Level.FINE, "[{0}] shutdown() called", getLogId());
- ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
- ArrayList<DelayedClientTransport> delayedTransportsCopy =
- new ArrayList<DelayedClientTransport>();
- ArrayList<OobTransportProviderImpl> oobTransportsCopy =
- new ArrayList<OobTransportProviderImpl>();
- LoadBalancer<ClientTransport> balancer;
- NameResolver resolver;
- synchronized (lock) {
- if (shutdown) {
- return this;
- }
- log.log(Level.FINE, "[{0}] Shutting down", getLogId());
- shutdown = true;
- // After shutdown there are no new calls, so no new cancellation tasks are needed
- scheduledExecutor = SharedResourceHolder.release(timerService, scheduledExecutor);
- maybeTerminateChannel();
- if (!terminated) {
- transportsCopy.addAll(transports.values());
- transports.clear();
- decommissionedTransports.addAll(transportsCopy);
- delayedTransportsCopy.addAll(delayedTransports);
- oobTransportsCopy.addAll(oobTransports);
- }
- balancer = getCurrentLoadBalancer();
- resolver = nameResolver;
- cancelIdleTimer();
- }
- if (balancer != null) {
- balancer.shutdown();
- }
- resolver.shutdown();
- for (TransportSet ts : transportsCopy) {
- ts.shutdown();
- }
- for (DelayedClientTransport transport : delayedTransportsCopy) {
- transport.shutdown();
- }
- for (OobTransportProviderImpl provider : oobTransportsCopy) {
- provider.close();
- }
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] Shutting down", getLogId());
- }
- return this;
- }
-
- /**
- * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
- * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
- * return {@code false} immediately after this method returns.
- */
- @Override
- public ManagedChannelImpl shutdownNow() {
- log.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
- synchronized (lock) {
- // Short-circuiting not strictly necessary, but prevents transports from needing to handle
- // multiple shutdownNow invocations.
- if (shutdownNowed) {
- return this;
- }
- shutdownNowed = true;
- }
- shutdown();
- List<TransportSet> transportsCopy;
- List<DelayedClientTransport> delayedTransportsCopy;
- List<OobTransportProviderImpl> oobTransportsCopy;
- synchronized (lock) {
- transportsCopy = new ArrayList<TransportSet>(transports.values());
- transportsCopy.addAll(decommissionedTransports);
- delayedTransportsCopy = new ArrayList<DelayedClientTransport>(delayedTransports);
- oobTransportsCopy = new ArrayList<OobTransportProviderImpl>(oobTransports);
- }
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] Shutting down now", getLogId());
- }
- Status nowStatus = Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
- for (TransportSet ts : transportsCopy) {
- ts.shutdownNow(nowStatus);
- }
- for (DelayedClientTransport transport : delayedTransportsCopy) {
- transport.shutdownNow(nowStatus);
- }
- for (OobTransportProviderImpl provider : oobTransportsCopy) {
- provider.shutdownNow(nowStatus);
- }
- return this;
- }
-
- @Override
- public boolean isShutdown() {
- synchronized (lock) {
- return shutdown;
- }
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- synchronized (lock) {
- long timeoutNanos = unit.toNanos(timeout);
- long endTimeNanos = System.nanoTime() + timeoutNanos;
- while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
- TimeUnit.NANOSECONDS.timedWait(lock, timeoutNanos);
- }
- return terminated;
- }
- }
-
- @Override
- public boolean isTerminated() {
- synchronized (lock) {
- return terminated;
- }
- }
-
- /*
- * Creates a new outgoing call on the channel.
- */
- @Override
- public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
- CallOptions callOptions) {
- return interceptorChannel.newCall(method, callOptions);
- }
-
- @Override
- public String authority() {
- return interceptorChannel.authority();
- }
-
- /** Returns {@code null} iff channel is in idle state. */
- @GuardedBy("lock")
- private LoadBalancer<ClientTransport> getCurrentLoadBalancer() {
- if (loadBalancer != null) {
- return loadBalancer;
- }
- return graceLoadBalancer;
- }
-
- private class RealChannel extends Channel {
- @Override
- public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
- CallOptions callOptions) {
- Executor executor = callOptions.getExecutor();
- if (executor == null) {
- executor = ManagedChannelImpl.this.executor;
- }
- StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
- method.getFullMethodName(), statsFactory, stopwatchSupplier);
- return new ClientCallImpl<ReqT, RespT>(
- method,
- executor,
- callOptions,
- statsTraceCtx,
- transportProvider,
- scheduledExecutor)
- .setDecompressorRegistry(decompressorRegistry)
- .setCompressorRegistry(compressorRegistry);
- }
-
- @Override
- public String authority() {
- String authority = nameResolver.getServiceAuthority();
- return checkNotNull(authority, "authority");
- }
- }
-
- /**
- * Terminate the channel if termination conditions are met.
- */
- @GuardedBy("lock")
- private void maybeTerminateChannel() {
- if (terminated) {
- return;
- }
- if (shutdown && transports.isEmpty() && decommissionedTransports.isEmpty()
- && delayedTransports.isEmpty() && oobTransports.isEmpty()) {
- if (log.isLoggable(Level.INFO)) {
- log.log(Level.INFO, "[{0}] Terminated", getLogId());
- }
- terminated = true;
- lock.notifyAll();
- if (usingSharedExecutor) {
- SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, (ExecutorService) executor);
- }
- // Release the transport factory so that it can deallocate any resources.
- transportFactory.close();
- }
- }
-
- @VisibleForTesting
- final TransportManager<ClientTransport> tm = new TransportManager<ClientTransport>() {
- @Override
- public void updateRetainedTransports(Collection<EquivalentAddressGroup> addrs) {
- // TODO(zhangkun83): warm-up new servers and discard removed servers.
- }
-
- @Override
- public ClientTransport getTransport(final EquivalentAddressGroup addressGroup) {
- checkNotNull(addressGroup, "addressGroup");
- TransportSet ts = transports.get(addressGroup);
- if (ts != null) {
- return ts.obtainActiveTransport();
- }
- synchronized (lock) {
- if (shutdown) {
- return SHUTDOWN_TRANSPORT;
- }
- if (getCurrentLoadBalancer() == null) {
- return IDLE_MODE_TRANSPORT;
- }
- ts = transports.get(addressGroup);
- if (ts == null) {
- ts = new TransportSet(addressGroup, authority(), userAgent, getCurrentLoadBalancer(),
- backoffPolicyProvider, transportFactory, scheduledExecutor, stopwatchSupplier,
- executor, new TransportSet.Callback() {
- @Override
- public void onTerminated(TransportSet ts) {
- synchronized (lock) {
- transports.remove(addressGroup);
- decommissionedTransports.remove(ts);
- maybeTerminateChannel();
- }
- }
-
- @Override
- public void onAllAddressesFailed() {
- nameResolver.refresh();
- }
-
- @Override
- public void onConnectionClosedByServer(Status status) {
- nameResolver.refresh();
- }
-
- @Override
- public Runnable onInUse(TransportSet ts) {
- return inUseStateAggregator.updateObjectInUse(ts, true);
- }
-
- @Override
- public void onNotInUse(TransportSet ts) {
- Runnable r = inUseStateAggregator.updateObjectInUse(ts, false);
- assert r == null;
- }
- });
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] {1} created for {2}",
- new Object[] {getLogId(), ts.getLogId(), addressGroup});
- }
- transports.put(addressGroup, ts);
- }
- }
- return ts.obtainActiveTransport();
- }
-
- @Override
- public Channel makeChannel(ClientTransport transport) {
- return new SingleTransportChannel(
- statsFactory, transport, executor, scheduledExecutor, authority(), stopwatchSupplier);
- }
-
- @Override
- public ClientTransport createFailingTransport(Status error) {
- return new FailingClientTransport(error);
- }
-
- @Override
- public InterimTransport<ClientTransport> createInterimTransport() {
- return new InterimTransportImpl();
- }
-
- @Override
- public OobTransportProvider<ClientTransport> createOobTransportProvider(
- EquivalentAddressGroup addressGroup, String authority) {
- return new OobTransportProviderImpl(addressGroup, authority);
- }
- };
-
- @Override
- public LogId getLogId() {
- return logId;
- }
-
- private class NameResolverListenerImpl implements NameResolver.Listener {
- final LoadBalancer<ClientTransport> balancer;
-
- NameResolverListenerImpl(LoadBalancer<ClientTransport> balancer) {
- this.balancer = balancer;
- }
-
- @Override
- public void onUpdate(List<ResolvedServerInfoGroup> servers, Attributes config) {
- if (servers.isEmpty()) {
- onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
- return;
- }
- log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
- new Object[] {getLogId(), servers, config});
-
- try {
- balancer.handleResolvedAddresses(servers, config);
- } catch (Throwable e) {
- log.log(Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
- // It must be a bug! Push the exception back to LoadBalancer in the hope that it may be
- // propagated to the application.
- balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
- .withDescription("Thrown from handleResolvedAddresses(): " + e));
- }
- }
-
- @Override
- public void onError(Status error) {
- checkArgument(!error.isOk(), "the error status must not be OK");
- log.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
- new Object[] {getLogId(), error});
- balancer.handleNameResolutionError(error);
- }
- }
-
- private class InterimTransportImpl implements InterimTransport<ClientTransport> {
- private final DelayedClientTransport delayedTransport;
- private boolean closed;
-
- InterimTransportImpl() {
- delayedTransport = new DelayedClientTransport(executor);
- delayedTransport.start(new ManagedClientTransport.Listener() {
- @Override public void transportShutdown(Status status) {}
-
- @Override public void transportTerminated() {
- synchronized (lock) {
- delayedTransports.remove(delayedTransport);
- maybeTerminateChannel();
- }
- Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, false);
- assert r == null;
- }
-
- @Override public void transportReady() {}
-
- @Override public void transportInUse(boolean inUse) {
- Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
- if (r != null) {
- r.run();
- }
- }
- });
- boolean savedShutdown;
- synchronized (lock) {
- delayedTransports.add(delayedTransport);
- savedShutdown = shutdown;
- }
- if (savedShutdown) {
- delayedTransport.setTransport(SHUTDOWN_TRANSPORT);
- delayedTransport.shutdown();
- }
- }
-
- @Override
- public ClientTransport transport() {
- checkState(!closed, "already closed");
- return delayedTransport;
- }
-
- @Override
- public void closeWithRealTransports(Supplier<ClientTransport> realTransports) {
- delayedTransport.setTransportSupplier(realTransports);
- delayedTransport.shutdown();
- }
-
- @Override
- public void closeWithError(Status error) {
- delayedTransport.shutdownNow(error);
- }
- }
-
- private class OobTransportProviderImpl implements OobTransportProvider<ClientTransport> {
- private final TransportSet transportSet;
- private final ClientTransport transport;
-
- OobTransportProviderImpl(EquivalentAddressGroup addressGroup, String authority) {
- synchronized (lock) {
- if (shutdown) {
- transportSet = null;
- transport = SHUTDOWN_TRANSPORT;
- } else if (getCurrentLoadBalancer() == null) {
- transportSet = null;
- transport = IDLE_MODE_TRANSPORT;
- } else {
- transport = null;
- transportSet = new TransportSet(addressGroup, authority, userAgent,
- getCurrentLoadBalancer(), backoffPolicyProvider, transportFactory, scheduledExecutor,
- stopwatchSupplier, executor, new TransportSet.Callback() {
- @Override
- public void onTerminated(TransportSet ts) {
- synchronized (lock) {
- oobTransports.remove(OobTransportProviderImpl.this);
- maybeTerminateChannel();
- }
- }
- });
- oobTransports.add(this);
- }
- }
- }
-
- @Override
- public ClientTransport get() {
- if (transport != null) {
- return transport;
- } else {
- return transportSet.obtainActiveTransport();
- }
- }
-
- @Override
- public void close() {
- if (transportSet != null) {
- transportSet.shutdown();
- }
- }
-
- void shutdownNow(Status reason) {
- if (transportSet != null) {
- transportSet.shutdownNow(reason);
- }
- }
- }
-}
diff --git a/core/src/main/java/io/grpc/internal/RoundRobinServerList.java b/core/src/main/java/io/grpc/internal/RoundRobinServerList.java
deleted file mode 100644
index c45790a..0000000
--- a/core/src/main/java/io/grpc/internal/RoundRobinServerList.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.Status;
-import io.grpc.TransportManager;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Manages a list of server addresses to round-robin on.
- */
-@ThreadSafe
-public class RoundRobinServerList<T> {
- private final TransportManager<T> tm;
- private final List<EquivalentAddressGroup> list;
- private final Iterator<EquivalentAddressGroup> cyclingIter;
- private final T requestDroppingTransport;
-
- private RoundRobinServerList(TransportManager<T> tm, List<EquivalentAddressGroup> list) {
- this.tm = tm;
- this.list = list;
- this.cyclingIter = new CycleIterator<EquivalentAddressGroup>(list);
- this.requestDroppingTransport =
- tm.createFailingTransport(Status.UNAVAILABLE.withDescription("Throttled by LB"));
- }
-
- /**
- * Returns the next transport in the list of servers.
- *
- * @return the next transport
- */
- public T getTransportForNextServer() {
- EquivalentAddressGroup currentServer;
- synchronized (cyclingIter) {
- // TODO(zhangkun83): receive transportShutdown and transportReady events, then skip addresses
- // that have been failing.
- currentServer = cyclingIter.next();
- }
- if (currentServer == null) {
- return requestDroppingTransport;
- }
- return tm.getTransport(currentServer);
- }
-
- @VisibleForTesting
- public List<EquivalentAddressGroup> getList() {
- return list;
- }
-
- public int size() {
- return list.size();
- }
-
- @NotThreadSafe
- public static class Builder<T> {
- private final List<EquivalentAddressGroup> list = new ArrayList<EquivalentAddressGroup>();
- private final TransportManager<T> tm;
-
- public Builder(TransportManager<T> tm) {
- this.tm = tm;
- }
-
- /**
- * Adds a server to the list, or {@code null} for a drop entry.
- */
- public Builder<T> addSocketAddress(@Nullable SocketAddress address) {
- list.add(new EquivalentAddressGroup(address));
- return this;
- }
-
- /**
- * Adds a address group to the list.
- *
- * @param addresses the addresses to add
- */
- public Builder<T> add(EquivalentAddressGroup addresses) {
- list.add(addresses);
- return this;
- }
-
- /**
- * Adds a list of address groups.
- *
- * @param addresses the list of addresses group.
- */
- public Builder<T> addAll(Collection<EquivalentAddressGroup> addresses) {
- list.addAll(addresses);
- return this;
- }
-
- public RoundRobinServerList<T> build() {
- return new RoundRobinServerList<T>(tm,
- Collections.unmodifiableList(new ArrayList<EquivalentAddressGroup>(list)));
- }
- }
-
- private static final class CycleIterator<T> implements Iterator<T> {
- private final List<T> list;
- private int index;
-
- public CycleIterator(List<T> list) {
- this.list = list;
- }
-
- @Override
- public boolean hasNext() {
- return !list.isEmpty();
- }
-
- @Override
- public T next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- T val = list.get(index);
- index++;
- if (index >= list.size()) {
- index -= list.size();
- }
- return val;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-}
diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java
deleted file mode 100644
index d42547f..0000000
--- a/core/src/main/java/io/grpc/internal/TransportSet.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
-import io.grpc.CallOptions;
-import io.grpc.ClientCall;
-import io.grpc.ConnectivityState;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.LoadBalancer;
-import io.grpc.ManagedChannel;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.Status;
-import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.annotation.CheckReturnValue;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Transports for a single {@link SocketAddress}.
- */
-@ThreadSafe
-final class TransportSet extends ManagedChannel implements WithLogId {
- private static final Logger log = Logger.getLogger(TransportSet.class.getName());
- private static final ClientTransport SHUTDOWN_TRANSPORT =
- new FailingClientTransport(Status.UNAVAILABLE.withDescription("TransportSet is shutdown"));
-
- private final CountDownLatch terminatedLatch = new CountDownLatch(1);
- private final Object lock = new Object();
- private final LogId logId = LogId.allocate(getClass().getName());
- private final EquivalentAddressGroup addressGroup;
- private final String authority;
- private final String userAgent;
- private final BackoffPolicy.Provider backoffPolicyProvider;
- private final Callback callback;
- private final ClientTransportFactory transportFactory;
- private final ScheduledExecutorService scheduledExecutor;
- private final Executor appExecutor;
-
- @GuardedBy("lock")
- private int nextAddressIndex;
-
- /**
- * The policy to control back off between reconnects. Non-{@code null} when last connect failed.
- */
- @GuardedBy("lock")
- private BackoffPolicy reconnectPolicy;
-
- /**
- * Timer monitoring duration since entering CONNECTING state.
- */
- @GuardedBy("lock")
- private final Stopwatch connectingTimer;
-
- @GuardedBy("lock")
- @Nullable
- private ScheduledFuture<?> reconnectTask;
-
- /**
- * All transports that are not terminated. At the very least the value of {@link #activeTransport}
- * will be present, but previously used transports that still have streams or are stopping may
- * also be present.
- */
- @GuardedBy("lock")
- private final Collection<ManagedClientTransport> transports =
- new ArrayList<ManagedClientTransport>();
-
- private final InUseStateAggregator<ManagedClientTransport> inUseStateAggregator =
- new InUseStateAggregator<ManagedClientTransport>() {
- @Override
- Object getLock() {
- return lock;
- }
-
- @Override
- Runnable handleInUse() {
- return callback.onInUse(TransportSet.this);
- }
-
- @Override
- void handleNotInUse() {
- callback.onNotInUse(TransportSet.this);
- }
- };
-
- /**
- * The to-be active transport, which is not ready yet.
- */
- @GuardedBy("lock")
- @Nullable
- private ConnectionClientTransport pendingTransport;
-
- private final LoadBalancer<ClientTransport> loadBalancer;
-
- @GuardedBy("lock")
- private boolean shutdown;
-
- /**
- * The transport for new outgoing requests. 'lock' must be held when assigning to it.
- *
- * <pre><code>
- * State Value
- * ----- ------
- * IDLE null, shutdown == false
- * CONNECTING instanceof DelayedTransport, reconnectTask == null
- * READY connected transport
- * TRANSIENT_FAILURE instanceof DelayedTransport, reconnectTask != null
- * SHUTDOWN null, shutdown == true
- * </code></pre>
- */
- @Nullable
- private volatile ManagedClientTransport activeTransport;
-
- @GuardedBy("lock")
- private final ConnectivityStateManager stateManager =
- new ConnectivityStateManager(ConnectivityState.IDLE);
-
- TransportSet(EquivalentAddressGroup addressGroup, String authority, String userAgent,
- LoadBalancer<ClientTransport> loadBalancer, BackoffPolicy.Provider backoffPolicyProvider,
- ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
- Supplier<Stopwatch> stopwatchSupplier, Executor appExecutor, Callback callback) {
- this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
- this.authority = authority;
- this.userAgent = userAgent;
- this.loadBalancer = loadBalancer;
- this.backoffPolicyProvider = backoffPolicyProvider;
- this.transportFactory = transportFactory;
- this.scheduledExecutor = scheduledExecutor;
- this.connectingTimer = stopwatchSupplier.get();
- this.appExecutor = appExecutor;
- this.callback = callback;
- }
-
- /**
- * Returns the active transport that will be used to create new streams.
- *
- * <p>Never returns {@code null}.
- */
- final ClientTransport obtainActiveTransport() {
- ClientTransport savedTransport = activeTransport;
- if (savedTransport != null) {
- return savedTransport;
- }
- Runnable runnable;
- synchronized (lock) {
- // Check again, since it could have changed before acquiring the lock
- savedTransport = activeTransport;
- if (savedTransport != null) {
- return savedTransport;
- }
- if (shutdown) {
- return SHUTDOWN_TRANSPORT;
- }
- stateManager.gotoState(ConnectivityState.CONNECTING);
- DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor);
- transports.add(delayedTransport);
- delayedTransport.start(new BaseTransportListener(delayedTransport));
- savedTransport = activeTransport = delayedTransport;
- runnable = startNewTransport(delayedTransport);
- }
- if (runnable != null) {
- runnable.run();
- }
- return savedTransport;
- }
-
- @CheckReturnValue
- @GuardedBy("lock")
- private Runnable startNewTransport(DelayedClientTransport delayedTransport) {
- Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
-
- if (nextAddressIndex == 0) {
- connectingTimer.reset().start();
- }
- List<SocketAddress> addrs = addressGroup.getAddresses();
- final SocketAddress address = addrs.get(nextAddressIndex++);
- if (nextAddressIndex >= addrs.size()) {
- nextAddressIndex = 0;
- }
-
- ConnectionClientTransport transport =
- transportFactory.newClientTransport(address, authority, userAgent);
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] Created {1} for {2}",
- new Object[] {getLogId(), transport.getLogId(), address});
- }
- pendingTransport = transport;
- transports.add(transport);
- return transport.start(new TransportListener(transport, delayedTransport, address));
- }
-
- /**
- * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
- * @param status the causal status when the channel begins transition to
- * TRANSIENT_FAILURE.
- */
- private void scheduleBackoff(
- final DelayedClientTransport delayedTransport, final Status status) {
- // This must be run outside of lock. The TransportSet lock is a channel level lock.
- // startBackoff() will acquire the delayed transport lock, which is a transport level
- // lock. Our lock ordering mandates transport lock > channel lock. Otherwise a deadlock
- // could happen (https://github.com/grpc/grpc-java/issues/2152).
- delayedTransport.startBackoff(status);
-
- class EndOfCurrentBackoff implements Runnable {
- @Override
- public void run() {
- try {
- delayedTransport.endBackoff();
- Runnable runnable = null;
- synchronized (lock) {
- reconnectTask = null;
- if (!shutdown) {
- stateManager.gotoState(ConnectivityState.CONNECTING);
- }
- runnable = startNewTransport(delayedTransport);
- }
- if (runnable != null) {
- runnable.run();
- }
- } catch (Throwable t) {
- log.log(Level.WARNING, "Exception handling end of backoff", t);
- }
- }
- }
-
- synchronized (lock) {
- if (shutdown) {
- return;
- }
- stateManager.gotoState(ConnectivityState.TRANSIENT_FAILURE);
- if (reconnectPolicy == null) {
- reconnectPolicy = backoffPolicyProvider.get();
- }
- long delayMillis =
- reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS);
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms",
- new Object[]{getLogId(), delayMillis});
- }
- Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
- reconnectTask = scheduledExecutor.schedule(
- new LogExceptionRunnable(new EndOfCurrentBackoff()),
- delayMillis,
- TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- public ManagedChannel shutdown() {
- ManagedClientTransport savedActiveTransport;
- ConnectionClientTransport savedPendingTransport;
- boolean runCallback = false;
- synchronized (lock) {
- if (shutdown) {
- return this;
- }
- stateManager.gotoState(ConnectivityState.SHUTDOWN);
- shutdown = true;
- savedActiveTransport = activeTransport;
- savedPendingTransport = pendingTransport;
- activeTransport = null;
- if (transports.isEmpty()) {
- runCallback = true;
- terminatedLatch.countDown();
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] Terminated in shutdown()", getLogId());
- }
- Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
- } // else: the callback will be run once all transports have been terminated
- }
- if (savedActiveTransport != null) {
- savedActiveTransport.shutdown();
- }
- if (savedPendingTransport != null) {
- savedPendingTransport.shutdown();
- }
- if (runCallback) {
- callback.onTerminated(this);
- }
- return this;
- }
-
- void shutdownNow(Status reason) {
- shutdown();
- Collection<ManagedClientTransport> transportsCopy;
- synchronized (lock) {
- transportsCopy = new ArrayList<ManagedClientTransport>(transports);
- }
- for (ManagedClientTransport transport : transportsCopy) {
- transport.shutdownNow(reason);
- }
- }
-
- @Override
- public ManagedChannel shutdownNow() {
- shutdownNow(Status.UNAVAILABLE.withDescription("TransportSet shutdown as ManagedChannel"));
- return this;
- }
-
- @GuardedBy("lock")
- private void cancelReconnectTask() {
- if (reconnectTask != null) {
- reconnectTask.cancel(false);
- reconnectTask = null;
- }
- }
-
- @Override
- public LogId getLogId() {
- return logId;
- }
-
- @Override
- public final <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
- MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
- return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor,
- new SerializingExecutor(appExecutor), callOptions, StatsTraceContext.NOOP,
- new ClientTransportProvider() {
- @Override
- public ClientTransport get(CallOptions callOptions, Metadata headers) {
- return obtainActiveTransport();
- }
- },
- scheduledExecutor);
- }
-
- @Override
- public boolean isShutdown() {
- synchronized (lock) {
- return shutdown;
- }
- }
-
- @Override
- public boolean isTerminated() {
- return terminatedLatch.getCount() == 0;
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return terminatedLatch.await(timeout, unit);
- }
-
- @Override
- public String authority() {
- return authority;
- }
-
- @Override
- public ConnectivityState getState(boolean requestConnection) {
- if (requestConnection) {
- boolean connect = false;
- synchronized (lock) {
- connect = stateManager.getState() == ConnectivityState.IDLE;
- }
- if (connect) {
- obtainActiveTransport();
- }
- }
- synchronized (lock) {
- return stateManager.getState();
- }
- }
-
- @Override
- public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
- synchronized (lock) {
- stateManager.notifyWhenStateChanged(callback, appExecutor, source);
- }
- }
-
- /** Shared base for both delayed and real transports. */
- private class BaseTransportListener implements ManagedClientTransport.Listener {
- protected final ManagedClientTransport transport;
-
- public BaseTransportListener(ManagedClientTransport transport) {
- this.transport = transport;
- }
-
- @Override
- public void transportReady() {}
-
- @Override
- public void transportInUse(boolean inUse) {
- Runnable r = inUseStateAggregator.updateObjectInUse(transport, inUse);
- if (r != null) {
- r.run();
- }
- }
-
- @Override
- public void transportShutdown(Status status) {}
-
- @Override
- public void transportTerminated() {
- boolean runCallback = false;
- Runnable r = inUseStateAggregator.updateObjectInUse(transport, false);
- assert r == null;
- synchronized (lock) {
- transports.remove(transport);
- if (shutdown && transports.isEmpty()) {
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", getLogId());
- }
- terminatedLatch.countDown();
- runCallback = true;
- cancelReconnectTask();
- }
- }
- if (runCallback) {
- callback.onTerminated(TransportSet.this);
- }
- }
- }
-
- /** Listener for real transports. */
- private class TransportListener extends BaseTransportListener {
- private final SocketAddress address;
- private final DelayedClientTransport delayedTransport;
-
- public TransportListener(ManagedClientTransport transport,
- DelayedClientTransport delayedTransport, SocketAddress address) {
- super(transport);
- this.address = address;
- this.delayedTransport = delayedTransport;
- }
-
- @Override
- public void transportReady() {
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] {1} for {2} is ready",
- new Object[] {getLogId(), transport.getLogId(), address});
- }
- super.transportReady();
- boolean savedShutdown;
- synchronized (lock) {
- savedShutdown = shutdown;
- reconnectPolicy = null;
- nextAddressIndex = 0;
- if (shutdown) {
- // If TransportSet already shutdown, transport is only to take care of pending
- // streams in delayedTransport, but will not serve new streams, and it will be shutdown
- // as soon as it's set to the delayedTransport.
- // activeTransport should have already been set to null by shutdown(). We keep it null.
- Preconditions.checkState(activeTransport == null,
- "Unexpected non-null activeTransport");
- } else if (activeTransport == delayedTransport) {
- stateManager.gotoState(ConnectivityState.READY);
- Preconditions.checkState(pendingTransport == transport, "transport mismatch");
- activeTransport = transport;
- pendingTransport = null;
- }
- }
- delayedTransport.setTransport(transport);
- // This delayed transport will terminate and be removed from transports.
- delayedTransport.shutdown();
- if (savedShutdown) {
- // See comments in the synchronized block above on why we shutdown here.
- transport.shutdown();
- }
- loadBalancer.handleTransportReady(addressGroup);
- }
-
- @Override
- public void transportShutdown(Status s) {
- boolean allAddressesFailed = false;
- boolean closedByServer = false;
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
- new Object[] {getLogId(), transport.getLogId(), address, s});
- }
- super.transportShutdown(s);
- Runnable runnable = null;
- synchronized (lock) {
- if (activeTransport == transport) {
- // This is true only if the transport was ready.
- // shutdown() should have set activeTransport to null
- Preconditions.checkState(!shutdown, "unexpected shutdown state");
- stateManager.gotoState(ConnectivityState.IDLE);
- activeTransport = null;
- closedByServer = true;
- } else if (activeTransport == delayedTransport) {
- // shutdown() should have set activeTransport to null
- Preconditions.checkState(!shutdown, "unexpected shutdown state");
- // Continue reconnect if there are still addresses to try.
- if (nextAddressIndex == 0) {
- allAddressesFailed = true;
- } else {
- Preconditions.checkState(stateManager.getState() == ConnectivityState.CONNECTING,
- "Expected state is CONNECTING, actual state is %s", stateManager.getState());
- runnable = startNewTransport(delayedTransport);
- }
- }
- }
- if (allAddressesFailed) {
- // Initiate backoff
- // Transition to TRANSIENT_FAILURE
- scheduleBackoff(delayedTransport, s);
- }
- if (runnable != null) {
- runnable.run();
- }
- loadBalancer.handleTransportShutdown(addressGroup, s);
- if (allAddressesFailed) {
- callback.onAllAddressesFailed();
- }
- if (closedByServer) {
- callback.onConnectionClosedByServer(s);
- }
- }
-
- @Override
- public void transportTerminated() {
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "[{0}] {1} for {2} is terminated",
- new Object[] {getLogId(), transport.getLogId(), address});
- }
- super.transportTerminated();
- Preconditions.checkState(activeTransport != transport,
- "activeTransport still points to the delayedTransport. "
- + "Seems transportShutdown() was not called.");
- }
- }
-
- abstract static class Callback {
- /**
- * Called when the TransportSet is terminated, which means it's shut down and all transports
- * have been terminated.
- */
- public void onTerminated(TransportSet ts) { }
-
- /**
- * Called when all addresses have failed to connect.
- */
- public void onAllAddressesFailed() { }
-
- /**
- * Called when a once-live connection is shut down by server-side.
- */
- public void onConnectionClosedByServer(Status status) { }
-
- /**
- * Called when the TransportSet's in-use state has changed to true, which means at least one
- * transport is in use. This method is called under a lock thus externally synchronized. If the
- * return value is non-{@code null}, the runnable will be executed after releasing the lock.
- */
- @CheckReturnValue
- public Runnable onInUse(TransportSet ts) {
- return null;
- }
-
- /**
- * Called when the TransportSet's in-use state has changed to false, which means no transport is
- * in use. This method is called under a lock thus externally synchronized.
- */
- public void onNotInUse(TransportSet ts) { }
- }
-}
diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java
deleted file mode 100644
index 8351617..0000000
--- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright 2016, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.util;
-
-import com.google.common.base.Supplier;
-import io.grpc.Attributes;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.ExperimentalApi;
-import io.grpc.LoadBalancer;
-import io.grpc.NameResolver;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.Status;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.InterimTransport;
-import io.grpc.internal.RoundRobinServerList;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.concurrent.GuardedBy;
-
-
-/**
- * A {@link LoadBalancer} that provides round-robin load balancing mechanism over the
- * addresses from the {@link NameResolver}. The sub-lists received from the name resolver
- * are considered to be an {@link EquivalentAddressGroup} and each of these sub-lists is
- * what is then balanced across.
- */
-@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
-public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
-
- private static final RoundRobinLoadBalancerFactory instance = new RoundRobinLoadBalancerFactory();
-
- private RoundRobinLoadBalancerFactory() {
- }
-
- public static RoundRobinLoadBalancerFactory getInstance() {
- return instance;
- }
-
- @Override
- public <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm) {
- return new RoundRobinLoadBalancer<T>(tm);
- }
-
- private static class RoundRobinLoadBalancer<T> extends LoadBalancer<T> {
- private static final Status SHUTDOWN_STATUS =
- Status.UNAVAILABLE.augmentDescription("RoundRobinLoadBalancer has shut down");
-
- private final Object lock = new Object();
-
- @GuardedBy("lock")
- private RoundRobinServerList<T> addresses;
- @GuardedBy("lock")
- private InterimTransport<T> interimTransport;
- @GuardedBy("lock")
- private Status nameResolutionError;
- @GuardedBy("lock")
- private boolean closed;
-
- private final TransportManager<T> tm;
-
- private RoundRobinLoadBalancer(TransportManager<T> tm) {
- this.tm = tm;
- }
-
- @Override
- public T pickTransport(Attributes affinity) {
- final RoundRobinServerList<T> addressesCopy;
- synchronized (lock) {
- if (closed) {
- return tm.createFailingTransport(SHUTDOWN_STATUS);
- }
- if (addresses == null) {
- if (nameResolutionError != null) {
- return tm.createFailingTransport(nameResolutionError);
- }
- if (interimTransport == null) {
- interimTransport = tm.createInterimTransport();
- }
- return interimTransport.transport();
- }
- addressesCopy = addresses;
- }
- return addressesCopy.getTransportForNextServer();
- }
-
- @Override
- public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
- Attributes attributes) {
- final InterimTransport<T> savedInterimTransport;
- final RoundRobinServerList<T> addressesCopy;
- synchronized (lock) {
- if (closed) {
- return;
- }
- addresses = new RoundRobinServerList.Builder<T>(tm).addAll(
- resolvedServerInfoGroupToEquivalentAddressGroup(updatedServers)).build();
- addressesCopy = addresses;
- nameResolutionError = null;
- savedInterimTransport = interimTransport;
- interimTransport = null;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithRealTransports(new Supplier<T>() {
- @Override public T get() {
- return addressesCopy.getTransportForNextServer();
- }
- });
- }
- }
-
- @Override
- public void handleNameResolutionError(Status error) {
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- if (closed) {
- return;
- }
- error = error.augmentDescription("Name resolution failed");
- savedInterimTransport = interimTransport;
- interimTransport = null;
- nameResolutionError = error;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithError(error);
- }
- }
-
- @Override
- public void shutdown() {
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- if (closed) {
- return;
- }
- closed = true;
- savedInterimTransport = interimTransport;
- interimTransport = null;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithError(SHUTDOWN_STATUS);
- }
- }
-
- private static List<EquivalentAddressGroup> resolvedServerInfoGroupToEquivalentAddressGroup(
- List<ResolvedServerInfoGroup> groupList) {
- List<EquivalentAddressGroup> addrs = new ArrayList<EquivalentAddressGroup>(groupList.size());
- for (ResolvedServerInfoGroup group : groupList) {
- addrs.add(group.toEquivalentAddressGroup());
- }
- return addrs;
- }
- }
-}
diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java
index a3d9e6a..072a531 100644
--- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java
+++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java
@@ -41,7 +41,6 @@
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi;
-import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer2;
import io.grpc.LoadBalancer2.PickResult;
import io.grpc.LoadBalancer2.Subchannel;
@@ -65,7 +64,7 @@
import javax.annotation.concurrent.GuardedBy;
/**
- * A {@link LoadBalancer} that provides round-robin load balancing mechanism over the
+ * A {@link LoadBalancer2} that provides round-robin load balancing mechanism over the
* addresses from the {@link NameResolver}. The sub-lists received from the name resolver
* are considered to be an {@link EquivalentAddressGroup} and each of these sub-lists is
* what is then balanced across.
diff --git a/core/src/test/java/io/grpc/PickFirstBalancerTest.java b/core/src/test/java/io/grpc/PickFirstBalancerTest.java
deleted file mode 100644
index 5ae2de6..0000000
--- a/core/src/test/java/io/grpc/PickFirstBalancerTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc;
-
-import static org.junit.Assert.assertSame;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import io.grpc.TransportManager.InterimTransport;
-import java.net.SocketAddress;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit test for {@link PickFirstBalancerFactory}. */
-@RunWith(JUnit4.class)
-public class PickFirstBalancerTest {
- private LoadBalancer<Transport> loadBalancer;
-
- private List<ResolvedServerInfoGroup> servers;
- private EquivalentAddressGroup addressGroup;
-
- @Mock private TransportManager<Transport> mockTransportManager;
- @Mock private Transport mockTransport;
- @Mock private InterimTransport<Transport> mockInterimTransport;
- @Mock private Transport mockInterimTransportAsTransport;
- @Captor private ArgumentCaptor<Supplier<Transport>> transportSupplierCaptor;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- loadBalancer = PickFirstBalancerFactory.getInstance().newLoadBalancer(
- "fakeservice", mockTransportManager);
- servers = Lists.newArrayList();
- List<ResolvedServerInfo> resolvedServerInfoList = Lists.newArrayList();
- for (int i = 0; i < 3; i++) {
- resolvedServerInfoList.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i)));
- }
- ResolvedServerInfoGroup resolvedServerInfoGroup = ResolvedServerInfoGroup.builder().addAll(
- resolvedServerInfoList).build();
- servers.add(resolvedServerInfoGroup);
- addressGroup = resolvedServerInfoGroup.toEquivalentAddressGroup();
- when(mockTransportManager.getTransport(eq(addressGroup))).thenReturn(mockTransport);
- when(mockTransportManager.createInterimTransport()).thenReturn(mockInterimTransport);
- when(mockInterimTransport.transport()).thenReturn(mockInterimTransportAsTransport);
- }
-
- @Test
- public void pickBeforeResolved() throws Exception {
- Transport t1 = loadBalancer.pickTransport(null);
- Transport t2 = loadBalancer.pickTransport(null);
- assertSame(mockInterimTransportAsTransport, t1);
- assertSame(mockInterimTransportAsTransport, t2);
- verify(mockTransportManager).createInterimTransport();
- verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
- verify(mockInterimTransport, times(2)).transport();
-
- loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
- verify(mockInterimTransport).closeWithRealTransports(transportSupplierCaptor.capture());
- for (int i = 0; i < 2; i++) {
- assertSame(mockTransport, transportSupplierCaptor.getValue().get());
- }
- verify(mockTransportManager, times(2)).getTransport(eq(addressGroup));
- verifyNoMoreInteractions(mockTransportManager);
- verifyNoMoreInteractions(mockInterimTransport);
- }
-
- @Test
- public void pickBeforeNameResolutionError() {
- Transport t1 = loadBalancer.pickTransport(null);
- Transport t2 = loadBalancer.pickTransport(null);
- assertSame(mockInterimTransportAsTransport, t1);
- assertSame(mockInterimTransportAsTransport, t2);
- verify(mockTransportManager).createInterimTransport();
- verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
- verify(mockInterimTransport, times(2)).transport();
-
- loadBalancer.handleNameResolutionError(Status.UNAVAILABLE);
- verify(mockInterimTransport).closeWithError(any(Status.class));
- // Ensure a shutdown after error closes without incident
- loadBalancer.shutdown();
- // Ensure a name resolution error after shutdown does nothing
- loadBalancer.handleNameResolutionError(Status.UNAVAILABLE);
- verifyNoMoreInteractions(mockInterimTransport);
- }
-
- @Test
- public void pickBeforeShutdown() {
- Transport t1 = loadBalancer.pickTransport(null);
- Transport t2 = loadBalancer.pickTransport(null);
- assertSame(mockInterimTransportAsTransport, t1);
- assertSame(mockInterimTransportAsTransport, t2);
- verify(mockTransportManager).createInterimTransport();
- verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
- verify(mockInterimTransport, times(2)).transport();
-
- loadBalancer.shutdown();
- verify(mockInterimTransport).closeWithError(any(Status.class));
- // Ensure double shutdown just returns immediately without closing again.
- loadBalancer.shutdown();
- verifyNoMoreInteractions(mockInterimTransport);
- }
-
- @Test
- public void pickAfterResolved() throws Exception {
- loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
- Transport t = loadBalancer.pickTransport(null);
- assertSame(mockTransport, t);
- verify(mockTransportManager).getTransport(addressGroup);
- verifyNoMoreInteractions(mockTransportManager);
- }
-
- private static class FakeSocketAddress extends SocketAddress {
- final String name;
-
- FakeSocketAddress(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return "FakeSocketAddress-" + name;
- }
- }
-
- private static class Transport {}
-}
diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java
index 3413e77..c825a5f 100644
--- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java
@@ -78,11 +78,11 @@
builder.getIdleTimeoutMillis());
builder.idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS);
- assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
+ assertEquals(ManagedChannelImpl2.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
builder.idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS,
TimeUnit.DAYS);
- assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
+ assertEquals(ManagedChannelImpl2.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
try {
builder.idleTimeout(0, TimeUnit.SECONDS);
diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java
deleted file mode 100644
index 189f159..0000000
--- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.common.base.Supplier;
-import io.grpc.Attributes;
-import io.grpc.CallOptions;
-import io.grpc.IntegerMarshaller;
-import io.grpc.LoadBalancer2.PickResult;
-import io.grpc.LoadBalancer2.SubchannelPicker;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.MethodDescriptor.MethodType;
-import io.grpc.Status;
-import io.grpc.StringMarshaller;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.Executor;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Unit tests for {@link DelayedClientTransport}.
- */
-@RunWith(JUnit4.class)
-public class DelayedClientTransportTest {
- @Mock private ManagedClientTransport.Listener transportListener;
- @Mock private ClientTransport mockRealTransport;
- @Mock private ClientTransport mockRealTransport2;
- @Mock private ClientStream mockRealStream;
- @Mock private ClientStream mockRealStream2;
- @Mock private ClientStreamListener streamListener;
- @Mock private ClientTransport.PingCallback pingCallback;
- @Mock private Executor mockExecutor;
- @Captor private ArgumentCaptor<Status> statusCaptor;
- @Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor;
-
- private static final Attributes.Key<Integer> SHARD_ID = Attributes.Key.of("shard-id");
-
- private final MethodDescriptor<String, Integer> method =
- MethodDescriptor.<String, Integer>newBuilder()
- .setType(MethodType.UNKNOWN)
- .setFullMethodName("/service/method")
- .setRequestMarshaller(new StringMarshaller())
- .setResponseMarshaller(new IntegerMarshaller())
- .build();
- private final MethodDescriptor<String, Integer> method2 =
- method.toBuilder().setFullMethodName("/service/method2").build();
- private final Metadata headers = new Metadata();
- private final Metadata headers2 = new Metadata();
-
- private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
- private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
- private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
- method.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
- GrpcUtil.STOPWATCH_SUPPLIER);
- private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext(
- method2.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
- GrpcUtil.STOPWATCH_SUPPLIER);
-
- private final FakeClock fakeExecutor = new FakeClock();
-
- private final DelayedClientTransport delayedTransport =
- new DelayedClientTransport(fakeExecutor.getScheduledExecutorService());
-
- @Before public void setUp() {
- MockitoAnnotations.initMocks(this);
- when(mockRealTransport.newStream(same(method), same(headers), same(callOptions),
- same(statsTraceCtx)))
- .thenReturn(mockRealStream);
- when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2),
- same(statsTraceCtx2)))
- .thenReturn(mockRealStream2);
- delayedTransport.start(transportListener);
- }
-
- @After public void noMorePendingTasks() {
- assertEquals(0, fakeExecutor.numPendingTasks());
- }
-
- @Test public void transportsAreUsedInOrder() {
- delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
- delayedTransport.newStream(method2, headers2, callOptions2, statsTraceCtx2);
- assertEquals(0, fakeExecutor.numPendingTasks());
- delayedTransport.setTransportSupplier(new Supplier<ClientTransport>() {
- final Iterator<ClientTransport> it =
- Arrays.asList(mockRealTransport, mockRealTransport2).iterator();
-
- @Override public ClientTransport get() {
- return it.next();
- }
- });
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
- same(statsTraceCtx));
- verify(mockRealTransport2).newStream(same(method2), same(headers2), same(callOptions2),
- same(statsTraceCtx2));
- }
-
- @Test public void streamStartThenSetTransport() {
- assertFalse(delayedTransport.hasPendingStreams());
- ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
- stream.start(streamListener);
- assertEquals(1, delayedTransport.getPendingStreamsCount());
- assertTrue(delayedTransport.hasPendingStreams());
- assertTrue(stream instanceof DelayedStream);
- assertEquals(0, fakeExecutor.numPendingTasks());
- delayedTransport.setTransport(mockRealTransport);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- assertFalse(delayedTransport.hasPendingStreams());
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
- same(statsTraceCtx));
- verify(mockRealStream).start(listenerCaptor.capture());
- verifyNoMoreInteractions(streamListener);
- listenerCaptor.getValue().onReady();
- verify(streamListener).onReady();
- verifyNoMoreInteractions(streamListener);
- }
-
- @Test public void newStreamThenSetTransportThenShutdown() {
- ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
- assertEquals(1, delayedTransport.getPendingStreamsCount());
- assertTrue(stream instanceof DelayedStream);
- delayedTransport.setTransport(mockRealTransport);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
- same(statsTraceCtx));
- stream.start(streamListener);
- verify(mockRealStream).start(same(streamListener));
- }
-
- @Test public void transportTerminatedThenSetTransport() {
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- delayedTransport.setTransport(mockRealTransport);
- verifyNoMoreInteractions(transportListener);
- }
-
- @Test public void setTransportThenShutdownThenNewStream() {
- delayedTransport.setTransport(mockRealTransport);
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- stream.start(streamListener);
- assertFalse(stream instanceof DelayedStream);
- verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
- same(statsTraceCtx));
- verify(mockRealStream).start(same(streamListener));
- }
-
- @Test public void setTransportThenShutdownNowThenNewStream() {
- delayedTransport.setTransport(mockRealTransport);
- delayedTransport.shutdownNow(Status.UNAVAILABLE);
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- stream.start(streamListener);
- assertFalse(stream instanceof DelayedStream);
- verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
- same(statsTraceCtx));
- verify(mockRealStream).start(same(streamListener));
- }
-
- @Test public void cancelStreamWithoutSetTransport() {
- ClientStream stream = delayedTransport.newStream(method, new Metadata());
- assertEquals(1, delayedTransport.getPendingStreamsCount());
- stream.cancel(Status.CANCELLED);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- verifyNoMoreInteractions(mockRealTransport);
- verifyNoMoreInteractions(mockRealStream);
- }
-
- @Test public void startThenCancelStreamWithoutSetTransport() {
- ClientStream stream = delayedTransport.newStream(method, new Metadata());
- stream.start(streamListener);
- assertEquals(1, delayedTransport.getPendingStreamsCount());
- stream.cancel(Status.CANCELLED);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class));
- verifyNoMoreInteractions(mockRealTransport);
- verifyNoMoreInteractions(mockRealStream);
- }
-
- @Test public void newStreamThenShutdownTransportThenCancelStream() {
- ClientStream stream = delayedTransport.newStream(method, new Metadata());
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener, times(0)).transportTerminated();
- assertEquals(1, delayedTransport.getPendingStreamsCount());
- stream.cancel(Status.CANCELLED);
- verify(transportListener).transportTerminated();
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- verifyNoMoreInteractions(mockRealTransport);
- verifyNoMoreInteractions(mockRealStream);
- }
-
- @Test public void setTransportThenShutdownThenPing() {
- delayedTransport.setTransport(mockRealTransport);
- delayedTransport.shutdown();
- delayedTransport.ping(pingCallback, mockExecutor);
- verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor));
- }
-
- @Test public void pingThenSetTransport() {
- delayedTransport.ping(pingCallback, mockExecutor);
- delayedTransport.setTransport(mockRealTransport);
- verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor));
- }
-
- @Test public void shutdownThenPing() {
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- delayedTransport.ping(pingCallback, mockExecutor);
- verifyNoMoreInteractions(pingCallback);
- ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(mockExecutor).execute(runnableCaptor.capture());
- runnableCaptor.getValue().run();
- verify(pingCallback).onFailure(any(Throwable.class));
- }
-
- @Test public void shutdownThenNewStream() {
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- ClientStream stream = delayedTransport.newStream(method, new Metadata());
- stream.start(streamListener);
- verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
- assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
- }
-
- @Test public void startStreamThenShutdownNow() {
- ClientStream stream = delayedTransport.newStream(method, new Metadata());
- stream.start(streamListener);
- delayedTransport.shutdownNow(Status.UNAVAILABLE);
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
- assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
- }
-
- @Test public void shutdownNowThenNewStream() {
- delayedTransport.shutdownNow(Status.UNAVAILABLE);
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener).transportTerminated();
- ClientStream stream = delayedTransport.newStream(method, new Metadata());
- stream.start(streamListener);
- verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
- assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
- }
-
- @Test public void startBackOff_ClearsFailFastPendingStreams() {
- final Status cause = Status.UNAVAILABLE.withDescription("some error when connecting");
- final CallOptions failFastCallOptions = CallOptions.DEFAULT;
- final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady();
- final ClientStream ffStream = delayedTransport.newStream(method, headers, failFastCallOptions,
- statsTraceCtx);
- ffStream.start(streamListener);
- delayedTransport.newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- delayedTransport.newStream(method, headers, failFastCallOptions, statsTraceCtx);
- assertEquals(3, delayedTransport.getPendingStreamsCount());
-
- delayedTransport.startBackoff(cause);
- assertTrue(delayedTransport.isInBackoffPeriod());
- assertEquals(1, delayedTransport.getPendingStreamsCount());
-
- // Fail fast stream not failed yet.
- verify(streamListener, never()).closed(any(Status.class), any(Metadata.class));
-
- fakeExecutor.runDueTasks();
- // Now fail fast stream failed.
- verify(streamListener).closed(same(cause), any(Metadata.class));
- }
-
- @Test public void startBackOff_FailsFutureFailFastStreams() {
- final Status cause = Status.UNAVAILABLE.withDescription("some error when connecting");
- final CallOptions failFastCallOptions = CallOptions.DEFAULT;
- final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady();
- delayedTransport.startBackoff(cause);
- assertTrue(delayedTransport.isInBackoffPeriod());
-
- final ClientStream ffStream = delayedTransport.newStream(method, headers, failFastCallOptions,
- statsTraceCtx);
- ffStream.start(streamListener);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
- assertEquals(cause, Status.fromThrowable(statusCaptor.getValue().getCause()));
-
- delayedTransport.newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- assertEquals(1, delayedTransport.getPendingStreamsCount());
- }
-
- @Test public void startBackoff_DoNothingIfAlreadyShutDown() {
- delayedTransport.shutdown();
-
- final Status cause = Status.UNAVAILABLE.withDescription("some error when connecting");
- delayedTransport.startBackoff(cause);
-
- assertFalse(delayedTransport.isInBackoffPeriod());
- }
-
- @Test public void reprocess() {
- Attributes affinity1 = Attributes.newBuilder().set(SHARD_ID, 1).build();
- Attributes affinity2 = Attributes.newBuilder().set(SHARD_ID, 2).build();
- CallOptions failFastCallOptions = CallOptions.DEFAULT.withAffinity(affinity1);
- CallOptions waitForReadyCallOptions =
- CallOptions.DEFAULT.withWaitForReady().withAffinity(affinity2);
-
- SubchannelImpl subchannel1 = mock(SubchannelImpl.class);
- SubchannelImpl subchannel2 = mock(SubchannelImpl.class);
- SubchannelImpl subchannel3 = mock(SubchannelImpl.class);
- when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class),
- any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream);
- when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class),
- any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream2);
- when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport);
- when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2);
- when(subchannel3.obtainActiveTransport()).thenReturn(null);
-
- // Fail-fast streams
- DelayedStream ff1 = (DelayedStream) delayedTransport.newStream(
- method, headers, failFastCallOptions, statsTraceCtx);
- verify(transportListener).transportInUse(true);
- DelayedStream ff2 = (DelayedStream) delayedTransport.newStream(
- method2, headers2, failFastCallOptions, statsTraceCtx);
- DelayedStream ff3 = (DelayedStream) delayedTransport.newStream(
- method, headers, failFastCallOptions, statsTraceCtx);
- DelayedStream ff4 = (DelayedStream) delayedTransport.newStream(
- method2, headers2, failFastCallOptions, statsTraceCtx);
-
- // Wait-for-ready streams
- FakeClock wfr3Executor = new FakeClock();
- DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream(
- method, headers, waitForReadyCallOptions, statsTraceCtx);
- DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream(
- method2, headers2, waitForReadyCallOptions, statsTraceCtx);
- DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream(
- method, headers,
- waitForReadyCallOptions.withExecutor(wfr3Executor.getScheduledExecutorService()),
- statsTraceCtx);
- DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream(
- method2, headers2, waitForReadyCallOptions, statsTraceCtx);
- assertEquals(8, delayedTransport.getPendingStreamsCount());
-
- // First reprocess(). Some will proceed, some will fail and the rest will stay buffered.
- SubchannelPicker picker = mock(SubchannelPicker.class);
- when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn(
- // For the fail-fast streams
- PickResult.withSubchannel(subchannel1), // ff1: proceed
- PickResult.withError(Status.UNAVAILABLE), // ff2: fail
- PickResult.withSubchannel(subchannel3), // ff3: stay
- PickResult.withNoResult(), // ff4: stay
- // For the wait-for-ready streams
- PickResult.withSubchannel(subchannel2), // wfr1: proceed
- PickResult.withError(Status.RESOURCE_EXHAUSTED), // wfr2: stay
- PickResult.withSubchannel(subchannel3), // wfr3: stay
- PickResult.withNoResult()); // wfr4: stay
- InOrder inOrder = inOrder(picker);
- delayedTransport.reprocess(picker);
-
- assertEquals(5, delayedTransport.getPendingStreamsCount());
- inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff1
- inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff2
- inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff3
- inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff4
- inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr1
- inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr2
- inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr3
- inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr4
- inOrder.verifyNoMoreInteractions();
- // Make sure that real transport creates streams in the executor
- verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class),
- any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class));
- verify(mockRealTransport2, never()).newStream(any(MethodDescriptor.class),
- any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class));
- fakeExecutor.runDueTasks();
- assertEquals(0, fakeExecutor.numPendingTasks());
- // ff1 and wfr1 went through
- verify(mockRealTransport).newStream(method, headers, failFastCallOptions, statsTraceCtx);
- verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- assertSame(mockRealStream, ff1.getRealStream());
- assertSame(mockRealStream2, wfr1.getRealStream());
- // The ff2 has failed due to picker returning an error
- assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError());
- // Other streams are still buffered
- assertNull(ff3.getRealStream());
- assertNull(ff4.getRealStream());
- assertNull(wfr2.getRealStream());
- assertNull(wfr3.getRealStream());
- assertNull(wfr4.getRealStream());
-
- // Second reprocess(). All will proceed.
- picker = mock(SubchannelPicker.class);
- when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn(
- PickResult.withSubchannel(subchannel1), // ff3
- PickResult.withSubchannel(subchannel2), // ff4
- PickResult.withSubchannel(subchannel2), // wfr2
- PickResult.withSubchannel(subchannel1), // wfr3
- PickResult.withSubchannel(subchannel2)); // wfr4
- inOrder = inOrder(picker);
- assertEquals(0, wfr3Executor.numPendingTasks());
- verify(transportListener, never()).transportInUse(false);
-
- delayedTransport.reprocess(picker);
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- verify(transportListener).transportInUse(false);
- inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff3
- inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff4
- inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr2
- inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr3
- inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr4
- inOrder.verifyNoMoreInteractions();
- fakeExecutor.runDueTasks();
- assertEquals(0, fakeExecutor.numPendingTasks());
- assertSame(mockRealStream, ff3.getRealStream());
- assertSame(mockRealStream2, ff4.getRealStream());
- assertSame(mockRealStream2, wfr2.getRealStream());
- assertSame(mockRealStream2, wfr4.getRealStream());
-
- // If there is an executor in the CallOptions, it will be used to create the real tream.
- assertNull(wfr3.getRealStream());
- wfr3Executor.runDueTasks();
- assertSame(mockRealStream, wfr3.getRealStream());
-
- // Make sure the delayed transport still accepts new streams
- DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream(
- method, headers, waitForReadyCallOptions, statsTraceCtx);
- assertNull(wfr5.getRealStream());
- assertEquals(1, delayedTransport.getPendingStreamsCount());
-
- // wfr5 will stop delayed transport from terminating
- delayedTransport.shutdown();
- verify(transportListener).transportShutdown(any(Status.class));
- verify(transportListener, never()).transportTerminated();
- // ... until it's gone
- delayedTransport.reprocess(picker);
- fakeExecutor.runDueTasks();
- assertEquals(0, delayedTransport.getPendingStreamsCount());
- verify(transportListener).transportTerminated();
- }
-
- @Test
- public void reprocess_NoPendingStream() {
- SubchannelPicker picker = mock(SubchannelPicker.class);
- delayedTransport.reprocess(picker);
- verifyNoMoreInteractions(picker);
- verifyNoMoreInteractions(transportListener);
- }
-}
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplGetNameResolverTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplGetNameResolverTest.java
index cd6d599..3e57b62 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplGetNameResolverTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplGetNameResolverTest.java
@@ -43,7 +43,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Unit tests for {@link ManagedChannelImpl#getNameResolver}. */
+/** Unit tests for {@link ManagedChannelImpl2#getNameResolver}. */
@RunWith(JUnit4.class)
public class ManagedChannelImplGetNameResolverTest {
private static final Attributes NAME_RESOLVER_PARAMS =
@@ -121,7 +121,7 @@
}
};
try {
- ManagedChannelImpl.getNameResolver(
+ ManagedChannelImpl2.getNameResolver(
"foo.googleapis.com:8080", nameResolverFactory, NAME_RESOLVER_PARAMS);
fail("Should fail");
} catch (IllegalArgumentException e) {
@@ -131,7 +131,7 @@
private void testValidTarget(String target, String expectedUriString, URI expectedUri) {
Factory nameResolverFactory = new FakeNameResolverFactory(expectedUri.getScheme());
- FakeNameResolver nameResolver = (FakeNameResolver) ManagedChannelImpl.getNameResolver(
+ FakeNameResolver nameResolver = (FakeNameResolver) ManagedChannelImpl2.getNameResolver(
target, nameResolverFactory, NAME_RESOLVER_PARAMS);
assertNotNull(nameResolver);
assertEquals(expectedUri, nameResolver.uri);
@@ -142,7 +142,7 @@
Factory nameResolverFactory = new FakeNameResolverFactory("dns");
try {
- FakeNameResolver nameResolver = (FakeNameResolver) ManagedChannelImpl.getNameResolver(
+ FakeNameResolver nameResolver = (FakeNameResolver) ManagedChannelImpl2.getNameResolver(
target, nameResolverFactory, NAME_RESOLVER_PARAMS);
fail("Should have failed, but got resolver with " + nameResolver.uri);
} catch (IllegalArgumentException e) {
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
deleted file mode 100644
index 6b86caa..0000000
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Copyright 2016, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.Lists;
-import io.grpc.Attributes;
-import io.grpc.CallOptions;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.CompressorRegistry;
-import io.grpc.DecompressorRegistry;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.IntegerMarshaller;
-import io.grpc.LoadBalancer;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.MethodDescriptor.MethodType;
-import io.grpc.NameResolver;
-import io.grpc.ResolvedServerInfo;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.Status;
-import io.grpc.StringMarshaller;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.OobTransportProvider;
-import io.grpc.internal.TestUtils.MockClientTransportInfo;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Unit tests for {@link ManagedChannelImpl}'s idle mode.
- */
-@RunWith(JUnit4.class)
-public class ManagedChannelImplIdlenessTest {
- private final FakeClock timer = new FakeClock();
- private final FakeClock executor = new FakeClock();
- private static final String AUTHORITY = "fakeauthority";
- private static final String USER_AGENT = "fakeagent";
- private static final long IDLE_TIMEOUT_SECONDS = 30;
- private ManagedChannelImpl channel;
-
- private final MethodDescriptor<String, Integer> method =
- MethodDescriptor.<String, Integer>newBuilder()
- .setType(MethodType.UNKNOWN)
- .setFullMethodName("/service/method")
- .setRequestMarshaller(new StringMarshaller())
- .setResponseMarshaller(new IntegerMarshaller())
- .build();
- private final List<ResolvedServerInfoGroup> servers = Lists.newArrayList();
- private final List<EquivalentAddressGroup> addressGroupList =
- new ArrayList<EquivalentAddressGroup>();
-
- @Mock private SharedResourceHolder.Resource<ScheduledExecutorService> timerService;
- @Mock private ClientTransportFactory mockTransportFactory;
- @Mock private LoadBalancer<ClientTransport> mockLoadBalancer;
- @Mock private LoadBalancer.Factory mockLoadBalancerFactory;
- @Mock private NameResolver mockNameResolver;
- @Mock private NameResolver.Factory mockNameResolverFactory;
- @Mock private ClientCall.Listener<Integer> mockCallListener;
- @Captor private ArgumentCaptor<NameResolver.Listener> nameResolverListenerCaptor;
- private BlockingQueue<MockClientTransportInfo> newTransports;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- when(timerService.create()).thenReturn(timer.getScheduledExecutorService());
- when(mockLoadBalancerFactory
- .newLoadBalancer(anyString(), Matchers.<TransportManager<ClientTransport>>any()))
- .thenReturn(mockLoadBalancer);
- when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY);
- when(mockNameResolverFactory
- .newNameResolver(any(URI.class), any(Attributes.class)))
- .thenReturn(mockNameResolver);
-
- channel = new ManagedChannelImpl("fake://target", new FakeBackoffPolicyProvider(),
- mockNameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
- mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
- CompressorRegistry.getDefaultInstance(), timerService, timer.getStopwatchSupplier(),
- TimeUnit.SECONDS.toMillis(IDLE_TIMEOUT_SECONDS),
- executor.getScheduledExecutorService(), USER_AGENT,
- Collections.<ClientInterceptor>emptyList(),
- NoopStatsContextFactory.INSTANCE);
- newTransports = TestUtils.captureTransports(mockTransportFactory);
-
- for (int i = 0; i < 2; i++) {
- ResolvedServerInfoGroup.Builder resolvedServerInfoGroup = ResolvedServerInfoGroup.builder();
- for (int j = 0; j < 2; j++) {
- resolvedServerInfoGroup.add(
- new ResolvedServerInfo(new FakeSocketAddress("servergroup" + i + "server" + j)));
- }
- servers.add(resolvedServerInfoGroup.build());
- addressGroupList.add(resolvedServerInfoGroup.build().toEquivalentAddressGroup());
- }
- verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
- // Verify the initial idleness
- verify(mockLoadBalancerFactory, never()).newLoadBalancer(
- anyString(), Matchers.<TransportManager<ClientTransport>>any());
- verify(mockTransportFactory, never()).newClientTransport(
- any(SocketAddress.class), anyString(), anyString());
- verify(mockNameResolver, never()).start(any(NameResolver.Listener.class));
- }
-
- @After
- public void allPendingTasksAreRun() {
- assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks());
- assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
- }
-
- @Test
- public void newCallExitsIdleness() throws Exception {
- final EquivalentAddressGroup addressGroup = addressGroupList.get(1);
- doAnswer(new Answer<ClientTransport>() {
- @Override
- public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
- return channel.tm.getTransport(addressGroup);
- }
- }).when(mockLoadBalancer).pickTransport(any(Attributes.class));
-
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
-
- verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), same(channel.tm));
- // NameResolver is started in the scheduled executor
- timer.runDueTasks();
- verify(mockNameResolver).start(nameResolverListenerCaptor.capture());
-
- // LoadBalancer is used right after created.
- verify(mockLoadBalancer).pickTransport(any(Attributes.class));
- verify(mockTransportFactory).newClientTransport(
- addressGroup.getAddresses().get(0), AUTHORITY, USER_AGENT);
-
- // Simulate new address resolved
- nameResolverListenerCaptor.getValue().onUpdate(servers, Attributes.EMPTY);
- verify(mockLoadBalancer).handleResolvedAddresses(servers, Attributes.EMPTY);
- }
-
- @Test
- public void newCallResetsGracePeriod() throws Exception {
- final EquivalentAddressGroup addressGroup = addressGroupList.get(1);
- doAnswer(new Answer<ClientTransport>() {
- @Override
- public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
- return channel.tm.getTransport(addressGroup);
- }
- }).when(mockLoadBalancer).pickTransport(any(Attributes.class));
-
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
- call.cancel("cleanup", null);
- executor.runDueTasks();
-
- timer.runDueTasks();
- verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), same(channel.tm));
- verify(mockLoadBalancer).pickTransport(any(Attributes.class));
-
- // Enter grace period
- timer.forwardTime(TimeUnit.SECONDS.toMillis(IDLE_TIMEOUT_SECONDS)
- - ManagedChannelImpl.IDLE_GRACE_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
- assertTrue(channel.isInIdleGracePeriod());
-
- call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
- assertFalse(channel.isInIdleGracePeriod());
- call.cancel("cleanup", null);
- executor.runDueTasks();
-
- // Load balancer was reused.
- timer.runDueTasks();
- verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), same(channel.tm));
- verify(mockLoadBalancer, times(2)).pickTransport(any(Attributes.class));
-
- // Now just let time pass to allow the original idle time to be well past expired.
- timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS);
-
- call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
-
- // Load balancer was reused; the idle time period must have been reset.
- timer.runDueTasks();
- verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), same(channel.tm));
- verify(mockLoadBalancer, times(3)).pickTransport(any(Attributes.class));
- }
-
- @Test
- public void shutdownDuringGracePeriodShutdownLb() throws Exception {
- forceExitIdleMode();
- verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), same(channel.tm));
- // Enter grace period
- timer.forwardTime(TimeUnit.SECONDS.toMillis(IDLE_TIMEOUT_SECONDS)
- - ManagedChannelImpl.IDLE_GRACE_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
- verify(mockLoadBalancer, never()).shutdown();
- channel.shutdown();
- verify(mockLoadBalancer).shutdown();
- }
-
- @Test
- public void enterIdleModeAfterForceExit() throws Exception {
- forceExitIdleMode();
-
- // Trigger the creation of TransportSets
- for (EquivalentAddressGroup addressGroup : addressGroupList) {
- channel.tm.getTransport(addressGroup);
- verify(mockTransportFactory).newClientTransport(
- addressGroup.getAddresses().get(0), AUTHORITY, USER_AGENT);
- }
- ArrayList<MockClientTransportInfo> transports = new ArrayList<MockClientTransportInfo>();
- newTransports.drainTo(transports);
- assertEquals(addressGroupList.size(), transports.size());
-
- channel.tm.createInterimTransport();
-
- // Without actually using these transports, will eventually enter idle mode
- walkIntoIdleMode(transports);
- }
-
- @Test
- public void interimTransportHoldsOffIdleness() throws Exception {
- doAnswer(new Answer<ClientTransport>() {
- @Override
- public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
- return channel.tm.createInterimTransport().transport();
- }
- }).when(mockLoadBalancer).pickTransport(any(Attributes.class));
-
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
- assertTrue(channel.inUseStateAggregator.isInUse());
- // NameResolver is started in the scheduled executor
- timer.runDueTasks();
-
- // As long as the interim transport is in-use (by the pending RPC), the channel won't go idle.
- timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
- assertTrue(channel.inUseStateAggregator.isInUse());
-
- // Cancelling the only RPC will reset the in-use state.
- assertEquals(0, executor.numPendingTasks());
- call.cancel("In test", null);
- assertEquals(1, executor.runDueTasks());
- assertFalse(channel.inUseStateAggregator.isInUse());
- // And allow the channel to go idle.
- walkIntoIdleMode(Collections.<MockClientTransportInfo>emptyList());
- }
-
- @Test
- public void realTransportsHoldsOffIdleness() throws Exception {
- final EquivalentAddressGroup addressGroup = addressGroupList.get(1);
- doAnswer(new Answer<ClientTransport>() {
- @Override
- public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
- return channel.tm.getTransport(addressGroup);
- }
- }).when(mockLoadBalancer).pickTransport(any(Attributes.class));
-
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
-
- // A TransportSet is in-use, while the stream is pending in a delayed transport
- assertTrue(channel.inUseStateAggregator.isInUse());
- // NameResolver is started in the scheduled executor
- timer.runDueTasks();
-
- // Making the real transport ready, will release the delayed transport.
- // The TransportSet is *not* in-use before the real transport become in-use.
- MockClientTransportInfo t0 = newTransports.poll();
- assertEquals(0, executor.numPendingTasks());
- t0.listener.transportReady();
- // Real streams are started in the executor
- assertEquals(1, executor.runDueTasks());
- assertFalse(channel.inUseStateAggregator.isInUse());
- t0.listener.transportInUse(true);
- assertTrue(channel.inUseStateAggregator.isInUse());
-
- // As long as the transport is in-use, the channel won't go idle.
- timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
-
- t0.listener.transportInUse(false);
- assertFalse(channel.inUseStateAggregator.isInUse());
- // And allow the channel to go idle.
- walkIntoIdleMode(Arrays.asList(t0));
- }
-
- @Test
- public void idlenessDecommissionsTransports() throws Exception {
- EquivalentAddressGroup addressGroup = addressGroupList.get(0);
- forceExitIdleMode();
-
- channel.tm.getTransport(addressGroup);
- MockClientTransportInfo t0 = newTransports.poll();
- t0.listener.transportReady();
- assertSame(t0.transport, channelTmGetTransportUnwrapped(addressGroup));
-
- walkIntoIdleMode(Arrays.asList(t0));
- verify(t0.transport).shutdown();
-
- forceExitIdleMode();
- channel.tm.getTransport(addressGroup);
- MockClientTransportInfo t1 = newTransports.poll();
- t1.listener.transportReady();
-
- assertSame(t1.transport, channelTmGetTransportUnwrapped(addressGroup));
- assertNotSame(t0.transport, channelTmGetTransportUnwrapped(addressGroup));
-
- channel.shutdown();
- verify(t1.transport).shutdown();
- channel.shutdownNow();
- verify(t0.transport).shutdownNow(any(Status.class));
- verify(t1.transport).shutdownNow(any(Status.class));
-
- t1.listener.transportTerminated();
- assertFalse(channel.isTerminated());
- t0.listener.transportTerminated();
- assertTrue(channel.isTerminated());
- }
-
- @Test
- public void loadBalancerShouldNotCreateConnectionsWhenIdle() throws Exception {
- // Acts as a misbehaving LoadBalancer that tries to create connections when channel is in idle,
- // which means the LoadBalancer is supposedly shutdown.
- assertSame(ManagedChannelImpl.IDLE_MODE_TRANSPORT,
- channel.tm.getTransport(addressGroupList.get(0)));
- OobTransportProvider<ClientTransport> oobProvider =
- channel.tm.createOobTransportProvider(addressGroupList.get(0), AUTHORITY);
- assertSame(ManagedChannelImpl.IDLE_MODE_TRANSPORT, oobProvider.get());
- oobProvider.close();
- verify(mockTransportFactory, never()).newClientTransport(
- any(SocketAddress.class), anyString(), anyString());
- // We don't care for delayed (interim) transports, because they don't create connections.
- }
-
- private void walkIntoIdleMode(Collection<MockClientTransportInfo> currentTransports) {
- timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS);
- verify(mockLoadBalancer, never()).shutdown();
- verify(mockNameResolver, never()).shutdown();
- for (MockClientTransportInfo transport : currentTransports) {
- verify(transport.transport, never()).shutdown();
- }
- timer.forwardTime(1, TimeUnit.SECONDS);
- verify(mockLoadBalancer).shutdown();
- verify(mockNameResolver).shutdown();
- for (MockClientTransportInfo transport : currentTransports) {
- verify(transport.transport).shutdown();
- }
- }
-
- private void forceExitIdleMode() {
- channel.exitIdleModeAndGetLb();
- }
-
- private ClientTransport channelTmGetTransportUnwrapped(EquivalentAddressGroup addressGroup) {
- return ((ForwardingConnectionClientTransport) channel.tm.getTransport(addressGroup)).delegate();
- }
-
- private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
- @Override
- public BackoffPolicy get() {
- return new BackoffPolicy() {
- @Override
- public long nextBackoffMillis() {
- return 1;
- }
- };
- }
- }
-
- private static class FakeSocketAddress extends SocketAddress {
- final String name;
-
- FakeSocketAddress(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return "FakeSocketAddress-" + name;
- }
- }
-}
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
deleted file mode 100644
index f03a33c..0000000
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ /dev/null
@@ -1,1082 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import io.grpc.Attributes;
-import io.grpc.CallCredentials;
-import io.grpc.CallCredentials.MetadataApplier;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.Compressor;
-import io.grpc.CompressorRegistry;
-import io.grpc.Context;
-import io.grpc.DecompressorRegistry;
-import io.grpc.IntegerMarshaller;
-import io.grpc.LoadBalancer;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.MethodDescriptor.MethodType;
-import io.grpc.NameResolver;
-import io.grpc.PickFirstBalancerFactory;
-import io.grpc.ResolvedServerInfo;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.SecurityLevel;
-import io.grpc.Status;
-import io.grpc.StringMarshaller;
-import io.grpc.TransportManager;
-import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/** Unit tests for {@link ManagedChannelImpl}. */
-@RunWith(JUnit4.class)
-public class ManagedChannelImplTest {
- private static final List<ClientInterceptor> NO_INTERCEPTOR =
- Collections.<ClientInterceptor>emptyList();
- private static final Attributes NAME_RESOLVER_PARAMS =
- Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build();
- private final MethodDescriptor<String, Integer> method =
- MethodDescriptor.<String, Integer>newBuilder()
- .setType(MethodType.UNKNOWN)
- .setFullMethodName("/service/method")
- .setRequestMarshaller(new StringMarshaller())
- .setResponseMarshaller(new IntegerMarshaller())
- .build();
- private final String serviceName = "fake.example.com";
- private final String authority = serviceName;
- private final String userAgent = "userAgent";
- private final String target = "fake://" + serviceName;
- private URI expectedUri;
- private final SocketAddress socketAddress = new SocketAddress() {};
- private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
- private final FakeClock timer = new FakeClock();
- private final FakeClock executor = new FakeClock();
- private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
- private SpyingLoadBalancerFactory loadBalancerFactory =
- new SpyingLoadBalancerFactory(PickFirstBalancerFactory.getInstance());
-
- @Rule public final ExpectedException thrown = ExpectedException.none();
-
- private ManagedChannelImpl channel;
- @Captor
- private ArgumentCaptor<Status> statusCaptor;
- @Captor
- private ArgumentCaptor<StatsTraceContext> statsTraceCtxCaptor;
- @Mock
- private ConnectionClientTransport mockTransport;
- @Mock
- private ClientTransportFactory mockTransportFactory;
- @Mock
- private ClientCall.Listener<Integer> mockCallListener;
- @Mock
- private ClientCall.Listener<Integer> mockCallListener2;
- @Mock
- private ClientCall.Listener<Integer> mockCallListener3;
- @Mock
- private SharedResourceHolder.Resource<ScheduledExecutorService> timerService;
- @Mock
- private CallCredentials creds;
-
- private ArgumentCaptor<ManagedClientTransport.Listener> transportListenerCaptor =
- ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
- private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
- ArgumentCaptor.forClass(ClientStreamListener.class);
-
- private void createChannel(
- NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
- channel = new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
- nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory,
- mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
- CompressorRegistry.getDefaultInstance(), timerService, timer.getStopwatchSupplier(),
- ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE,
- executor.getScheduledExecutorService(), userAgent, interceptors, statsCtxFactory);
- // Force-exit the initial idle-mode
- channel.exitIdleModeAndGetLb();
- }
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- expectedUri = new URI(target);
- when(mockTransportFactory.newClientTransport(
- any(SocketAddress.class), any(String.class), any(String.class)))
- .thenReturn(mockTransport);
- when(timerService.create()).thenReturn(timer.getScheduledExecutorService());
- }
-
- @After
- public void allPendingTasksAreRun() throws Exception {
- // The "never" verifications in the tests only hold up if all due tasks are done.
- // As for timer, although there may be scheduled tasks in a future time, since we don't test
- // any time-related behavior in this test suite, we only care the tasks that are due. This
- // would ignore any time-sensitive tasks, e.g., back-off and the idle timer.
- assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
- assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
- }
-
- /**
- * The counterpart of {@link ManagedChannelImplIdlenessTest#enterIdleModeAfterForceExit}.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void idleModeDisabled() {
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- assertEquals(1, loadBalancerFactory.balancers.size());
-
- // No task is scheduled to enter idle mode
- assertEquals(0, timer.numPendingTasks());
- assertEquals(0, executor.numPendingTasks());
- }
-
- @Test
- public void immediateDeadlineExceeded() {
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- ClientCall<String, Integer> call =
- channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
- call.start(mockCallListener, new Metadata());
- assertEquals(1, executor.runDueTasks());
-
- verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
- Status status = statusCaptor.getValue();
- assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
- }
-
- @Test
- public void shutdownWithNoTransportsEverCreated() {
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- verifyNoMoreInteractions(mockTransportFactory);
- channel.shutdown();
- assertTrue(channel.isShutdown());
- assertTrue(channel.isTerminated());
- }
-
- @Test
- public void twoCallsAndGracefulShutdown() {
- FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- verifyNoMoreInteractions(mockTransportFactory);
-
- // Create transport and call
- ClientStream mockStream = mock(ClientStream.class);
- Metadata headers = new Metadata();
- when(mockTransportFactory.newClientTransport(
- any(SocketAddress.class), any(String.class), any(String.class)))
- .thenReturn(mockTransport);
- when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class)))
- .thenReturn(mockStream);
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(mockTransportFactory)
- .newClientTransport(same(socketAddress), eq(authority), eq(userAgent));
- verify(mockTransport).start(transportListenerCaptor.capture());
- ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
- transportListener.transportReady();
- executor.runDueTasks();
-
- verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- statsTraceCtxCaptor.capture());
- assertEquals(statsCtxFactory.pollContextOrFail(),
- statsTraceCtxCaptor.getValue().getStatsContext());
- verify(mockStream).start(streamListenerCaptor.capture());
- verify(mockStream).setCompressor(isA(Compressor.class));
- ClientStreamListener streamListener = streamListenerCaptor.getValue();
-
- // Second call
- ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
- ClientStream mockStream2 = mock(ClientStream.class);
- Metadata headers2 = new Metadata();
- when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class)))
- .thenReturn(mockStream2);
- call2.start(mockCallListener2, headers2);
- verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
- statsTraceCtxCaptor.capture());
- assertEquals(statsCtxFactory.pollContextOrFail(),
- statsTraceCtxCaptor.getValue().getStatsContext());
-
- verify(mockStream2).start(streamListenerCaptor.capture());
- ClientStreamListener streamListener2 = streamListenerCaptor.getValue();
- Metadata trailers = new Metadata();
- streamListener2.closed(Status.CANCELLED, trailers);
- executor.runDueTasks();
-
- verify(mockCallListener2).onClose(Status.CANCELLED, trailers);
-
- // Shutdown
- channel.shutdown();
- assertTrue(channel.isShutdown());
- assertFalse(channel.isTerminated());
- verify(mockTransport).shutdown();
- assertEquals(1, nameResolverFactory.resolvers.size());
- assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
- assertEquals(1, loadBalancerFactory.balancers.size());
- verify(loadBalancerFactory.balancers.get(0)).shutdown();
-
- // Further calls should fail without going to the transport
- ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
- call3.start(mockCallListener3, new Metadata());
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
- assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
-
- // Finish shutdown
- transportListener.transportShutdown(Status.CANCELLED);
- assertFalse(channel.isTerminated());
- streamListener.closed(Status.CANCELLED, trailers);
- executor.runDueTasks();
-
- verify(mockCallListener).onClose(Status.CANCELLED, trailers);
- assertFalse(channel.isTerminated());
-
- transportListener.transportTerminated();
- assertTrue(channel.isTerminated());
-
- verify(mockTransportFactory).close();
- verifyNoMoreInteractions(mockTransportFactory);
- verify(mockTransport, atLeast(0)).getLogId();
- verifyNoMoreInteractions(mockTransport);
- verifyNoMoreInteractions(mockStream);
- }
-
- @Test
- public void callAndShutdownNow() {
- FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- verifyNoMoreInteractions(mockTransportFactory);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- verifyNoMoreInteractions(mockTransportFactory);
-
- // Create transport and call
- ClientStream mockStream = mock(ClientStream.class);
- Metadata headers = new Metadata();
- when(mockTransportFactory.newClientTransport(
- any(SocketAddress.class), any(String.class), any(String.class)))
- .thenReturn(mockTransport);
- when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class)))
- .thenReturn(mockStream);
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(mockTransportFactory)
- .newClientTransport(same(socketAddress), eq(authority), any(String.class));
- verify(mockTransport).start(transportListenerCaptor.capture());
- ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
- transportListener.transportReady();
- executor.runDueTasks();
-
- verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class));
-
- verify(mockStream).start(streamListenerCaptor.capture());
- verify(mockStream).setCompressor(isA(Compressor.class));
- ClientStreamListener streamListener = streamListenerCaptor.getValue();
-
- // ShutdownNow
- channel.shutdownNow();
- assertTrue(channel.isShutdown());
- assertFalse(channel.isTerminated());
- // ShutdownNow may or may not invoke shutdown. Ideally it wouldn't, but it doesn't matter much
- // either way.
- verify(mockTransport, atMost(1)).shutdown();
- verify(mockTransport).shutdownNow(any(Status.class));
- assertEquals(1, nameResolverFactory.resolvers.size());
- assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
- assertEquals(1, loadBalancerFactory.balancers.size());
- verify(loadBalancerFactory.balancers.get(0)).shutdown();
-
- // Further calls should fail without going to the transport
- ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
- call3.start(mockCallListener3, new Metadata());
- executor.runDueTasks();
-
- verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
- assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
-
- // Finish shutdown
- transportListener.transportShutdown(Status.CANCELLED);
- assertFalse(channel.isTerminated());
- Metadata trailers = new Metadata();
- streamListener.closed(Status.CANCELLED, trailers);
- executor.runDueTasks();
-
- verify(mockCallListener).onClose(Status.CANCELLED, trailers);
- assertFalse(channel.isTerminated());
-
- transportListener.transportTerminated();
- assertTrue(channel.isTerminated());
-
- verify(mockTransportFactory).close();
- verifyNoMoreInteractions(mockTransportFactory);
- verify(mockTransport, atLeast(0)).getLogId();
- verifyNoMoreInteractions(mockTransport);
- verifyNoMoreInteractions(mockStream);
- }
-
- /** Make sure shutdownNow() after shutdown() has an effect. */
- @Test
- public void callAndShutdownAndShutdownNow() {
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
-
- // Create transport and call
- ClientStream mockStream = mock(ClientStream.class);
- Metadata headers = new Metadata();
- when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class)))
- .thenReturn(mockStream);
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(mockTransport).start(transportListenerCaptor.capture());
- ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
- transportListener.transportReady();
- executor.runDueTasks();
-
- verify(mockStream).start(streamListenerCaptor.capture());
- ClientStreamListener streamListener = streamListenerCaptor.getValue();
-
- // ShutdownNow
- channel.shutdown();
- channel.shutdownNow();
- // ShutdownNow may or may not invoke shutdown. Ideally it wouldn't, but it doesn't matter much
- // either way.
- verify(mockTransport, atMost(2)).shutdown();
- verify(mockTransport).shutdownNow(any(Status.class));
-
- // Finish shutdown
- transportListener.transportShutdown(Status.CANCELLED);
- assertFalse(channel.isTerminated());
- Metadata trailers = new Metadata();
- streamListener.closed(Status.CANCELLED, trailers);
- executor.runDueTasks();
-
- verify(mockCallListener).onClose(Status.CANCELLED, trailers);
- assertFalse(channel.isTerminated());
-
- transportListener.transportTerminated();
- assertTrue(channel.isTerminated());
- }
-
-
- @Test
- public void interceptor() throws Exception {
- final AtomicLong atomic = new AtomicLong();
- ClientInterceptor interceptor = new ClientInterceptor() {
- @Override
- public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
- MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions,
- Channel next) {
- atomic.set(1);
- return next.newCall(method, callOptions);
- }
- };
- createChannel(new FakeNameResolverFactory(true), Arrays.asList(interceptor));
- assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
- assertEquals(1, atomic.get());
- }
-
- @Test
- public void testNoDeadlockOnShutdown() {
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- // Force creation of transport
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- Metadata headers = new Metadata();
- ClientStream mockStream = mock(ClientStream.class);
- when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream);
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
- call.cancel("Cancel for test", null);
- executor.runDueTasks();
-
- verify(mockTransport).start(transportListenerCaptor.capture());
- final ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
- final Object lock = new Object();
- final CyclicBarrier barrier = new CyclicBarrier(2);
- new Thread() {
- @Override
- public void run() {
- synchronized (lock) {
- try {
- barrier.await();
- } catch (Exception ex) {
- throw new AssertionError(ex);
- }
- // To deadlock, a lock would be needed for this call to proceed.
- transportListener.transportShutdown(Status.CANCELLED);
- }
- }
- }.start();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- // To deadlock, a lock would need to be held while this method is in progress.
- try {
- barrier.await();
- } catch (Exception ex) {
- throw new AssertionError(ex);
- }
- // If deadlock is possible with this setup, this sychronization completes the loop because
- // the transportShutdown needs a lock that Channel is holding while calling this method.
- synchronized (lock) {
- }
- return null;
- }
- }).when(mockTransport).shutdown();
- channel.shutdown();
-
- transportListener.transportTerminated();
- }
-
- @Test
- public void callOptionsExecutor() {
- Metadata headers = new Metadata();
- ClientStream mockStream = mock(ClientStream.class);
- when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class),
- any(StatsTraceContext.class)))
- .thenReturn(mockStream);
- FakeClock callExecutor = new FakeClock();
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- CallOptions options =
- CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
-
- ClientCall<String, Integer> call = channel.newCall(method, options);
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(mockTransport).start(transportListenerCaptor.capture());
- assertEquals(0, executor.numPendingTasks());
- transportListenerCaptor.getValue().transportReady();
- // Real streams are started in the channel's executor
- assertEquals(1, executor.runDueTasks());
-
- verify(mockTransport).newStream(same(method), same(headers), same(options),
- any(StatsTraceContext.class));
- verify(mockStream).start(streamListenerCaptor.capture());
- ClientStreamListener streamListener = streamListenerCaptor.getValue();
- Metadata trailers = new Metadata();
- assertEquals(0, callExecutor.numPendingTasks());
- streamListener.closed(Status.CANCELLED, trailers);
- verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
- assertEquals(1, callExecutor.runDueTasks());
-
- verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
- }
-
- @Test
- public void nameResolutionFailed() {
- Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
-
- // Name resolution is started as soon as channel is created.
- createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
- timer.runDueTasks();
- executor.runDueTasks();
-
- // The call failed with the name resolution error
- verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
- Status status = statusCaptor.getValue();
- assertSame(error.getCode(), status.getCode());
- assertSame(error.getCause(), status.getCause());
- // LoadBalancer received the same error
- assertEquals(1, loadBalancerFactory.balancers.size());
- verify(loadBalancerFactory.balancers.get(0)).handleNameResolutionError(same(error));
- }
-
- @Test
- public void nameResolverReturnsEmptySubLists() {
- String errorDescription = "NameResolver returned an empty list";
-
- // Name resolution is started as soon as channel is created
- createChannel(new FakeNameResolverFactory(), NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
- timer.runDueTasks();
- executor.runDueTasks();
-
- // The call failed with the name resolution error
- verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
- Status status = statusCaptor.getValue();
- assertSame(Status.Code.UNAVAILABLE, status.getCode());
- assertTrue(status.getDescription(), status.getDescription().contains(errorDescription));
- // LoadBalancer received the same error
- assertEquals(1, loadBalancerFactory.balancers.size());
- verify(loadBalancerFactory.balancers.get(0)).handleNameResolutionError(statusCaptor.capture());
- status = statusCaptor.getValue();
- assertSame(Status.Code.UNAVAILABLE, status.getCode());
- assertEquals(errorDescription, status.getDescription());
- }
-
- @Test
- public void loadBalancerThrowsInHandleResolvedAddresses() {
- RuntimeException ex = new RuntimeException("simulated");
- // Delay the success of name resolution until allResolved() is called
- FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- call.start(mockCallListener, new Metadata());
- timer.runDueTasks();
- executor.runDueTasks();
-
- assertEquals(1, loadBalancerFactory.balancers.size());
- LoadBalancer<?> loadBalancer = loadBalancerFactory.balancers.get(0);
- doThrow(ex).when(loadBalancer).handleResolvedAddresses(
- Matchers.<List<ResolvedServerInfoGroup>>anyObject(), any(Attributes.class));
-
- // NameResolver returns addresses.
- nameResolverFactory.allResolved();
- executor.runDueTasks();
-
- // The call failed with the error thrown from handleResolvedAddresses()
- verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
- Status status = statusCaptor.getValue();
- assertSame(Status.Code.INTERNAL, status.getCode());
- assertSame(ex, status.getCause());
- // The LoadBalancer received the same error
- verify(loadBalancer).handleNameResolutionError(statusCaptor.capture());
- status = statusCaptor.getValue();
- assertSame(Status.Code.INTERNAL, status.getCode());
- assertSame(ex, status.getCause());
- }
-
- @Test
- public void nameResolvedAfterChannelShutdown() {
- // Delay the success of name resolution until allResolved() is called.
- FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- Metadata headers = new Metadata();
-
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
- channel.shutdown();
-
- assertTrue(channel.isShutdown());
- // Name resolved after the channel is shut down, which is possible if the name resolution takes
- // time and is not cancellable. The resolved address will still be passed to the LoadBalancer.
- nameResolverFactory.allResolved();
- executor.runDueTasks();
-
- verify(mockTransportFactory, never())
- .newClientTransport(any(SocketAddress.class), any(String.class), any(String.class));
- }
-
- /**
- * Verify that if the first resolved address points to a server that cannot be connected, the call
- * will end up with the second address which works.
- */
- @Test
- public void firstResolvedServerFailedToConnect() throws Exception {
- final SocketAddress goodAddress = new SocketAddress() {
- @Override public String toString() {
- return "goodAddress";
- }
- };
- final SocketAddress badAddress = new SocketAddress() {
- @Override public String toString() {
- return "badAddress";
- }
- };
- final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
- final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
- final ConnectionClientTransport goodTransport = mock(ConnectionClientTransport.class);
- final ConnectionClientTransport badTransport = mock(ConnectionClientTransport.class);
- when(goodTransport.newStream(
- any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
- any(StatsTraceContext.class)))
- .thenReturn(mock(ClientStream.class));
- when(mockTransportFactory.newClientTransport(
- same(goodAddress), any(String.class), any(String.class)))
- .thenReturn(goodTransport);
- when(mockTransportFactory.newClientTransport(
- same(badAddress), any(String.class), any(String.class)))
- .thenReturn(badTransport);
-
- FakeNameResolverFactory nameResolverFactory =
- new FakeNameResolverFactory(Arrays.asList(badServer, goodServer));
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- Metadata headers = new Metadata();
-
- // Start a call. The channel will starts with the first address (badAddress)
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- ArgumentCaptor<ManagedClientTransport.Listener> badTransportListenerCaptor =
- ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
- verify(badTransport).start(badTransportListenerCaptor.capture());
- verify(mockTransportFactory)
- .newClientTransport(same(badAddress), any(String.class), any(String.class));
- verify(mockTransportFactory, times(0))
- .newClientTransport(same(goodAddress), any(String.class), any(String.class));
- badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
-
- // The channel then try the second address (goodAddress)
- ArgumentCaptor<ManagedClientTransport.Listener> goodTransportListenerCaptor =
- ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
- verify(mockTransportFactory)
- .newClientTransport(same(goodAddress), any(String.class), any(String.class));
- verify(goodTransport).start(goodTransportListenerCaptor.capture());
- goodTransportListenerCaptor.getValue().transportReady();
- executor.runDueTasks();
-
- verify(goodTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class));
- // The bad transport was never used.
- verify(badTransport, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
- }
-
- /**
- * Verify that if all resolved addresses failed to connect, the call will fail.
- */
- @Test
- public void allServersFailedToConnect() throws Exception {
- final SocketAddress addr1 = new SocketAddress() {
- @Override public String toString() {
- return "addr1";
- }
- };
- final SocketAddress addr2 = new SocketAddress() {
- @Override public String toString() {
- return "addr2";
- }
- };
- final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
- final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
- final ConnectionClientTransport transport1 = mock(ConnectionClientTransport.class);
- final ConnectionClientTransport transport2 = mock(ConnectionClientTransport.class);
- when(mockTransportFactory.newClientTransport(same(addr1), any(String.class), any(String.class)))
- .thenReturn(transport1);
- when(mockTransportFactory.newClientTransport(same(addr2), any(String.class), any(String.class)))
- .thenReturn(transport2);
-
- FakeNameResolverFactory nameResolverFactory =
- new FakeNameResolverFactory(Arrays.asList(server1, server2));
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- Metadata headers = new Metadata();
-
- // Start a call. The channel will starts with the first address, which will fail to connect.
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(transport1).start(transportListenerCaptor.capture());
- verify(mockTransportFactory)
- .newClientTransport(same(addr1), any(String.class), any(String.class));
- verify(mockTransportFactory, times(0))
- .newClientTransport(same(addr2), any(String.class), any(String.class));
- transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
-
- // The channel then try the second address, which will fail to connect too.
- verify(transport2).start(transportListenerCaptor.capture());
- verify(mockTransportFactory)
- .newClientTransport(same(addr2), any(String.class), any(String.class));
- verify(transport2).start(transportListenerCaptor.capture());
- transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
- executor.runDueTasks();
-
- // Call fails
- verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
- assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
- // No real stream was ever created
- verify(transport1, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
- verify(transport2, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
- }
-
- /**
- * Verify that if the first resolved address points to a server that is at first connected, but
- * disconnected later, all calls will stick to the first address.
- */
- @Test
- public void firstResolvedServerConnectedThenDisconnected() throws Exception {
- final SocketAddress addr1 = new SocketAddress() {
- @Override public String toString() {
- return "addr1";
- }
- };
- final SocketAddress addr2 = new SocketAddress() {
- @Override public String toString() {
- return "addr2";
- }
- };
- final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
- final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
- // Addr1 will have two transports throughout this test.
- final ConnectionClientTransport transport1 = mock(ConnectionClientTransport.class);
- final ConnectionClientTransport transport2 = mock(ConnectionClientTransport.class);
- when(transport1.newStream(
- any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
- any(StatsTraceContext.class)))
- .thenReturn(mock(ClientStream.class));
- when(transport2.newStream(
- any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
- any(StatsTraceContext.class)))
- .thenReturn(mock(ClientStream.class));
- when(mockTransportFactory.newClientTransport(same(addr1), any(String.class), any(String.class)))
- .thenReturn(transport1, transport2);
-
- FakeNameResolverFactory nameResolverFactory =
- new FakeNameResolverFactory(Arrays.asList(server1, server2));
- createChannel(nameResolverFactory, NO_INTERCEPTOR);
- ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- Metadata headers = new Metadata();
-
- // First call will use the first address
- call.start(mockCallListener, headers);
- timer.runDueTasks();
- executor.runDueTasks();
-
- verify(mockTransportFactory)
- .newClientTransport(same(addr1), any(String.class), any(String.class));
- verify(transport1).start(transportListenerCaptor.capture());
- transportListenerCaptor.getValue().transportReady();
- executor.runDueTasks();
-
- verify(transport1).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class));
- transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
-
- // Second call still use the first address, since it was successfully connected.
- ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
- call2.start(mockCallListener, headers);
- verify(transport2).start(transportListenerCaptor.capture());
- verify(mockTransportFactory, times(2))
- .newClientTransport(same(addr1), any(String.class), any(String.class));
- transportListenerCaptor.getValue().transportReady();
- executor.runDueTasks();
-
- verify(transport2).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
- any(StatsTraceContext.class));
- }
-
- @Test
- public void uriPattern() {
- assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches());
- assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches());
- assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched
- assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched
- assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched
- assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched
- }
-
- /**
- * Test that information such as the Call's context, MethodDescriptor, authority, executor are
- * propagated to newStream() and applyRequestMetadata().
- */
- @Test
- public void informationPropagatedToNewStreamAndCallCredentials() {
- createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
- CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
- final Context.Key<String> testKey = Context.key("testing");
- Context ctx = Context.current().withValue(testKey, "testValue");
- final LinkedList<Context> credsApplyContexts = new LinkedList<Context>();
- final LinkedList<Context> newStreamContexts = new LinkedList<Context>();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock in) throws Throwable {
- credsApplyContexts.add(Context.current());
- return null;
- }
- }).when(creds).applyRequestMetadata(
- any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
- any(MetadataApplier.class));
-
- final ConnectionClientTransport transport = mock(ConnectionClientTransport.class);
- when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
- when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class),
- any(String.class))).thenReturn(transport);
- doAnswer(new Answer<ClientStream>() {
- @Override
- public ClientStream answer(InvocationOnMock in) throws Throwable {
- newStreamContexts.add(Context.current());
- return mock(ClientStream.class);
- }
- }).when(transport).newStream(
- any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
- any(StatsTraceContext.class));
-
- // First call will be on delayed transport. Only newCall() is run within the expected context,
- // so that we can verify that the context is explicitly attached before calling newStream() and
- // applyRequestMetadata(), which happens after we detach the context from the thread.
- Context origCtx = ctx.attach();
- assertEquals("testValue", testKey.get());
- ClientCall<String, Integer> call = channel.newCall(method, callOptions);
- ctx.detach(origCtx);
- assertNull(testKey.get());
- call.start(mockCallListener, new Metadata());
-
- ArgumentCaptor<ManagedClientTransport.Listener> transportListenerCaptor =
- ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
- verify(mockTransportFactory).newClientTransport(
- same(socketAddress), eq(authority), eq(userAgent));
- verify(transport).start(transportListenerCaptor.capture());
- verify(creds, never()).applyRequestMetadata(
- any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
- any(MetadataApplier.class));
-
- // applyRequestMetadata() is called after the transport becomes ready.
- transportListenerCaptor.getValue().transportReady();
- executor.runDueTasks();
- ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
- ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class);
- verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
- same(executor.getScheduledExecutorService()), applierCaptor.capture());
- assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
- assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
- assertEquals(SecurityLevel.NONE,
- attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
- verify(transport, never()).newStream(
- any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
- any(StatsTraceContext.class));
-
- // newStream() is called after apply() is called
- applierCaptor.getValue().apply(new Metadata());
- verify(transport).newStream(same(method), any(Metadata.class), same(callOptions),
- any(StatsTraceContext.class));
- assertEquals("testValue", testKey.get(newStreamContexts.poll()));
- // The context should not live beyond the scope of newStream() and applyRequestMetadata()
- assertNull(testKey.get());
-
-
- // Second call will not be on delayed transport
- origCtx = ctx.attach();
- call = channel.newCall(method, callOptions);
- ctx.detach(origCtx);
- call.start(mockCallListener, new Metadata());
-
- verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(),
- same(executor.getScheduledExecutorService()), applierCaptor.capture());
- assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
- assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
- assertEquals(SecurityLevel.NONE,
- attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
- // This is from the first call
- verify(transport).newStream(
- any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
- any(StatsTraceContext.class));
-
- // Still, newStream() is called after apply() is called
- applierCaptor.getValue().apply(new Metadata());
- verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions),
- any(StatsTraceContext.class));
- assertEquals("testValue", testKey.get(newStreamContexts.poll()));
-
- assertNull(testKey.get());
- }
-
- private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
- @Override
- public BackoffPolicy get() {
- return new BackoffPolicy() {
- @Override
- public long nextBackoffMillis() {
- return 1;
- }
- };
- }
- }
-
- private class FakeNameResolverFactory extends NameResolver.Factory {
- final List<ResolvedServerInfoGroup> servers;
- final boolean resolvedAtStart;
- final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
-
- FakeNameResolverFactory(boolean resolvedAtStart) {
- this.resolvedAtStart = resolvedAtStart;
- servers = Collections.singletonList(ResolvedServerInfoGroup.builder().add(server).build());
- }
-
- FakeNameResolverFactory(List<ResolvedServerInfo> servers) {
- resolvedAtStart = true;
- this.servers = Collections.singletonList(
- ResolvedServerInfoGroup.builder().addAll(servers).build());
- }
-
- public FakeNameResolverFactory() {
- resolvedAtStart = true;
- this.servers = ImmutableList.of();
- }
-
- @Override
- public NameResolver newNameResolver(final URI targetUri, Attributes params) {
- if (!expectedUri.equals(targetUri)) {
- return null;
- }
- assertSame(NAME_RESOLVER_PARAMS, params);
- FakeNameResolver resolver = new FakeNameResolver();
- resolvers.add(resolver);
- return resolver;
- }
-
- @Override
- public String getDefaultScheme() {
- return "fake";
- }
-
- void allResolved() {
- for (FakeNameResolver resolver : resolvers) {
- resolver.resolved();
- }
- }
-
- private class FakeNameResolver extends NameResolver {
- Listener listener;
- boolean shutdown;
-
- @Override public String getServiceAuthority() {
- return expectedUri.getAuthority();
- }
-
- @Override public void start(final Listener listener) {
- this.listener = listener;
- if (resolvedAtStart) {
- resolved();
- }
- }
-
- void resolved() {
- listener.onUpdate(servers, Attributes.EMPTY);
- }
-
- @Override public void shutdown() {
- shutdown = true;
- }
- }
- }
-
- private static class FailingNameResolverFactory extends NameResolver.Factory {
- final Status error;
-
- FailingNameResolverFactory(Status error) {
- this.error = error;
- }
-
- @Override
- public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
- return new NameResolver() {
- @Override public String getServiceAuthority() {
- return "irrelevant-authority";
- }
-
- @Override public void start(final Listener listener) {
- listener.onError(error);
- }
-
- @Override public void shutdown() {}
- };
- }
-
- @Override
- public String getDefaultScheme() {
- return "fake";
- }
- }
-
- private static class SpyingLoadBalancerFactory extends LoadBalancer.Factory {
- private final LoadBalancer.Factory delegate;
- private final List<LoadBalancer<?>> balancers = new ArrayList<LoadBalancer<?>>();
-
- private SpyingLoadBalancerFactory(LoadBalancer.Factory delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm) {
- LoadBalancer<T> lb = spy(delegate.newLoadBalancer(serviceName, tm));
- balancers.add(lb);
- return lb;
- }
- }
-}
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
deleted file mode 100644
index a1ed139..0000000
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import io.grpc.Attributes;
-import io.grpc.CallOptions;
-import io.grpc.ClientInterceptor;
-import io.grpc.CompressorRegistry;
-import io.grpc.DecompressorRegistry;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.LoadBalancer;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.MethodDescriptor.MethodType;
-import io.grpc.NameResolver;
-import io.grpc.Status;
-import io.grpc.StringMarshaller;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.InterimTransport;
-import io.grpc.TransportManager.OobTransportProvider;
-import io.grpc.internal.TestUtils.MockClientTransportInfo;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Unit tests for {@link ManagedChannelImpl}'s {@link TransportManager} implementation as well as
- * {@link TransportSet}.
- */
-@RunWith(JUnit4.class)
-public class ManagedChannelImplTransportManagerTest {
-
- private static final String AUTHORITY = "fakeauthority";
- private static final String USER_AGENT = "mosaic";
-
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private final MethodDescriptor<String, String> method =
- MethodDescriptor.<String, String>newBuilder()
- .setType(MethodType.UNKNOWN)
- .setFullMethodName("/service/method")
- .setRequestMarshaller(new StringMarshaller())
- .setResponseMarshaller(new StringMarshaller())
- .build();
-
- private final MethodDescriptor<String, String> method2 = method.toBuilder()
- .setFullMethodName("/service/method2")
- .build();
- private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
- private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
- private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
- method.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
- GrpcUtil.STOPWATCH_SUPPLIER);
- private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext(
- method2.getFullMethodName(), NoopStatsContextFactory.INSTANCE,
- GrpcUtil.STOPWATCH_SUPPLIER);
-
- private ManagedChannelImpl channel;
-
- @Mock private ClientTransportFactory mockTransportFactory;
- @Mock private LoadBalancer.Factory mockLoadBalancerFactory;
- @Mock private NameResolver mockNameResolver;
- @Mock private NameResolver.Factory mockNameResolverFactory;
- @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
- @Mock private BackoffPolicy mockBackoffPolicy;
-
- private BlockingQueue<MockClientTransportInfo> transports;
-
- private TransportManager<ClientTransport> tm;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
- when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY);
- when(mockNameResolverFactory
- .newNameResolver(any(URI.class), any(Attributes.class)))
- .thenReturn(mockNameResolver);
- @SuppressWarnings("unchecked")
- LoadBalancer<ClientTransport> loadBalancer = mock(LoadBalancer.class);
- when(mockLoadBalancerFactory
- .newLoadBalancer(anyString(), Matchers.<TransportManager<ClientTransport>>any()))
- .thenReturn(loadBalancer);
-
- channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
- mockNameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
- mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
- CompressorRegistry.getDefaultInstance(), GrpcUtil.TIMER_SERVICE,
- GrpcUtil.STOPWATCH_SUPPLIER, ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE,
- executor, USER_AGENT, Collections.<ClientInterceptor>emptyList(),
- NoopStatsContextFactory.INSTANCE);
-
- ArgumentCaptor<TransportManager<ClientTransport>> tmCaptor
- = ArgumentCaptor.forClass(null);
- // Force Channel to exit the initial idleness to get NameResolver and LoadBalancer created.
- channel.exitIdleModeAndGetLb();
- verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
- verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
- tm = tmCaptor.getValue();
- transports = TestUtils.captureTransports(mockTransportFactory);
- // NameResolver is started in the executor
- verify(mockNameResolver, timeout(1000)).start(any(NameResolver.Listener.class));
- }
-
- @After
- public void tearDown() {
- channel.shutdown();
- executor.shutdown();
- }
-
- @Test
- public void createAndReuseTransport() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr);
- ClientTransport t1 = tm.getTransport(addressGroup);
- verify(mockTransportFactory, timeout(1000)).newClientTransport(addr, AUTHORITY, USER_AGENT);
- // The real transport
- MockClientTransportInfo transportInfo = transports.poll(1, TimeUnit.SECONDS);
- transportInfo.listener.transportReady();
- ForwardingConnectionClientTransport t2 =
- (ForwardingConnectionClientTransport) tm.getTransport(addressGroup);
- assertTrue(t1 instanceof DelayedClientTransport);
- assertSame(transportInfo.transport, t2.delegate());
- verify(mockBackoffPolicyProvider, times(0)).get();
- verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
- verifyNoMoreInteractions(mockTransportFactory);
- }
-
- @Test
- public void reconnect() throws Exception {
- SocketAddress addr1 = mock(SocketAddress.class);
- SocketAddress addr2 = mock(SocketAddress.class);
- EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
-
- // Invocation counters
- int backoffReset = 0;
-
- // Pick the first transport
- ClientTransport t1 = tm.getTransport(addressGroup);
- assertNotNull(t1);
- verify(mockTransportFactory, timeout(1000)).newClientTransport(addr1, AUTHORITY, USER_AGENT);
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- // Fail the first transport, without setting it to ready
- MockClientTransportInfo transportInfo = transports.poll(1, TimeUnit.SECONDS);
- ClientTransport rt1 = transportInfo.transport;
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
-
- // Subsequent getTransport() will use the next address
- ClientTransport t2 = tm.getTransport(addressGroup);
- assertNotNull(t2);
- t2.newStream(method, new Metadata(), callOptions, statsTraceCtx);
- // Will keep the previous back-off policy, and not consult back-off policy
- verify(mockTransportFactory, timeout(1000)).newClientTransport(addr2, AUTHORITY, USER_AGENT);
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- transportInfo = transports.poll(1, TimeUnit.SECONDS);
- ClientTransport rt2 = transportInfo.transport;
- // Make the second transport ready
- transportInfo.listener.transportReady();
- verify(rt2, timeout(1000)).newStream(
- same(method), any(Metadata.class), same(callOptions), same(statsTraceCtx));
- verify(mockNameResolver, times(0)).refresh();
- // Disconnect the second transport
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- // Will trigger NameResolver refresh
- verify(mockNameResolver).refresh();
-
- // Subsequent getTransport() will use the first address, since last attempt was successful.
- ClientTransport t3 = tm.getTransport(addressGroup);
- t3.newStream(method2, new Metadata(), callOptions2, statsTraceCtx2);
- verify(mockTransportFactory, timeout(1000).times(2))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Still no back-off policy creation, because an address succeeded.
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- transportInfo = transports.poll(1, TimeUnit.SECONDS);
- ClientTransport rt3 = transportInfo.transport;
- transportInfo.listener.transportReady();
- verify(rt3, timeout(1000)).newStream(
- same(method2), any(Metadata.class), same(callOptions2), same(statsTraceCtx2));
-
- verify(rt1, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
- // Back-off policy was never consulted.
- verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
- verifyNoMoreInteractions(mockTransportFactory);
- // NameResolver was refreshed only once
- verify(mockNameResolver).refresh();
- }
-
- @Test
- public void reconnectWithBackoff() throws Exception {
- SocketAddress addr1 = mock(SocketAddress.class);
- SocketAddress addr2 = mock(SocketAddress.class);
- EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
-
- // Invocation counters
- int transportsAddr1 = 0;
- int transportsAddr2 = 0;
- int backoffConsulted = 0;
- int backoffReset = 0;
- int nameResolverRefresh = 0;
-
- // First pick succeeds
- ClientTransport t1 = tm.getTransport(addressGroup);
- assertNotNull(t1);
- verify(mockTransportFactory, timeout(1000).times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Back-off policy was unset initially.
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- MockClientTransportInfo transportInfo = transports.poll(1, TimeUnit.SECONDS);
- verify(mockNameResolver, times(nameResolverRefresh)).refresh();
- transportInfo.listener.transportReady();
- // Then close it
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- verify(mockNameResolver, times(++nameResolverRefresh)).refresh();
-
- // Second pick fails. This is the beginning of a series of failures.
- ClientTransport t2 = tm.getTransport(addressGroup);
- assertNotNull(t2);
- verify(mockTransportFactory, timeout(1000).times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Back-off policy was not reset.
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- transports.poll(1, TimeUnit.SECONDS).listener.transportShutdown(Status.UNAVAILABLE);
- verify(mockNameResolver, times(nameResolverRefresh)).refresh();
-
- // Third pick fails too
- ClientTransport t3 = tm.getTransport(addressGroup);
- assertNotNull(t3);
- verify(mockTransportFactory, timeout(1000).times(++transportsAddr2))
- .newClientTransport(addr2, AUTHORITY, USER_AGENT);
- // Back-off policy was not reset.
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- transports.poll(1, TimeUnit.SECONDS).listener.transportShutdown(Status.UNAVAILABLE);
- verify(mockNameResolver, times(++nameResolverRefresh)).refresh();
-
- // Forth pick is on the first address, back-off policy kicks in.
- ClientTransport t4 = tm.getTransport(addressGroup);
- assertNotNull(t4);
- // If backoff's DelayedTransport is still active, this is necessary. Otherwise it would be racy.
- t4.newStream(method, new Metadata(), CallOptions.DEFAULT.withWaitForReady(), statsTraceCtx);
- verify(mockTransportFactory, timeout(1000).times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Back-off policy was reset and consulted.
- verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
- verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis();
- verify(mockNameResolver, times(nameResolverRefresh)).refresh();
- }
-
- @Test
- public void createFailingTransport() {
- Status error = Status.UNAVAILABLE.augmentDescription("simulated");
- FailingClientTransport transport = (FailingClientTransport) tm.createFailingTransport(error);
- assertSame(error, transport.error);
- }
-
- @Test
- public void createInterimTransport() {
- InterimTransport<ClientTransport> interimTransport = tm.createInterimTransport();
- ClientTransport transport = interimTransport.transport();
- assertTrue(transport instanceof DelayedClientTransport);
- ClientStream s1 = transport.newStream(method, new Metadata());
- ClientStreamListener sl1 = mock(ClientStreamListener.class);
- s1.start(sl1);
-
- // Shutting down the channel will shutdown the interim transport, thus refusing further streams,
- // but will continue existing streams.
- channel.shutdown();
- ClientStream s2 = transport.newStream(method, new Metadata());
- ClientStreamListener sl2 = mock(ClientStreamListener.class);
- s2.start(sl2);
- verify(sl2).closed(any(Status.class), any(Metadata.class));
- verify(sl1, times(0)).closed(any(Status.class), any(Metadata.class));
- assertFalse(channel.isTerminated());
-
- // After channel has shut down, createInterimTransport() will get you a transport that has
- // already set error.
- ClientTransport transportAfterShutdown = tm.createInterimTransport().transport();
- ClientStream s3 = transportAfterShutdown.newStream(method, new Metadata());
- ClientStreamListener sl3 = mock(ClientStreamListener.class);
- s3.start(sl3);
- verify(sl3).closed(any(Status.class), any(Metadata.class));
-
- // Closing the interim transport with error will terminate the interim transport, which in turn
- // allows channel to terminate.
- interimTransport.closeWithError(Status.UNAVAILABLE);
- verify(sl1).closed(same(Status.UNAVAILABLE), any(Metadata.class));
- assertTrue(channel.isTerminated());
- }
-
- @Test
- public void createOobTransportProvider() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr);
- String oobAuthority = "oobauthority";
-
- OobTransportProvider<ClientTransport> p1 =
- tm.createOobTransportProvider(addressGroup, oobAuthority);
- ClientTransport t1 = p1.get();
- assertNotNull(t1);
- assertSame(t1, p1.get());
- verify(mockTransportFactory, timeout(1000)).newClientTransport(addr, oobAuthority, USER_AGENT);
- MockClientTransportInfo transportInfo1 = transports.poll(1, TimeUnit.SECONDS);
-
- // OOB transport providers are not indexed by addresses, thus each time it creates
- // a new provider.
- OobTransportProvider<ClientTransport> p2 =
- tm.createOobTransportProvider(addressGroup, oobAuthority);
- assertNotSame(p1, p2);
- ClientTransport t2 = p2.get();
- verify(mockTransportFactory, timeout(1000).times(2))
- .newClientTransport(addr, oobAuthority, USER_AGENT);
- assertNotSame(t1, t2);
- MockClientTransportInfo transportInfo2 = transports.poll(1, TimeUnit.SECONDS);
- assertNotSame(transportInfo1.transport, transportInfo2.transport);
-
- // Closing the OobTransportProvider will shutdown the transport
- p1.close();
- verify(transportInfo1.transport).shutdown();
- transportInfo1.listener.transportTerminated();
-
- channel.shutdown();
- verify(transportInfo2.transport).shutdown();
-
- OobTransportProvider<ClientTransport> p3 =
- tm.createOobTransportProvider(addressGroup, oobAuthority);
- assertTrue(p3.get() instanceof FailingClientTransport);
-
- p2.close();
-
- // The channel will not be terminated until all OOB transports are terminated.
- assertFalse(channel.isTerminated());
- transportInfo2.listener.transportTerminated();
- assertTrue(channel.isTerminated());
- }
-
- @Test
- public void interimTransportShutdownNow() {
- InterimTransport<ClientTransport> interimTransport = tm.createInterimTransport();
- ClientTransport transport = interimTransport.transport();
- assertTrue(transport instanceof DelayedClientTransport);
- ClientStream s1 = transport.newStream(method, new Metadata());
- ClientStreamListener sl1 = mock(ClientStreamListener.class);
- s1.start(sl1);
-
- // Shutting down the channel will shutdownNow the interim transport, thus kill existing streams.
- channel.shutdownNow();
- verify(sl1).closed(any(Status.class), any(Metadata.class));
- assertTrue(channel.isShutdown());
- assertTrue(channel.isTerminated());
- }
-}
diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java
deleted file mode 100644
index 7fbbb34..0000000
--- a/core/src/test/java/io/grpc/internal/TransportSetTest.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import io.grpc.CallOptions;
-import io.grpc.ConnectivityState;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.IntegerMarshaller;
-import io.grpc.LoadBalancer;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.Status;
-import io.grpc.StringMarshaller;
-import io.grpc.internal.TestUtils.MockClientTransportInfo;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Unit tests for {@link TransportSet}.
- *
- * <p>It only tests the logic that is not covered by {@link ManagedChannelImplTransportManagerTest}.
- */
-@RunWith(JUnit4.class)
-public class TransportSetTest {
-
- private static final String AUTHORITY = "fakeauthority";
- private static final String USER_AGENT = "mosaic";
-
- private FakeClock fakeClock;
- private FakeClock fakeExecutor;
-
- @Mock private LoadBalancer<ClientTransport> mockLoadBalancer;
- @Mock private BackoffPolicy mockBackoffPolicy1;
- @Mock private BackoffPolicy mockBackoffPolicy2;
- @Mock private BackoffPolicy mockBackoffPolicy3;
- @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
- @Mock private ClientTransportFactory mockTransportFactory;
- @Mock private TransportSet.Callback mockTransportSetCallback;
- @Mock private ClientStreamListener mockStreamListener;
-
- private final MethodDescriptor<String, Integer> method =
- MethodDescriptor.<String, Integer>newBuilder()
- .setType(MethodDescriptor.MethodType.UNKNOWN)
- .setFullMethodName("/service/method")
- .setRequestMarshaller(new StringMarshaller())
- .setResponseMarshaller(new IntegerMarshaller())
- .build();
- private final Metadata headers = new Metadata();
- private final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady();
- private final CallOptions failFastCallOptions = CallOptions.DEFAULT;
- private final StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP;
-
- private TransportSet transportSet;
- private EquivalentAddressGroup addressGroup;
- private BlockingQueue<MockClientTransportInfo> transports;
-
- @Before public void setUp() {
- MockitoAnnotations.initMocks(this);
- fakeClock = new FakeClock();
- fakeExecutor = new FakeClock();
-
- when(mockBackoffPolicyProvider.get())
- .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3);
- when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L);
- when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L);
- when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L);
- transports = TestUtils.captureTransports(mockTransportFactory);
- }
-
- @After public void noMorePendingTasks() {
- assertEquals(0, fakeClock.numPendingTasks());
- assertEquals(0, fakeExecutor.numPendingTasks());
- }
-
- @Test public void singleAddressReconnect() {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
-
- // Invocation counters
- int transportsCreated = 0;
- int backoff1Consulted = 0;
- int backoff2Consulted = 0;
- int backoffReset = 0;
- int onAllAddressesFailed = 0;
-
- // First attempt
- transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions,
- statsTraceCtx);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
-
- // Fail this one
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
- // Backoff reset and using first back-off value interval
- verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
-
- // Second attempt
- // Transport creation doesn't happen until time is due
- fakeClock.forwardMillis(9);
- verify(mockTransportFactory, times(transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- fakeClock.forwardMillis(1);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
- // Fail this one too
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
- // Second back-off interval
- verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
-
- // Third attempt
- // Transport creation doesn't happen until time is due
- fakeClock.forwardMillis(99);
- verify(mockTransportFactory, times(transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- fakeClock.forwardMillis(1);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
- // Let this one succeed
- transports.peek().listener.transportReady();
- assertEquals(ConnectivityState.READY, transportSet.getState(false));
- fakeClock.runDueTasks();
- verify(mockTransportSetCallback, never()).onConnectionClosedByServer(any(Status.class));
- // And close it
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- verify(mockTransportSetCallback).onConnectionClosedByServer(same(Status.UNAVAILABLE));
- verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
-
- // Back-off is reset, and the next attempt will happen immediately
- transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
-
- // Final checks for consultations on back-off policies
- verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
- verify(mockTransportSetCallback, atLeast(0)).onInUse(transportSet);
- verify(mockTransportSetCallback, atLeast(0)).onNotInUse(transportSet);
- verifyNoMoreInteractions(mockTransportSetCallback);
- fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
- }
-
- @Test public void twoAddressesReconnect() {
- SocketAddress addr1 = mock(SocketAddress.class);
- SocketAddress addr2 = mock(SocketAddress.class);
- createTransportSet(addr1, addr2);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- // Invocation counters
- int transportsAddr1 = 0;
- int transportsAddr2 = 0;
- int backoff1Consulted = 0;
- int backoff2Consulted = 0;
- int backoff3Consulted = 0;
- int backoffReset = 0;
- int onAllAddressesFailed = 0;
-
- // First attempt
- DelayedClientTransport delayedTransport1 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- delayedTransport1.newStream(method, new Metadata(), waitForReadyCallOptions, statsTraceCtx);
- // Let this one fail without success
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertNull(delayedTransport1.getTransportSupplier());
- verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
-
- // Second attempt will start immediately. Still no back-off policy.
- DelayedClientTransport delayedTransport2 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertSame(delayedTransport1, delayedTransport2);
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- verify(mockTransportFactory, times(++transportsAddr2))
- .newClientTransport(addr2, AUTHORITY, USER_AGENT);
- // Fail this one too
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- // All addresses have failed. Delayed transport will be in back-off interval.
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- assertNull(delayedTransport2.getTransportSupplier());
- assertTrue(delayedTransport2.isInBackoffPeriod());
- verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
- // Backoff reset and first back-off interval begins
- verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
-
- // Third attempt is the first address, thus controlled by the first back-off interval.
- DelayedClientTransport delayedTransport3 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- assertSame(delayedTransport2, delayedTransport3);
- fakeClock.forwardMillis(9);
- verify(mockTransportFactory, times(transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- fakeClock.forwardMillis(1);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Fail this one too
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertNull(delayedTransport3.getTransportSupplier());
- verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
-
- // Forth attempt will start immediately. Keep back-off policy.
- DelayedClientTransport delayedTransport4 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertSame(delayedTransport3, delayedTransport4);
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- verify(mockTransportFactory, times(++transportsAddr2))
- .newClientTransport(addr2, AUTHORITY, USER_AGENT);
- // Fail this one too
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- // All addresses have failed again. Delayed transport will be in back-off interval.
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- assertNull(delayedTransport4.getTransportSupplier());
- assertTrue(delayedTransport4.isInBackoffPeriod());
- verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
- // Second back-off interval begins
- verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
-
- // Fifth attempt for the first address, thus controlled by the second back-off interval.
- DelayedClientTransport delayedTransport5 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- assertSame(delayedTransport4, delayedTransport5);
- fakeClock.forwardMillis(99);
- verify(mockTransportFactory, times(transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- fakeClock.forwardMillis(1);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Let it through
- transports.peek().listener.transportReady();
- assertEquals(ConnectivityState.READY, transportSet.getState(false));
- // Delayed transport will see the connected transport.
- assertSame(transports.peek().transport, delayedTransport5.getTransportSupplier().get());
- verify(mockTransportSetCallback, never()).onConnectionClosedByServer(any(Status.class));
- // Then close it.
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- verify(mockTransportSetCallback).onConnectionClosedByServer(same(Status.UNAVAILABLE));
- verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
-
- // First attempt after a successful connection. Old back-off policy should be ignored, but there
- // is not yet a need for a new one. Start from the first address.
- DelayedClientTransport delayedTransport6 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertNotSame(delayedTransport5, delayedTransport6);
- delayedTransport6.newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- verify(mockTransportFactory, times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- // Fail the transport
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertNull(delayedTransport6.getTransportSupplier());
- verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
-
- // Second attempt will start immediately. Still no new back-off policy.
- DelayedClientTransport delayedTransport7 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertSame(delayedTransport6, delayedTransport7);
- verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- verify(mockTransportFactory, times(++transportsAddr2))
- .newClientTransport(addr2, AUTHORITY, USER_AGENT);
- // Fail this one too
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- // All addresses have failed. Delayed transport will be in back-off interval.
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- assertNull(delayedTransport7.getTransportSupplier());
- assertTrue(delayedTransport7.isInBackoffPeriod());
- verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
- // Back-off reset and first back-off interval begins
- verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
-
- // Third attempt is the first address, thus controlled by the first back-off interval.
- DelayedClientTransport delayedTransport8 =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertSame(delayedTransport7, delayedTransport8);
- fakeClock.forwardMillis(9);
- verify(mockTransportFactory, times(transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- fakeClock.forwardMillis(1);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsAddr1))
- .newClientTransport(addr1, AUTHORITY, USER_AGENT);
-
- // Final checks on invocations on back-off policies
- verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
- verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffMillis();
- verify(mockTransportSetCallback, atLeast(0)).onInUse(transportSet);
- verify(mockTransportSetCallback, atLeast(0)).onNotInUse(transportSet);
- verifyNoMoreInteractions(mockTransportSetCallback);
- fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
- }
-
- @Test
- public void verifyFailFastAndNonFailFastBehaviors() {
- int pendingStreamsCount = 0;
- int failFastPendingStreamsCount = 0;
-
- final SocketAddress addr1 = mock(SocketAddress.class);
- final SocketAddress addr2 = mock(SocketAddress.class);
- createTransportSet(addr1, addr2);
-
- final DelayedClientTransport delayedTransport =
- (DelayedClientTransport) transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertFalse(delayedTransport.isInBackoffPeriod());
-
- // Create a new fail fast stream.
- ClientStream ffStream = delayedTransport.newStream(method, headers, failFastCallOptions,
- statsTraceCtx);
- ffStream.start(mockStreamListener);
- // Verify it is queued.
- assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
- failFastPendingStreamsCount++;
- // Create a new non fail fast stream.
- delayedTransport.newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- // Verify it is queued.
- assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
-
- // Let this 1st address fail without success.
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertFalse(delayedTransport.isInBackoffPeriod());
- // Verify pending streams still in queue.
- assertEquals(pendingStreamsCount, delayedTransport.getPendingStreamsCount());
-
- // Create a new fail fast stream.
- delayedTransport.newStream(method, headers, failFastCallOptions, statsTraceCtx);
- // Verify it is queued.
- assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
- failFastPendingStreamsCount++;
- // Create a new non fail fast stream
- delayedTransport.newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- // Verify it is queued.
- assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
-
- // Let this 2nd address fail without success.
- Status failureStatus = Status.UNAVAILABLE.withDescription("some unique failure");
- transports.poll().listener.transportShutdown(failureStatus);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- assertTrue(delayedTransport.isInBackoffPeriod());
- // Fail fast pending streams should be cleared
- assertEquals(pendingStreamsCount - failFastPendingStreamsCount,
- delayedTransport.getPendingStreamsCount());
- pendingStreamsCount -= failFastPendingStreamsCount;
- failFastPendingStreamsCount = 0;
- fakeExecutor.runDueTasks();
- verify(mockStreamListener).closed(same(failureStatus), any(Metadata.class));
-
- // Create a new fail fast stream.
- delayedTransport.newStream(method, headers, failFastCallOptions, statsTraceCtx);
- // Verify it is not queued.
- assertEquals(pendingStreamsCount, delayedTransport.getPendingStreamsCount());
- // Create a new non fail fast stream
- delayedTransport.newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
- // Verify it is queued.
- assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
-
- fakeClock.forwardMillis(10);
- // Now back-off is over
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- assertFalse(delayedTransport.isInBackoffPeriod());
-
- // Create a new fail fast stream.
- delayedTransport.newStream(method, headers, failFastCallOptions, statsTraceCtx);
- // Verify it is queued.
- assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
- failFastPendingStreamsCount++;
- assertEquals(1, failFastPendingStreamsCount);
-
- fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
- }
-
- @Test
- public void connectIsLazy() {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- // Invocation counters
- int transportsCreated = 0;
-
- // Won't connect until requested
- verify(mockTransportFactory, times(transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
-
- // First attempt
- transportSet.obtainActiveTransport().newStream(method, new Metadata());
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
-
- // Fail this one
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
-
- // Will always reconnect after back-off
- fakeClock.forwardMillis(10);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
-
- // Make this one proceed
- transports.peek().listener.transportReady();
- assertEquals(ConnectivityState.READY, transportSet.getState(false));
- // Then go-away
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
-
- // Request immediately
- transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions,
- statsTraceCtx);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, AUTHORITY, USER_AGENT);
- fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
- }
-
- @Test
- public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- // First transport is created immediately
- ClientTransport pick = transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT);
- assertNotNull(pick);
- // Fail this one
- MockClientTransportInfo transportInfo = transports.poll();
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- transportInfo.listener.transportTerminated();
-
- // Second transport will wait for back-off
- pick = transportSet.obtainActiveTransport();
- assertTrue(pick instanceof DelayedClientTransport);
- // Start a stream, which will be pending in the delayed transport
- ClientStream pendingStream = pick.newStream(method, headers, waitForReadyCallOptions,
- statsTraceCtx);
- pendingStream.start(mockStreamListener);
-
- // Shut down TransportSet before the transport is created. Further call to
- // obtainActiveTransport() gets failing transports
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- transportSet.shutdown();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- pick = transportSet.obtainActiveTransport();
- assertNotNull(pick);
- assertTrue(pick instanceof FailingClientTransport);
- verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT);
-
- // Reconnect will eventually happen, even though TransportSet has been shut down
- fakeClock.forwardMillis(10);
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- verify(mockTransportFactory, times(2)).newClientTransport(addr, AUTHORITY, USER_AGENT);
- // The pending stream will be started on this newly started transport after it's ready.
- // The transport is shut down by TransportSet right after the stream is created.
- transportInfo = transports.poll();
- verify(transportInfo.transport, times(0)).shutdown();
- assertEquals(0, fakeExecutor.numPendingTasks());
- transportInfo.listener.transportReady();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- verify(transportInfo.transport, times(0)).newStream(
- any(MethodDescriptor.class), any(Metadata.class));
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(transportInfo.transport).newStream(same(method), same(headers),
- same(waitForReadyCallOptions), any(StatsTraceContext.class));
- verify(transportInfo.transport).shutdown();
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- verify(mockTransportSetCallback, never()).onTerminated(any(TransportSet.class));
- // Terminating the transport will let TransportSet to be terminated.
- transportInfo.listener.transportTerminated();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- verify(mockTransportSetCallback).onTerminated(transportSet);
-
- // No more transports will be created.
- fakeClock.forwardMillis(10000);
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- verifyNoMoreInteractions(mockTransportFactory);
- assertEquals(0, transports.size());
- }
-
- @Test
- public void shutdownBeforeTransportCreatedWithoutPendingStream() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- // First transport is created immediately
- ClientTransport pick = transportSet.obtainActiveTransport();
- verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT);
- assertNotNull(pick);
- // Fail this one
- MockClientTransportInfo transportInfo = transports.poll();
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- transportInfo.listener.transportTerminated();
-
- // Second transport will wait for back-off
- pick = transportSet.obtainActiveTransport();
- assertTrue(pick instanceof DelayedClientTransport);
-
- // Shut down TransportSet before the transport is created. Futher call to
- // obtainActiveTransport() gets failing transports
- transportSet.shutdown();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- pick = transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- assertNotNull(pick);
- assertTrue(pick instanceof FailingClientTransport);
-
- // TransportSet terminated promptly.
- verify(mockTransportSetCallback).onTerminated(transportSet);
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
-
- // No more transports will be created.
- fakeClock.forwardMillis(10000);
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- verifyNoMoreInteractions(mockTransportFactory);
- assertEquals(0, transports.size());
- }
-
- @Test
- public void shutdownBeforeTransportReady() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- ClientTransport pick = transportSet.obtainActiveTransport();
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- MockClientTransportInfo transportInfo = transports.poll();
- assertNotSame(transportInfo.transport, pick);
-
- // Shutdown the TransportSet before the pending transport is ready
- transportSet.shutdown();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
-
- // The transport should've been shut down even though it's not the active transport yet.
- verify(transportInfo.transport).shutdown();
- transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- transportInfo.listener.transportTerminated();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- }
-
- @Test
- public void obtainTransportAfterShutdown() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- transportSet.shutdown();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- ClientTransport pick = transportSet.obtainActiveTransport();
- assertNotNull(pick);
- verify(mockTransportFactory, times(0)).newClientTransport(addr, AUTHORITY, USER_AGENT);
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- }
-
- @Test
- public void requireConnectionThroughGetState() throws Exception {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(true));
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(true));
-
- transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions,
- statsTraceCtx);
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(true));
-
- // Fail it
- transports.peek().listener.transportShutdown(Status.UNAVAILABLE);
- // requireConnection == true doesn't skip the back-off
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(true));
- transports.poll().listener.transportTerminated();
- fakeClock.forwardMillis(9);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(true));
- fakeClock.forwardMillis(1);
- // Only when back-off is over, do we try to connect again
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
-
- // Let it through and fail, thus a go-away
- transports.peek().listener.transportReady();
- transports.peek().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- transports.poll().listener.transportTerminated();
-
- // Request for connecting again
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(true));
- // And fail it again
- transports.peek().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(true));
-
- // Shut it down
- transportSet.shutdown();
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(true));
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
- assertFalse(transportSet.isTerminated());
-
- // Terminate it
- transports.poll().listener.transportTerminated();
- assertTrue(transportSet.isTerminated());
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(true));
- assertEquals(ConnectivityState.SHUTDOWN, transportSet.getState(false));
-
- fakeExecutor.runDueTasks(); // What tasks are scheduled is not important to this test.
- }
-
- @Test
- public void logId() {
- createTransportSet(mock(SocketAddress.class));
-
- assertNotNull(transportSet.getLogId());
- }
-
- @Test
- public void inUseState() {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- // Invocation counters
- int inUse = 0;
- int notInUse = 0;
-
- verify(mockTransportSetCallback, never()).onInUse(any(TransportSet.class));
- transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions,
- statsTraceCtx);
- verify(mockTransportSetCallback, times(++inUse)).onInUse(transportSet);
-
- MockClientTransportInfo t0 = transports.poll();
-
- verify(mockTransportSetCallback, never()).onNotInUse(any(TransportSet.class));
- t0.listener.transportReady();
- // The race between delayed transport being terminated (thus not in-use) and
- // the real transport become in-use, caused a brief period of TransportSet not in-use.
- verify(mockTransportSetCallback, times(++notInUse)).onNotInUse(transportSet);
- // Delayed transport calls newStream() on the real transport in the executor
- fakeExecutor.runDueTasks();
- verify(t0.transport).newStream(
- same(method), any(Metadata.class), same(waitForReadyCallOptions),
- any(StatsTraceContext.class));
- verify(mockTransportSetCallback, times(inUse)).onInUse(transportSet);
- t0.listener.transportInUse(true);
- verify(mockTransportSetCallback, times(++inUse)).onInUse(transportSet);
-
- t0.listener.transportInUse(false);
- verify(mockTransportSetCallback, times(++notInUse)).onNotInUse(transportSet);
-
- t0.listener.transportInUse(true);
- verify(mockTransportSetCallback, times(++inUse)).onInUse(transportSet);
-
- // Simulate that the server sends a go-away
- t0.listener.transportShutdown(Status.UNAVAILABLE);
-
- // Creates a new transport
- transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions,
- statsTraceCtx);
- MockClientTransportInfo t1 = transports.poll();
- t1.listener.transportReady();
- // Delayed transport calls newStream() on the real transport in the executor
- fakeExecutor.runDueTasks();
- verify(t1.transport).newStream(
- same(method), any(Metadata.class), same(waitForReadyCallOptions),
- any(StatsTraceContext.class));
- t1.listener.transportInUse(true);
- // No turbulance from the race mentioned eariler, because t0 has been in-use
- verify(mockTransportSetCallback, times(inUse)).onInUse(transportSet);
- verify(mockTransportSetCallback, times(notInUse)).onNotInUse(transportSet);
-
- // TransportSet is not in-use when both transports are not in-use.
- t1.listener.transportInUse(false);
- verify(mockTransportSetCallback, times(notInUse)).onNotInUse(transportSet);
- t0.listener.transportInUse(false);
- verify(mockTransportSetCallback, times(++notInUse)).onNotInUse(transportSet);
- verify(mockTransportSetCallback, times(inUse)).onInUse(transportSet);
- }
-
- @Test
- public void scheduleBackoff_DoNotScheduleEndOfBackoffIfAlreadyShutdown() {
- // Setup
- final boolean[] startBackoffAndShutdownAreCalled = {false};
- Executor executor = new Executor() {
- @Override
- public void execute(Runnable command) {
- if (command.getClass().getName().contains("FailTheFailFastPendingStreams")) {
- // shutdown during startBackoff
- transportSet.shutdown();
- startBackoffAndShutdownAreCalled[0] = true;
- }
- fakeExecutor.getScheduledExecutorService().execute(command);
- }
- };
- SocketAddress addr = mock(SocketAddress.class);
- addressGroup = new EquivalentAddressGroup(Arrays.asList(addr));
- transportSet = new TransportSet(addressGroup, AUTHORITY, USER_AGENT, mockLoadBalancer,
- mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
- fakeClock.getStopwatchSupplier(), executor, mockTransportSetCallback);
-
- // Attempt and fail, scheduleBackoff should be triggered,
- // and transportSet.shutdown should be triggered by setup
- transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions,
- statsTraceCtx);
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- verify(mockTransportSetCallback, times(1)).onAllAddressesFailed();
- assertTrue(startBackoffAndShutdownAreCalled[0]);
-
- fakeExecutor.runDueTasks();
- // verify endOfBackoff not scheduled
- verify(mockBackoffPolicy1, never()).nextBackoffMillis();
- }
-
- private void createTransportSet(SocketAddress ... addrs) {
- addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs));
- transportSet = new TransportSet(addressGroup, AUTHORITY, USER_AGENT, mockLoadBalancer,
- mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
- fakeClock.getStopwatchSupplier(), fakeExecutor.getScheduledExecutorService(),
- mockTransportSetCallback);
- }
-}
diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
deleted file mode 100644
index 7dc3f7e..0000000
--- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.util;
-
-import static org.junit.Assert.assertSame;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import io.grpc.Attributes;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.LoadBalancer;
-import io.grpc.ResolvedServerInfo;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.Status;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.InterimTransport;
-import java.net.SocketAddress;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/** Unit test for {@link RoundRobinLoadBalancerFactory}. */
-@RunWith(JUnit4.class)
-public class RoundRobinLoadBalancerTest {
- private LoadBalancer<Transport> loadBalancer;
-
- private List<ResolvedServerInfoGroup> servers;
- private List<EquivalentAddressGroup> addressGroupList;
-
- @Mock private TransportManager<Transport> mockTransportManager;
- @Mock private Transport mockTransport0;
- @Mock private Transport mockTransport1;
- @Mock private Transport mockTransport2;
- @Mock private InterimTransport<Transport> mockInterimTransport;
- @Mock private Transport mockInterimTransportAsTransport;
- @Captor private ArgumentCaptor<Supplier<Transport>> transportSupplierCaptor;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- loadBalancer = RoundRobinLoadBalancerFactory.getInstance().newLoadBalancer(
- "fakeservice", mockTransportManager);
- addressGroupList = Lists.newArrayList();
- servers = Lists.newArrayList();
- for (int i = 0; i < 3; i++) {
- ResolvedServerInfoGroup.Builder resolvedServerInfoGroup = ResolvedServerInfoGroup.builder();
- for (int j = 0; j < 3; j++) {
- resolvedServerInfoGroup.add(
- new ResolvedServerInfo(new FakeSocketAddress("servergroup" + i + "server" + j)));
- }
- servers.add(resolvedServerInfoGroup.build());
- addressGroupList.add(resolvedServerInfoGroup.build().toEquivalentAddressGroup());
- }
- when(mockTransportManager.getTransport(eq(addressGroupList.get(0))))
- .thenReturn(mockTransport0);
- when(mockTransportManager.getTransport(eq(addressGroupList.get(1))))
- .thenReturn(mockTransport1);
- when(mockTransportManager.getTransport(eq(addressGroupList.get(2))))
- .thenReturn(mockTransport2);
- when(mockTransportManager.createInterimTransport()).thenReturn(mockInterimTransport);
- when(mockInterimTransport.transport()).thenReturn(mockInterimTransportAsTransport);
- }
-
- @Test
- public void pickBeforeResolved() throws Exception {
- Transport t1 = loadBalancer.pickTransport(null);
- Transport t2 = loadBalancer.pickTransport(null);
- assertSame(mockInterimTransportAsTransport, t1);
- assertSame(mockInterimTransportAsTransport, t2);
- verify(mockTransportManager).createInterimTransport();
- verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
- verify(mockInterimTransport, times(2)).transport();
-
- loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
- verify(mockInterimTransport).closeWithRealTransports(transportSupplierCaptor.capture());
- assertSame(mockTransport0, transportSupplierCaptor.getValue().get());
- assertSame(mockTransport1, transportSupplierCaptor.getValue().get());
- InOrder inOrder = Mockito.inOrder(mockTransportManager);
- inOrder.verify(mockTransportManager).getTransport(eq(addressGroupList.get(0)));
- inOrder.verify(mockTransportManager).getTransport(eq(addressGroupList.get(1)));
- inOrder.verifyNoMoreInteractions();
- verifyNoMoreInteractions(mockInterimTransport);
- }
-
- @Test
- public void pickBeforeNameResolutionError() {
- Transport t1 = loadBalancer.pickTransport(null);
- Transport t2 = loadBalancer.pickTransport(null);
- assertSame(mockInterimTransportAsTransport, t1);
- assertSame(mockInterimTransportAsTransport, t2);
- verify(mockTransportManager).createInterimTransport();
- verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
- verify(mockInterimTransport, times(2)).transport();
-
- loadBalancer.handleNameResolutionError(Status.UNAVAILABLE);
- verify(mockInterimTransport).closeWithError(any(Status.class));
- // Ensure a shutdown after error closes without incident
- loadBalancer.shutdown();
- // Ensure a name resolution error after shutdown does nothing
- loadBalancer.handleNameResolutionError(Status.UNAVAILABLE);
- verifyNoMoreInteractions(mockInterimTransport);
- }
-
- @Test
- public void pickBeforeShutdown() {
- Transport t1 = loadBalancer.pickTransport(null);
- Transport t2 = loadBalancer.pickTransport(null);
- assertSame(mockInterimTransportAsTransport, t1);
- assertSame(mockInterimTransportAsTransport, t2);
- verify(mockTransportManager).createInterimTransport();
- verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
- verify(mockInterimTransport, times(2)).transport();
-
- loadBalancer.shutdown();
- verify(mockInterimTransport).closeWithError(any(Status.class));
- // Ensure double shutdown just returns immediately without closing again.
- loadBalancer.shutdown();
- verifyNoMoreInteractions(mockInterimTransport);
- }
-
- @Test
- public void pickAfterResolved() throws Exception {
- loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
- InOrder inOrder = Mockito.inOrder(mockTransportManager);
- for (int i = 0; i < 100; i++) {
- assertSame(mockTransport0, loadBalancer.pickTransport(null));
- inOrder.verify(mockTransportManager).getTransport(eq(addressGroupList.get(0)));
- assertSame(mockTransport1, loadBalancer.pickTransport(null));
- inOrder.verify(mockTransportManager).getTransport(eq(addressGroupList.get(1)));
- assertSame(mockTransport2, loadBalancer.pickTransport(null));
- inOrder.verify(mockTransportManager).getTransport(eq(addressGroupList.get(2)));
- }
- inOrder.verifyNoMoreInteractions();
- }
-
- private static class FakeSocketAddress extends SocketAddress {
- final String name;
-
- FakeSocketAddress(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return "FakeSocketAddress-" + name;
- }
- }
-
- private static class Transport {}
-}
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
deleted file mode 100644
index e5e899a..0000000
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.grpclb;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import io.grpc.Attributes;
-import io.grpc.Channel;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.LoadBalancer;
-import io.grpc.ResolvedServerInfo;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.Status;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.InterimTransport;
-import io.grpc.internal.GrpcUtil;
-import io.grpc.internal.RoundRobinServerList;
-import io.grpc.internal.SharedResourceHolder;
-import io.grpc.stub.StreamObserver;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.logging.Logger;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * A {@link LoadBalancer} that uses the GRPCLB protocol.
- */
-class GrpclbLoadBalancer<T> extends LoadBalancer<T> {
- private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer.class.getName());
-
- private static final Status SHUTDOWN_STATUS =
- Status.UNAVAILABLE.augmentDescription("GrpclbLoadBalancer has shut down");
-
- private final Object lock = new Object();
- private final String serviceName;
- private final TransportManager<T> tm;
-
- // General states
- @GuardedBy("lock")
- private InterimTransport<T> interimTransport;
- @GuardedBy("lock")
- private Status lastError;
-
- @GuardedBy("lock")
- private boolean closed;
-
- // Load-balancer service states
- @GuardedBy("lock")
- private EquivalentAddressGroup lbAddresses;
- @GuardedBy("lock")
- private T lbTransport;
- @GuardedBy("lock")
- private T directTransport;
- @GuardedBy("lock")
- private StreamObserver<LoadBalanceResponse> lbResponseObserver;
- @GuardedBy("lock")
- private StreamObserver<LoadBalanceRequest> lbRequestWriter;
-
- // Server list states
- @GuardedBy("lock")
- private HashMap<SocketAddress, ResolvedServerInfo> servers;
- @GuardedBy("lock")
- @VisibleForTesting
- private RoundRobinServerList<T> roundRobinServerList;
-
- private ExecutorService executor;
-
- GrpclbLoadBalancer(String serviceName, TransportManager<T> tm) {
- this.serviceName = serviceName;
- this.tm = tm;
- executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
- }
-
- @VisibleForTesting
- StreamObserver<LoadBalanceResponse> getLbResponseObserver() {
- synchronized (lock) {
- return lbResponseObserver;
- }
- }
-
- @VisibleForTesting
- RoundRobinServerList<T> getRoundRobinServerList() {
- synchronized (lock) {
- return roundRobinServerList;
- }
- }
-
- @Override
- public T pickTransport(Attributes affinity) {
- RoundRobinServerList<T> serverListCopy;
- synchronized (lock) {
- if (closed) {
- return tm.createFailingTransport(SHUTDOWN_STATUS);
- }
- if (directTransport != null) {
- return directTransport;
- }
- if (roundRobinServerList == null) {
- if (lastError != null) {
- return tm.createFailingTransport(lastError);
- }
- if (interimTransport == null) {
- interimTransport = tm.createInterimTransport();
- }
- return interimTransport.transport();
- }
- serverListCopy = roundRobinServerList;
- }
- return serverListCopy.getTransportForNextServer();
- }
-
- @Override
- public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
- Attributes attributes) {
- synchronized (lock) {
- if (closed) {
- return;
- }
- EquivalentAddressGroup newLbAddresses = resolvedServerInfoGroupsToEquivalentAddressGroup(
- updatedServers);
- if (!newLbAddresses.equals(lbAddresses)) {
- lbAddresses = newLbAddresses;
- connectToLb();
- }
- }
- updateRetainedTransports();
- }
-
- @GuardedBy("lock")
- private void connectToLb() {
- directTransport = null;
- if (closed) {
- return;
- }
- lbResponseObserver = null;
- Preconditions.checkNotNull(lbAddresses, "lbAddresses");
- // TODO(zhangkun83): LB servers may use an authority different from the service's.
- // getTransport() will need to add an argument for the authority.
- lbTransport = tm.getTransport(lbAddresses);
- startNegotiation();
- }
-
- @GuardedBy("lock")
- private void startNegotiation() {
- if (closed) {
- return;
- }
- Preconditions.checkState(lbTransport != null, "lbTransport must be available");
- logger.info("Starting LB negotiation");
- LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
- .setInitialRequest(InitialLoadBalanceRequest.newBuilder()
- .setName(serviceName).build())
- .build();
- lbResponseObserver = new LbResponseObserver();
- sendLbRequest(lbTransport, initRequest);
- }
-
- @VisibleForTesting // to be mocked in tests
- @GuardedBy("lock")
- void sendLbRequest(T transport, LoadBalanceRequest request) {
- Channel channel = tm.makeChannel(transport);
- LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(channel);
- lbRequestWriter = stub.balanceLoad(lbResponseObserver);
- lbRequestWriter.onNext(request);
- }
-
- @Override
- public void handleNameResolutionError(Status error) {
- handleError(error.augmentDescription("Name resolution failed"));
- }
-
- @Override
- public void shutdown() {
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- if (closed) {
- return;
- }
- closed = true;
- if (lbRequestWriter != null) {
- lbRequestWriter.onCompleted();
- }
- savedInterimTransport = interimTransport;
- interimTransport = null;
- executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithError(SHUTDOWN_STATUS);
- }
- }
-
- @Override
- public void handleTransportShutdown(EquivalentAddressGroup addressGroup, Status status) {
- handleError(status.augmentDescription("Transport to LB server closed"));
- synchronized (lock) {
- if (closed) {
- return;
- }
- if (addressGroup.equals(lbAddresses)) {
- connectToLb();
- }
- }
- }
-
- private void handleError(Status error) {
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- savedInterimTransport = interimTransport;
- interimTransport = null;
- lastError = error;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithError(error);
- }
- }
-
- private void updateRetainedTransports() {
- HashSet<EquivalentAddressGroup> addresses = new HashSet<EquivalentAddressGroup>();
- synchronized (lock) {
- if (lbAddresses != null) {
- addresses.add(lbAddresses);
- }
- if (servers != null) {
- for (SocketAddress addr : servers.keySet()) {
- addresses.add(new EquivalentAddressGroup(addr));
- }
- }
- }
- tm.updateRetainedTransports(addresses);
- }
-
- /**
- * Converts list of ResolvedServerInfoGroup objects into one EquivalentAddressGroup object.
- */
- private static EquivalentAddressGroup resolvedServerInfoGroupsToEquivalentAddressGroup(
- List<ResolvedServerInfoGroup> groupList) {
- List<SocketAddress> addrs = new ArrayList<SocketAddress>(groupList.size());
- for (ResolvedServerInfoGroup group : groupList) {
- for (ResolvedServerInfo srv : group.getResolvedServerInfoList()) {
- addrs.add(srv.getAddress());
- }
- }
- return new EquivalentAddressGroup(addrs);
- }
-
- private class LbResponseObserver implements StreamObserver<LoadBalanceResponse> {
- @Override public void onNext(LoadBalanceResponse response) {
- logger.info("Got a LB response: " + response);
- // TODO(zhangkun83): make use of initialResponse
- // InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
- RoundRobinServerList.Builder<T> listBuilder = new RoundRobinServerList.Builder<T>(tm);
- ServerList serverList = response.getServerList();
- HashMap<SocketAddress, ResolvedServerInfo> newServerMap =
- new HashMap<SocketAddress, ResolvedServerInfo>();
- // TODO(zhangkun83): honor expiration_interval
- for (Server server : serverList.getServersList()) {
- if (server.getDropRequest()) {
- listBuilder.addSocketAddress(null);
- } else {
- try {
- InetSocketAddress address = new InetSocketAddress(
- InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
- listBuilder.addSocketAddress(address);
- // TODO(zhangkun83): fill the LB token to the attributes, and insert it to the
- // application RPCs.
- if (!newServerMap.containsKey(address)) {
- newServerMap.put(address, new ResolvedServerInfo(address, Attributes.EMPTY));
- }
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
- }
- final RoundRobinServerList<T> newRoundRobinServerList = listBuilder.build();
- if (newRoundRobinServerList.size() == 0) {
- // initialResponse and serverList are under a oneof group. If initialResponse is set,
- // serverList will be empty.
- return;
- }
- InterimTransport<T> savedInterimTransport;
- synchronized (lock) {
- if (lbResponseObserver != this) {
- // Make sure I am still the current stream.
- return;
- }
- roundRobinServerList = newRoundRobinServerList;
- servers = newServerMap;
- savedInterimTransport = interimTransport;
- interimTransport = null;
- }
- updateRetainedTransports();
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithRealTransports(new Supplier<T>() {
- @Override
- public T get() {
- return newRoundRobinServerList.getTransportForNextServer();
- }
- });
- }
- }
-
- @Override public void onError(Throwable error) {
- onStreamClosed(Status.fromThrowable(error)
- .augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
- }
-
- @Override public void onCompleted() {
- onStreamClosed(Status.UNAVAILABLE.augmentDescription(
- "Stream to GRPCLB LoadBalancer was closed"));
- }
-
- private void onStreamClosed(Status status) {
- if (status.getCode() == Status.Code.UNIMPLEMENTED) {
- InterimTransport<T> savedInterimTransport;
- final T transport;
- // This LB transport doesn't seem to be an actual LB server, if the LB address comes
- // directly from NameResolver, just use it to serve normal RPCs.
- // TODO(zhangkun83): check if lbAddresses are from NameResolver after we start getting
- // lbAddresses from LoadBalanceResponse.
- synchronized (lock) {
- if (lbResponseObserver != this) {
- return;
- }
- directTransport = transport = lbTransport;
- savedInterimTransport = interimTransport;
- interimTransport = null;
- }
- if (savedInterimTransport != null) {
- savedInterimTransport.closeWithRealTransports(Suppliers.ofInstance(transport));
- }
- } else {
- handleError(status);
- synchronized (lock) {
- if (lbResponseObserver != this) {
- return;
- }
- // TODO(zhangkun83): apply back-off, otherwise this will spam the server continually
- // with requests if the server tends to fail it for any reason.
- // I am still the active LB stream. Reopen the stream.
- startNegotiation();
- }
- }
- }
- }
-}
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java
deleted file mode 100644
index 226c06a..0000000
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.grpclb;
-
-import io.grpc.ExperimentalApi;
-import io.grpc.LoadBalancer;
-import io.grpc.TransportManager;
-
-/**
- * A factory for {@link LoadBalancer}s that uses the GRPCLB protocol.
- *
- * <p><b>Experimental:</b>This only works with the GRPCLB load-balancer service, which is not
- * available yet. Right now it's only good for internal testing. It's not feature-complete either,
- * so before using it, make sure you have read all the {@code TODO} comments in {@link
- * GrpclbLoadBalancer}.
- */
-@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1782")
-public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory {
-
- private static final GrpclbLoadBalancerFactory instance = new GrpclbLoadBalancerFactory();
-
- private GrpclbLoadBalancerFactory() {
- }
-
- public static GrpclbLoadBalancerFactory getInstance() {
- return instance;
- }
-
- @Override
- public <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm) {
- return new GrpclbLoadBalancer<T>(serviceName, tm);
- }
-}
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
deleted file mode 100644
index 5e0d594..0000000
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ /dev/null
@@ -1,535 +0,0 @@
-/*
- * Copyright 2015, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.grpclb;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import com.google.common.base.Supplier;
-import com.google.protobuf.ByteString;
-import io.grpc.Attributes;
-import io.grpc.EquivalentAddressGroup;
-import io.grpc.ResolvedServerInfo;
-import io.grpc.ResolvedServerInfoGroup;
-import io.grpc.Status;
-import io.grpc.TransportManager;
-import io.grpc.TransportManager.InterimTransport;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link GrpclbLoadBalancer}. */
-@RunWith(JUnit4.class)
-public class GrpclbLoadBalancerTest {
-
- private static final String serviceName = "testlbservice";
-
- @Mock private TransportManager<Transport> mockTransportManager;
- @Mock private InterimTransport<Transport> interimTransport;
- @Mock private Transport interimTransportAsTransport;
- @Mock private Transport failingTransport;
- @Captor private ArgumentCaptor<Supplier<Transport>> transportSupplierCaptor;
-
- // The test subject
- private TestGrpclbLoadBalancer loadBalancer;
-
- // Current addresses of the LB server
- private EquivalentAddressGroup lbAddressGroup;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- when(mockTransportManager.createInterimTransport()).thenReturn(interimTransport);
- when(mockTransportManager.createFailingTransport(any(Status.class)))
- .thenReturn(failingTransport);
- when(interimTransport.transport()).thenReturn(interimTransportAsTransport);
- loadBalancer = new TestGrpclbLoadBalancer();
- }
-
- @Test
- public void balancing() throws Exception {
- List<ResolvedServerInfo> servers = createResolvedServerInfoList(4000, 4001);
-
- // Set up mocks
- List<Transport> transports = new ArrayList<Transport>(servers.size());
- for (ResolvedServerInfo server : servers) {
- Transport transport = mock(Transport.class, withSettings().name("Transport for " + server));
- transports.add(transport);
- when(mockTransportManager.getTransport(eq(new EquivalentAddressGroup(server.getAddress()))))
- .thenReturn(transport);
- }
-
- Transport pick0;
- Transport pick1;
- Transport pick2;
-
- // Pick before name resolved
- pick0 = loadBalancer.pickTransport(null);
-
- // Name resolved
- Transport lbTransport = simulateLbAddressResolved(30001);
-
- // Pick after name resolved
- pick1 = loadBalancer.pickTransport(null);
- pick2 = loadBalancer.pickTransport(null);
-
- // Both picks end up with interimTransport
- verify(mockTransportManager).createInterimTransport();
- assertSame(interimTransportAsTransport, pick0);
- assertSame(interimTransportAsTransport, pick1);
- assertSame(interimTransportAsTransport, pick2);
-
- // An LB request is sent
- SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate an initial LB response
- loadBalancer.getLbResponseObserver().onNext(
- LoadBalanceResponse.newBuilder().setInitialResponse(
- InitialLoadBalanceResponse.getDefaultInstance())
- .build());
-
- // Simulate that the LB server reponses, with servers 0, 1, 1
- List<ResolvedServerInfo> serverList1 = new ArrayList<ResolvedServerInfo>();
- Collections.addAll(serverList1, servers.get(0), servers.get(1), servers.get(1));
- assertNotNull(loadBalancer.getLbResponseObserver());
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList1));
-
- verify(mockTransportManager).updateRetainedTransports(eq(buildRetainedAddressSet(servers)));
-
- verify(interimTransport).closeWithRealTransports(transportSupplierCaptor.capture());
- assertSame(transports.get(0), transportSupplierCaptor.getValue().get());
- assertSame(transports.get(1), transportSupplierCaptor.getValue().get());
- assertSame(transports.get(1), transportSupplierCaptor.getValue().get());
- verify(mockTransportManager).getTransport(eq(buildAddressGroup(servers.get(0))));
- verify(mockTransportManager, times(2)).getTransport(eq(buildAddressGroup(servers.get(1))));
-
- // Pick beyond the end of the list. Go back to the beginning.
- pick0 = loadBalancer.pickTransport(null);
- assertSame(transports.get(0), pick0);
-
- // Only one LB request has ever been sent at this point
- assertEquals(0, loadBalancer.sentLbRequests.size());
- }
-
- @Test public void serverListUpdated() throws Exception {
- // Simulate the initial set of LB addresses resolved
- Transport lbTransport = simulateLbAddressResolved(30001);
-
- // An LB request is sent
- SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate LB server responds a server list
- List<ResolvedServerInfo> serverList = createResolvedServerInfoList(4000, 4001);
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList));
- verify(mockTransportManager).updateRetainedTransports(eq(buildRetainedAddressSet(serverList)));
-
- // The server list is in effect
- assertEquals(buildRoundRobinList(serverList), loadBalancer.getRoundRobinServerList().getList());
-
- // Simulate LB server responds another server list
- serverList = createResolvedServerInfoList(4002, 4003);
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList));
- verify(mockTransportManager).updateRetainedTransports(eq(buildRetainedAddressSet(serverList)));
-
- // The new list is in effect
- assertEquals(buildRoundRobinList(serverList), loadBalancer.getRoundRobinServerList().getList());
- }
-
- @Test public void newLbAddressesResolved() throws Exception {
- // Simulate the initial set of LB addresses resolved
- Transport lbTransport = simulateLbAddressResolved(30001);
-
- EquivalentAddressGroup lbAddress1 = lbAddressGroup;
- verify(mockTransportManager).updateRetainedTransports(eq(Collections.singleton(lbAddress1)));
- verify(mockTransportManager).getTransport(eq(lbAddressGroup));
-
- // An LB request is sent
- SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate a second set of LB addresses resolved
- lbTransport = simulateLbAddressResolved(30002);
- EquivalentAddressGroup lbAddress2 = lbAddressGroup;
- assertNotEquals(lbAddress1, lbAddress2);
- verify(mockTransportManager).updateRetainedTransports(eq(Collections.singleton(lbAddress2)));
- verify(mockTransportManager).getTransport(eq(lbAddressGroup));
-
- // Another LB request is sent
- sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate that an identical set of LB addresses is resolved
- simulateLbAddressResolved(30002);
- EquivalentAddressGroup lbAddress3 = lbAddressGroup;
- assertEquals(lbAddress2, lbAddress3);
- verify(mockTransportManager).getTransport(eq(lbAddressGroup));
-
- // Only when LB address changes, getTransport is called.
- verify(mockTransportManager, times(2)).getTransport(any(EquivalentAddressGroup.class));
- }
-
- @Test public void lbStreamErrorAfterResponse() throws Exception {
- // Simulate the initial set of LB addresses resolved
- simulateLbAddressResolved(30001);
-
- // An LB request is sent
- assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
-
- // Simulate LB server responds a server list
- List<ResolvedServerInfo> serverList = createResolvedServerInfoList(4000, 4001);
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList));
- assertEquals(buildRoundRobinList(serverList), loadBalancer.getRoundRobinServerList().getList());
-
- // Simulate a stream error
- loadBalancer.getLbResponseObserver().onError(
- Status.UNAVAILABLE.withDescription("simulated").asException());
-
- // Another LB request is sent
- assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
-
- // Simulate LB server responds a new list
- serverList = createResolvedServerInfoList(4002, 4003);
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList));
- assertEquals(buildRoundRobinList(serverList), loadBalancer.getRoundRobinServerList().getList());
- }
-
- @Test public void lbStreamErrorWithoutResponse() throws Exception {
- // Simulate the initial set of LB addresses resolved
- Transport lbTransport = simulateLbAddressResolved(30001);
-
- // First pick, will be pending
- Transport pick = loadBalancer.pickTransport(null);
- assertSame(interimTransportAsTransport, pick);
-
- // An LB request is sent
- SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate that the LB stream fails
- loadBalancer.getLbResponseObserver().onError(
- Status.UNAVAILABLE.withDescription("simulated").asException());
-
- // The pending pick will fail
- verifyInterimTransportClosedWithError(Status.Code.UNAVAILABLE, "simulated",
- "Stream to GRPCLB LoadBalancer had an error");
-
- // Another LB request is sent
- sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
-
- // Round-robin list not available at this point
- assertNull(loadBalancer.getRoundRobinServerList());
- }
-
- @Test public void lbStreamUnimplemented() throws Exception {
- // Simulate the initial set of LB addresses resolved
- Transport lbTransport = simulateLbAddressResolved(30001);
-
- // First pick, will be pending
- Transport pick = loadBalancer.pickTransport(null);
- assertSame(interimTransportAsTransport, pick);
-
- // An LB request is sent
- SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate that the LB stream fails with UNIMPLEMENTED
- loadBalancer.getLbResponseObserver().onError(Status.UNIMPLEMENTED.asException());
-
- // The pending pick will succeed with lbTransport
- verify(interimTransport).closeWithRealTransports(transportSupplierCaptor.capture());
- assertSame(lbTransport, transportSupplierCaptor.getValue().get());
-
- // Subsequent picks will also get lbTransport
- pick = loadBalancer.pickTransport(null);
- assertSame(lbTransport, pick);
-
- // Round-robin list NOT available at this point
- assertNull(loadBalancer.getRoundRobinServerList());
-
- verify(mockTransportManager, times(1)).getTransport(eq(lbAddressGroup));
-
- // Didn't send additional requests other than the initial one
- assertEquals(0, loadBalancer.sentLbRequests.size());
-
- // Shut down the transport
- loadBalancer.handleTransportShutdown(lbAddressGroup,
- Status.UNAVAILABLE.withDescription("simulated"));
-
- // Subsequent pick will result in a failing transport because an error has occurred
- pick = loadBalancer.pickTransport(null);
- assertSame(failingTransport, pick);
- verifyCreateFailingTransport(Status.Code.UNAVAILABLE,
- "simulated", "Transport to LB server closed");
-
- // Will get another lbTransport, and send another LB request
- verify(mockTransportManager, times(2)).getTransport(eq(lbAddressGroup));
- assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
- }
-
- @Test public void lbConnectionClosedAfterResponse() throws Exception {
- // Simulate the initial set of LB addresses resolved
- simulateLbAddressResolved(30001);
-
- // An LB request is sent
- assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
-
- // Simulate LB server responds a server list
- List<ResolvedServerInfo> serverList = createResolvedServerInfoList(4000, 4001);
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList));
- assertEquals(buildRoundRobinList(serverList), loadBalancer.getRoundRobinServerList().getList());
-
- // Simulate transport closes
- loadBalancer.handleTransportShutdown(lbAddressGroup, Status.UNAVAILABLE);
-
- // Will get another transport
- verify(mockTransportManager, times(2)).getTransport(eq(lbAddressGroup));
-
- // Another LB request is sent
- assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
- }
-
- @Test public void lbConnectionClosedWithoutResponse() throws Exception {
- // Simulate the initial set of LB addresses resolved
- Transport lbTransport = simulateLbAddressResolved(30001);
-
- // First pick, will be pending
- Transport pick = loadBalancer.pickTransport(null);
- assertSame(interimTransportAsTransport, pick);
-
- // An LB request is sent
- SendLbRequestArgs sentLbRequest = loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS);
- assertNotNull(sentLbRequest);
- assertSame(lbTransport, sentLbRequest.transport);
-
- // Simulate that the transport closed
- loadBalancer.handleTransportShutdown(
- lbAddressGroup, Status.UNAVAILABLE.withDescription("simulated"));
-
- // The interim transport will close with error
- verifyInterimTransportClosedWithError(Status.Code.UNAVAILABLE,
- "simulated", "Transport to LB server closed");
-
- // Will try to get another transport
- verify(mockTransportManager, times(2)).getTransport(eq(lbAddressGroup));
-
- // Round-robin list not available at this point
- assertNull(loadBalancer.getRoundRobinServerList());
- }
-
- @Test public void nameResolutionFailed() throws Exception {
- Transport pick0 = loadBalancer.pickTransport(null);
- assertSame(interimTransportAsTransport, pick0);
-
- loadBalancer.handleNameResolutionError(Status.UNAVAILABLE);
- verifyInterimTransportClosedWithError(Status.Code.UNAVAILABLE, "Name resolution failed");
- Transport pick1 = loadBalancer.pickTransport(null);
- assertSame(failingTransport, pick1);
- verifyCreateFailingTransport(Status.Code.UNAVAILABLE, "Name resolution failed");
- }
-
- @Test public void shutdown() throws Exception {
- // Simulate the initial set of LB addresses resolved
- simulateLbAddressResolved(30001);
-
- // An LB request is sent
- assertNotNull(loadBalancer.sentLbRequests.poll(1000, TimeUnit.SECONDS));
-
- // Simulate LB server responds a server list
- List<ResolvedServerInfo> serverList = createResolvedServerInfoList(4000, 4001);
- loadBalancer.getLbResponseObserver().onNext(buildLbResponse(serverList));
- verify(mockTransportManager).getTransport(eq(lbAddressGroup));
- assertEquals(buildRoundRobinList(serverList), loadBalancer.getRoundRobinServerList().getList());
-
- // Shut down the LoadBalancer
- loadBalancer.shutdown();
-
- // Simulate a stream error
- loadBalancer.getLbResponseObserver().onError(Status.CANCELLED.asException());
-
- // Won't send a request
- assertEquals(0, loadBalancer.sentLbRequests.size());
-
- // Simulate transport closure
- loadBalancer.handleTransportShutdown(lbAddressGroup, Status.CANCELLED);
-
- // Won't get a new transport. getTransport() was call once before.
- verify(mockTransportManager).getTransport(any(EquivalentAddressGroup.class));
- }
-
- /**
- * Simulates a single LB address is resolved and sets up lbAddressGroup. Returns the transport
- * to LB.
- */
- private Transport simulateLbAddressResolved(int lbPort) {
- ResolvedServerInfo lbServerInfo = new ResolvedServerInfo(
- new InetSocketAddress("127.0.0.1", lbPort), Attributes.EMPTY);
- lbAddressGroup = buildAddressGroup(lbServerInfo);
- Transport lbTransport = new Transport();
- when(mockTransportManager.getTransport(eq(lbAddressGroup))).thenReturn(lbTransport);
- loadBalancer.handleResolvedAddresses(
- Collections.singletonList(ResolvedServerInfoGroup.builder().add(lbServerInfo).build()),
- Attributes.EMPTY);
- verify(mockTransportManager).getTransport(eq(lbAddressGroup));
- return lbTransport;
- }
-
- private HashSet<EquivalentAddressGroup> buildRetainedAddressSet(
- Collection<ResolvedServerInfo> serverInfos) {
- HashSet<EquivalentAddressGroup> addrs = new HashSet<EquivalentAddressGroup>();
- for (ResolvedServerInfo serverInfo : serverInfos) {
- addrs.add(new EquivalentAddressGroup(serverInfo.getAddress()));
- }
- addrs.add(lbAddressGroup);
- return addrs;
- }
-
- /**
- * A slightly modified {@link GrpclbLoadBalancerTest} that saves LB requests in a queue instead of
- * sending them out.
- */
- private class TestGrpclbLoadBalancer extends GrpclbLoadBalancer<Transport> {
- final LinkedBlockingQueue<SendLbRequestArgs> sentLbRequests =
- new LinkedBlockingQueue<SendLbRequestArgs>();
-
- TestGrpclbLoadBalancer() {
- super(serviceName, mockTransportManager);
- }
-
- @Override void sendLbRequest(Transport transport, LoadBalanceRequest request) {
- sentLbRequests.add(new SendLbRequestArgs(transport, request));
- }
- }
-
- private static class SendLbRequestArgs {
- final Transport transport;
- final LoadBalanceRequest request;
-
- SendLbRequestArgs(Transport transport, LoadBalanceRequest request) {
- this.transport = transport;
- this.request = request;
- }
- }
-
- private static LoadBalanceResponse buildLbResponse(List<ResolvedServerInfo> servers) {
- ServerList.Builder serverListBuilder = ServerList.newBuilder();
- for (ResolvedServerInfo server : servers) {
- InetSocketAddress addr = (InetSocketAddress) server.getAddress();
- serverListBuilder.addServers(Server.newBuilder()
- .setIpAddress(ByteString.copyFrom(addr.getAddress().getAddress()))
- .setPort(addr.getPort())
- .build());
- }
- return LoadBalanceResponse.newBuilder()
- .setServerList(serverListBuilder.build())
- .build();
- }
-
- private static EquivalentAddressGroup buildAddressGroup(ResolvedServerInfo serverInfo) {
- return new EquivalentAddressGroup(serverInfo.getAddress());
- }
-
- private static List<EquivalentAddressGroup> buildRoundRobinList(
- List<ResolvedServerInfo> serverList) {
- ArrayList<EquivalentAddressGroup> roundRobinList = new ArrayList<EquivalentAddressGroup>();
- for (ResolvedServerInfo serverInfo : serverList) {
- roundRobinList.add(new EquivalentAddressGroup(serverInfo.getAddress()));
- }
- return roundRobinList;
- }
-
- private static List<ResolvedServerInfo> createResolvedServerInfoList(int ... ports) {
- List<ResolvedServerInfo> result = new ArrayList<ResolvedServerInfo>(ports.length);
- for (int port : ports) {
- InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", port);
- result.add(new ResolvedServerInfo(inetSocketAddress, Attributes.EMPTY));
- }
- return result;
- }
-
- private void verifyInterimTransportClosedWithError(
- Status.Code statusCode, String ... descriptions) {
- ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
- verify(interimTransport).closeWithError(statusCaptor.capture());
- assertError(statusCaptor.getValue(), statusCode, descriptions);
- }
-
- private void verifyCreateFailingTransport(
- Status.Code statusCode, String ... descriptions) {
- ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
- verify(mockTransportManager).createFailingTransport(statusCaptor.capture());
- assertError(statusCaptor.getValue(), statusCode, descriptions);
- }
-
-
- private static void assertError(Status s, Status.Code statusCode, String ... descriptions) {
- assertEquals(statusCode, s.getCode());
- for (String desc : descriptions) {
- assertTrue("'" + s.getDescription() + "' contains '" + desc + "'",
- s.getDescription().contains(desc));
- }
- }
-
- public static class Transport {}
-}