core: abstract channel builder to accept LoadBalancer2 (#2583)
If a LoadBalancer2.Factory is passed in, the builder will create a ManagedChannelImpl2 instead of a ManagedChannelImpl. This allows us to test the LBv2 classes on a large scale.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index de22269..7ccfaa7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -46,6 +46,8 @@
import io.grpc.DecompressorRegistry;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer2;
+import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
@@ -94,6 +96,7 @@
@Nullable
private Executor executor;
+
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
private final String target;
@@ -114,6 +117,9 @@
private LoadBalancer.Factory loadBalancerFactory;
@Nullable
+ private LoadBalancer2.Factory loadBalancer2Factory;
+
+ @Nullable
private DecompressorRegistry decompressorRegistry;
@Nullable
@@ -204,6 +210,17 @@
return thisT();
}
+ /**
+ * DO NOT CALL THIS, as its argument type will soon be renamed.
+ */
+ public final T loadBalancerFactory(LoadBalancer2.Factory loadBalancerFactory) {
+ Preconditions.checkState(directServerAddress == null,
+ "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
+ directServerAddress);
+ this.loadBalancer2Factory = loadBalancerFactory;
+ return thisT();
+ }
+
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
this.decompressorRegistry = registry;
@@ -266,7 +283,7 @@
}
@Override
- public ManagedChannelImpl build() {
+ public ManagedChannel build() {
ClientTransportFactory transportFactory = buildTransportFactory();
if (authorityOverride != null) {
transportFactory = new AuthorityOverridingTransportFactory(
@@ -279,20 +296,39 @@
// getResource(), then this shouldn't be a problem unless called on the UI thread.
nameResolverFactory = NameResolverProvider.asFactory();
}
- return new ManagedChannelImpl(
- target,
- // TODO(carl-mastrangelo): Allow clients to pass this in
- new ExponentialBackoffPolicy.Provider(),
- nameResolverFactory,
- getNameResolverParams(),
- firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
- transportFactory,
- firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
- firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
- GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
- executor, userAgent, interceptors,
- firstNonNull(statsFactory,
- firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+ if (loadBalancer2Factory != null) {
+ return new ManagedChannelImpl2(
+ target,
+ // TODO(carl-mastrangelo): Allow clients to pass this in
+ new ExponentialBackoffPolicy.Provider(),
+ nameResolverFactory,
+ getNameResolverParams(),
+ loadBalancer2Factory,
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
+ getExecutorPool(executor),
+ SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
+ GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+ userAgent, interceptors, firstNonNull(statsFactory,
+ firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+ } else {
+ return new ManagedChannelImpl(
+ target,
+ // TODO(carl-mastrangelo): Allow clients to pass this in
+ new ExponentialBackoffPolicy.Provider(),
+ nameResolverFactory,
+ getNameResolverParams(),
+ firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+ executor, userAgent, interceptors,
+ firstNonNull(statsFactory,
+ firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+ }
}
/**
@@ -311,6 +347,24 @@
return Attributes.EMPTY;
}
+ private static ObjectPool<? extends Executor> getExecutorPool(final @Nullable Executor executor) {
+ if (executor != null) {
+ return new ObjectPool<Executor>() {
+ @Override
+ public Executor getObject() {
+ return executor;
+ }
+
+ @Override
+ public Executor returnObject(Object returned) {
+ return null;
+ }
+ };
+ } else {
+ return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
+ }
+ }
+
private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
final ClientTransportFactory factory;
final String authorityOverride;
diff --git a/core/src/main/java/io/grpc/internal/SharedResourcePool.java b/core/src/main/java/io/grpc/internal/SharedResourcePool.java
new file mode 100644
index 0000000..fbcef43
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/SharedResourcePool.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+/**
+ * An ObjectPool backed by a {@link SharedResourceHolder.Resource}.
+ */
+public final class SharedResourcePool<T> implements ObjectPool<T> {
+ private final SharedResourceHolder.Resource<T> resource;
+
+ private SharedResourcePool(SharedResourceHolder.Resource<T> resource) {
+ this.resource = resource;
+ }
+
+ public static <T> SharedResourcePool<T> forResource(SharedResourceHolder.Resource<T> resource) {
+ return new SharedResourcePool<T>(resource);
+ }
+
+ @Override
+ public T getObject() {
+ return SharedResourceHolder.get(resource);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T returnObject(Object object) {
+ SharedResourceHolder.release(resource, (T) object);
+ return null;
+ }
+}