core: abstract channel builder to accept LoadBalancer2 (#2583)
If a LoadBalancer2 is passed in, the builder will create ManagedChannelImpl2 instead of ManagedChannelImpl. This allows us to test the LBv2 classes on a large scale.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index de22269..7ccfaa7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -46,6 +46,8 @@
import io.grpc.DecompressorRegistry;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer2;
+import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
@@ -94,6 +96,7 @@
@Nullable
private Executor executor;
+
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
private final String target;
@@ -114,6 +117,9 @@
private LoadBalancer.Factory loadBalancerFactory;
@Nullable
+ private LoadBalancer2.Factory loadBalancer2Factory;
+
+ @Nullable
private DecompressorRegistry decompressorRegistry;
@Nullable
@@ -204,6 +210,17 @@
return thisT();
}
+ /**
+ * DO NOT CALL THIS, as its argument type will soon be renamed.
+ */
+ public final T loadBalancerFactory(LoadBalancer2.Factory loadBalancerFactory) {
+ Preconditions.checkState(directServerAddress == null,
+ "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
+ directServerAddress);
+ this.loadBalancer2Factory = loadBalancerFactory;
+ return thisT();
+ }
+
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
this.decompressorRegistry = registry;
@@ -266,7 +283,7 @@
}
@Override
- public ManagedChannelImpl build() {
+ public ManagedChannel build() {
ClientTransportFactory transportFactory = buildTransportFactory();
if (authorityOverride != null) {
transportFactory = new AuthorityOverridingTransportFactory(
@@ -279,20 +296,39 @@
// getResource(), then this shouldn't be a problem unless called on the UI thread.
nameResolverFactory = NameResolverProvider.asFactory();
}
- return new ManagedChannelImpl(
- target,
- // TODO(carl-mastrangelo): Allow clients to pass this in
- new ExponentialBackoffPolicy.Provider(),
- nameResolverFactory,
- getNameResolverParams(),
- firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
- transportFactory,
- firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
- firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
- GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
- executor, userAgent, interceptors,
- firstNonNull(statsFactory,
- firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+ if (loadBalancer2Factory != null) {
+ return new ManagedChannelImpl2(
+ target,
+ // TODO(carl-mastrangelo): Allow clients to pass this in
+ new ExponentialBackoffPolicy.Provider(),
+ nameResolverFactory,
+ getNameResolverParams(),
+ loadBalancer2Factory,
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
+ getExecutorPool(executor),
+ SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
+ GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+ userAgent, interceptors, firstNonNull(statsFactory,
+ firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+ } else {
+ return new ManagedChannelImpl(
+ target,
+ // TODO(carl-mastrangelo): Allow clients to pass this in
+ new ExponentialBackoffPolicy.Provider(),
+ nameResolverFactory,
+ getNameResolverParams(),
+ firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+ executor, userAgent, interceptors,
+ firstNonNull(statsFactory,
+ firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+ }
}
/**
@@ -311,6 +347,24 @@
return Attributes.EMPTY;
}
+ private static ObjectPool<? extends Executor> getExecutorPool(final @Nullable Executor executor) {
+ if (executor != null) {
+ return new ObjectPool<Executor>() {
+ @Override
+ public Executor getObject() {
+ return executor;
+ }
+
+ @Override
+ public Executor returnObject(Object returned) {
+ return null;
+ }
+ };
+ } else {
+ return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
+ }
+ }
+
private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
final ClientTransportFactory factory;
final String authorityOverride;