core: abstract channel builder to accept LoadBalancer2 (#2583)

If a LoadBalancer2 is passed in, the builder will create ManagedChannelImpl2 instead of ManagedChannelImpl. This allows us to test the LBv2 classes on a large scale.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index de22269..7ccfaa7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -46,6 +46,8 @@
 import io.grpc.DecompressorRegistry;
 import io.grpc.Internal;
 import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer2;
+import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import io.grpc.NameResolverProvider;
@@ -94,6 +96,7 @@
 
   @Nullable
   private Executor executor;
+
   private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
 
   private final String target;
@@ -114,6 +117,9 @@
   private LoadBalancer.Factory loadBalancerFactory;
 
   @Nullable
+  private LoadBalancer2.Factory loadBalancer2Factory;
+
+  @Nullable
   private DecompressorRegistry decompressorRegistry;
 
   @Nullable
@@ -204,6 +210,17 @@
     return thisT();
   }
 
+  /**
+   * DO NOT CALL THIS, as its argument type will soon be renamed.
+   */
+  public final T loadBalancerFactory(LoadBalancer2.Factory loadBalancerFactory) {
+    Preconditions.checkState(directServerAddress == null,
+        "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
+        directServerAddress);
+    this.loadBalancer2Factory = loadBalancerFactory;
+    return thisT();
+  }
+
   @Override
   public final T decompressorRegistry(DecompressorRegistry registry) {
     this.decompressorRegistry = registry;
@@ -266,7 +283,7 @@
   }
 
   @Override
-  public ManagedChannelImpl build() {
+  public ManagedChannel build() {
     ClientTransportFactory transportFactory = buildTransportFactory();
     if (authorityOverride != null) {
       transportFactory = new AuthorityOverridingTransportFactory(
@@ -279,20 +296,39 @@
       // getResource(), then this shouldn't be a problem unless called on the UI thread.
       nameResolverFactory = NameResolverProvider.asFactory();
     }
-    return new ManagedChannelImpl(
-        target,
-        // TODO(carl-mastrangelo): Allow clients to pass this in
-        new ExponentialBackoffPolicy.Provider(),
-        nameResolverFactory,
-        getNameResolverParams(),
-        firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
-        transportFactory,
-        firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
-        firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
-        GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
-        executor, userAgent, interceptors,
-        firstNonNull(statsFactory,
-            firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+    if (loadBalancer2Factory != null) {
+      return new ManagedChannelImpl2(
+          target,
+          // TODO(carl-mastrangelo): Allow clients to pass this in
+          new ExponentialBackoffPolicy.Provider(),
+          nameResolverFactory,
+          getNameResolverParams(),
+          loadBalancer2Factory,
+          transportFactory,
+          firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+          firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+          SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
+          getExecutorPool(executor),
+          SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
+          GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+          userAgent, interceptors, firstNonNull(statsFactory,
+              firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+    } else {
+      return new ManagedChannelImpl(
+          target,
+          // TODO(carl-mastrangelo): Allow clients to pass this in
+          new ExponentialBackoffPolicy.Provider(),
+          nameResolverFactory,
+          getNameResolverParams(),
+          firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
+          transportFactory,
+          firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+          firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+          GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+          executor, userAgent, interceptors,
+          firstNonNull(statsFactory,
+              firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+    }
   }
 
   /**
@@ -311,6 +347,24 @@
     return Attributes.EMPTY;
   }
 
+  private static ObjectPool<? extends Executor> getExecutorPool(final @Nullable Executor executor) {
+    if (executor != null) {
+      return new ObjectPool<Executor>() {
+        @Override
+        public Executor getObject() {
+          return executor;
+        }
+
+        @Override
+        public Executor returnObject(Object returned) {
+          return null;
+        }
+      };
+    } else {
+      return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
+    }
+  }
+
   private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
     final ClientTransportFactory factory;
     final String authorityOverride;