core: abstract channel builder to accept LoadBalancer2 (#2583)

If a LoadBalancer2 is passed in, the builder will create ManagedChannelImpl2 instead of ManagedChannelImpl. This allows us to test the LBv2 classes on a large scale.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index de22269..7ccfaa7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -46,6 +46,8 @@
 import io.grpc.DecompressorRegistry;
 import io.grpc.Internal;
 import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer2;
+import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import io.grpc.NameResolverProvider;
@@ -94,6 +96,7 @@
 
   @Nullable
   private Executor executor;
+
   private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
 
   private final String target;
@@ -114,6 +117,9 @@
   private LoadBalancer.Factory loadBalancerFactory;
 
   @Nullable
+  private LoadBalancer2.Factory loadBalancer2Factory;
+
+  @Nullable
   private DecompressorRegistry decompressorRegistry;
 
   @Nullable
@@ -204,6 +210,17 @@
     return thisT();
   }
 
+  /**
+   * DO NOT CALL THIS, as its argument type will soon be renamed.
+   */
+  public final T loadBalancerFactory(LoadBalancer2.Factory loadBalancerFactory) {
+    Preconditions.checkState(directServerAddress == null,
+        "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
+        directServerAddress);
+    this.loadBalancer2Factory = loadBalancerFactory;
+    return thisT();
+  }
+
   @Override
   public final T decompressorRegistry(DecompressorRegistry registry) {
     this.decompressorRegistry = registry;
@@ -266,7 +283,7 @@
   }
 
   @Override
-  public ManagedChannelImpl build() {
+  public ManagedChannel build() {
     ClientTransportFactory transportFactory = buildTransportFactory();
     if (authorityOverride != null) {
       transportFactory = new AuthorityOverridingTransportFactory(
@@ -279,20 +296,39 @@
       // getResource(), then this shouldn't be a problem unless called on the UI thread.
       nameResolverFactory = NameResolverProvider.asFactory();
     }
-    return new ManagedChannelImpl(
-        target,
-        // TODO(carl-mastrangelo): Allow clients to pass this in
-        new ExponentialBackoffPolicy.Provider(),
-        nameResolverFactory,
-        getNameResolverParams(),
-        firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
-        transportFactory,
-        firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
-        firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
-        GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
-        executor, userAgent, interceptors,
-        firstNonNull(statsFactory,
-            firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+    if (loadBalancer2Factory != null) {
+      return new ManagedChannelImpl2(
+          target,
+          // TODO(carl-mastrangelo): Allow clients to pass this in
+          new ExponentialBackoffPolicy.Provider(),
+          nameResolverFactory,
+          getNameResolverParams(),
+          loadBalancer2Factory,
+          transportFactory,
+          firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+          firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+          SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
+          getExecutorPool(executor),
+          SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
+          GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+          userAgent, interceptors, firstNonNull(statsFactory,
+              firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+    } else {
+      return new ManagedChannelImpl(
+          target,
+          // TODO(carl-mastrangelo): Allow clients to pass this in
+          new ExponentialBackoffPolicy.Provider(),
+          nameResolverFactory,
+          getNameResolverParams(),
+          firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()),
+          transportFactory,
+          firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+          firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+          GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis,
+          executor, userAgent, interceptors,
+          firstNonNull(statsFactory,
+              firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE)));
+    }
   }
 
   /**
@@ -311,6 +347,24 @@
     return Attributes.EMPTY;
   }
 
+  private static ObjectPool<? extends Executor> getExecutorPool(final @Nullable Executor executor) {
+    if (executor != null) {
+      return new ObjectPool<Executor>() {
+        @Override
+        public Executor getObject() {
+          return executor;
+        }
+
+        @Override
+        public Executor returnObject(Object returned) {
+          return null;
+        }
+      };
+    } else {
+      return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
+    }
+  }
+
   private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
     final ClientTransportFactory factory;
     final String authorityOverride;
diff --git a/core/src/main/java/io/grpc/internal/SharedResourcePool.java b/core/src/main/java/io/grpc/internal/SharedResourcePool.java
new file mode 100644
index 0000000..fbcef43
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/SharedResourcePool.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+/**
+ * An ObjectPool backed by a {@link SharedResourceHolder.Resource}.
+ */
+public final class SharedResourcePool<T> implements ObjectPool<T> {
+  private final SharedResourceHolder.Resource<T> resource;
+
+  private SharedResourcePool(SharedResourceHolder.Resource<T> resource) {
+    this.resource = resource;
+  }
+
+  public static <T> SharedResourcePool<T> forResource(SharedResourceHolder.Resource<T> resource) {
+    return new SharedResourcePool<T>(resource);
+  }
+
+  @Override
+  public T getObject() {
+    return SharedResourceHolder.get(resource);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public T returnObject(Object object) {
+    SharedResourceHolder.release(resource, (T) object);
+    return null;
+  }
+}