core: allow application to provide all threads - inprocess channel

diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 5a86ab5..31456e7 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -16,7 +16,8 @@
 
 package io.grpc.inprocess;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import io.grpc.ExperimentalApi;
 import io.grpc.Internal;
 import io.grpc.internal.AbstractManagedChannelImplBuilder;
@@ -28,6 +29,7 @@
 import java.net.SocketAddress;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * Builder for a channel that issues in-process requests. Clients identify the in-process server by
@@ -65,10 +67,11 @@
   }
 
   private final String name;
+  private ScheduledExecutorService scheduledExecutorService;
 
   private InProcessChannelBuilder(String name) {
     super(new InProcessSocketAddress(name), "localhost");
-    this.name = Preconditions.checkNotNull(name, "name");
+    this.name = checkNotNull(name, "name");
     // In-process transport should not record its traffic to the stats module.
     // https://github.com/grpc/grpc-java/issues/2284
     setStatsRecordStartedRpcs(false);
@@ -115,25 +118,44 @@
     return this;
   }
 
+  /**
+   * Provides a custom scheduled executor service.
+   *
+   * <p>It's an optional parameter. If the user has not provided a scheduled executor service when
+   * the channel is built, the builder will use a static cached thread pool.
+   *
+   * @return this
+   *
+   * @since 1.11.0
+   */
+  public InProcessChannelBuilder scheduledExecutorService(
+      ScheduledExecutorService scheduledExecutorService) {
+    this.scheduledExecutorService =
+        checkNotNull(scheduledExecutorService, "scheduledExecutorService");
+    return this;
+  }
+
   @Override
   @Internal
   protected ClientTransportFactory buildTransportFactory() {
-    return new InProcessClientTransportFactory(name);
+    return new InProcessClientTransportFactory(name, scheduledExecutorService);
   }
 
   /**
    * Creates InProcess transports. Exposed for internal use, as it should be private.
    */
-  @Internal
   static final class InProcessClientTransportFactory implements ClientTransportFactory {
     private final String name;
-    private final ScheduledExecutorService timerService =
-        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
-
+    private final ScheduledExecutorService timerService;
+    private final boolean useSharedTimer;
     private boolean closed;
 
-    private InProcessClientTransportFactory(String name) {
+    private InProcessClientTransportFactory(
+        String name, @Nullable ScheduledExecutorService scheduledExecutorService) {
       this.name = name;
+      useSharedTimer = scheduledExecutorService == null;
+      timerService = useSharedTimer
+          ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
     }
 
     @Override
@@ -156,7 +178,9 @@
         return;
       }
       closed = true;
-      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
+      if (useSharedTimer) {
+        SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
+      }
     }
   }
 }
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
index df94843..5859c25 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
@@ -18,14 +18,11 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ServerStreamTracer;
 import io.grpc.internal.InternalServer;
 import io.grpc.internal.ObjectPool;
 import io.grpc.internal.ServerListener;
 import io.grpc.internal.ServerTransportListener;
-import io.grpc.internal.SharedResourceHolder.Resource;
-import io.grpc.internal.SharedResourcePool;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -47,7 +44,7 @@
   private final List<ServerStreamTracer.Factory> streamTracerFactories;
   private ServerListener listener;
   private boolean shutdown;
-  /** Expected to be a SharedResourcePool except in testing. */
+  /** Defaults to be a SharedResourcePool. */
   private final ObjectPool<ScheduledExecutorService> schedulerPool;
   /**
    * Only used to make sure the scheduler has at least one reference. Since child transports can
@@ -56,13 +53,6 @@
   private ScheduledExecutorService scheduler;
 
   InProcessServer(
-      String name, Resource<ScheduledExecutorService> schedulerResource,
-      List<ServerStreamTracer.Factory> streamTracerFactories) {
-    this(name, SharedResourcePool.forResource(schedulerResource), streamTracerFactories);
-  }
-
-  @VisibleForTesting
-  InProcessServer(
       String name, ObjectPool<ScheduledExecutorService> schedulerPool,
       List<ServerStreamTracer.Factory> streamTracerFactories) {
     this.name = name;
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index 3aa889b..82f5452 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -16,14 +16,20 @@
 
 package io.grpc.inprocess;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Preconditions;
 import io.grpc.ExperimentalApi;
 import io.grpc.ServerStreamTracer;
 import io.grpc.internal.AbstractServerImplBuilder;
+import io.grpc.internal.FixedObjectPool;
 import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ObjectPool;
+import io.grpc.internal.SharedResourcePool;
 import java.io.File;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -86,6 +92,8 @@
   }
 
   private final String name;
+  private ObjectPool<ScheduledExecutorService> schedulerPool =
+      SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
 
   private InProcessServerBuilder(String name) {
     this.name = Preconditions.checkNotNull(name, "name");
@@ -98,10 +106,27 @@
     handshakeTimeout(Long.MAX_VALUE, TimeUnit.SECONDS);
   }
 
+  /**
+   * Provides a custom scheduled executor service.
+   *
+   * <p>It's an optional parameter. If the user has not provided a scheduled executor service when
+   * the channel is built, the builder will use a static cached thread pool.
+   *
+   * @return this
+   *
+   * @since 1.11.0
+   */
+  public InProcessServerBuilder scheduledExecutorService(
+      ScheduledExecutorService scheduledExecutorService) {
+    schedulerPool = new FixedObjectPool<ScheduledExecutorService>(
+        checkNotNull(scheduledExecutorService, "scheduledExecutorService"));
+    return this;
+  }
+
   @Override
   protected InProcessServer buildTransportServer(
       List<ServerStreamTracer.Factory> streamTracerFactories) {
-    return new InProcessServer(name, GrpcUtil.TIMER_SERVICE, streamTracerFactories);
+    return new InProcessServer(name, schedulerPool, streamTracerFactories);
   }
 
   @Override
diff --git a/core/src/main/java/io/grpc/internal/FixedObjectPool.java b/core/src/main/java/io/grpc/internal/FixedObjectPool.java
index f266732..b5feb5f 100644
--- a/core/src/main/java/io/grpc/internal/FixedObjectPool.java
+++ b/core/src/main/java/io/grpc/internal/FixedObjectPool.java
@@ -21,7 +21,7 @@
 /**
  * An object pool that always returns the same instance and does nothing when returning the object.
  */
-final class FixedObjectPool<T> implements ObjectPool<T> {
+public final class FixedObjectPool<T> implements ObjectPool<T> {
   private final T object;
 
   public FixedObjectPool(T object) {
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java
new file mode 100644
index 0000000..1ba5e58
--- /dev/null
+++ b/core/src/test/java/io/grpc/inprocess/InProcessChannelBuilderTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.inprocess;
+
+import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
+import static org.junit.Assert.assertSame;
+
+import io.grpc.internal.ClientTransportFactory;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.SharedResourceHolder;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link InProcessChannelBuilder}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessChannelBuilderTest {
+  @Test
+  public void scheduledExecutorService_default() {
+    InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo");
+    ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
+    assertSame(
+        SharedResourceHolder.get(TIMER_SERVICE),
+        clientTransportFactory.getScheduledExecutorService());
+
+    SharedResourceHolder.release(
+        TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
+    clientTransportFactory.close();
+  }
+
+  @Test
+  public void scheduledExecutorService_custom() {
+    InProcessChannelBuilder builder = InProcessChannelBuilder.forName("foo");
+    ScheduledExecutorService scheduledExecutorService =
+        new FakeClock().getScheduledExecutorService();
+
+    InProcessChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
+    assertSame(builder, builder1);
+
+    ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();
+
+    assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());
+
+    clientTransportFactory.close();
+  }
+}
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
index 714b557..df11f6b 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
@@ -16,10 +16,18 @@
 
 package io.grpc.inprocess;
 
+import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 
+import io.grpc.ServerStreamTracer.Factory;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.ObjectPool;
+import io.grpc.internal.SharedResourcePool;
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledExecutorService;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -42,4 +50,40 @@
 
     assertNotEquals(name1, name2);
   }
+
+  @Test
+  public void scheduledExecutorService_default() {
+    InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
+    InProcessServer server = builder.buildTransportServer(new ArrayList<Factory>());
+
+    ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
+        server.getScheduledExecutorServicePool();
+    ObjectPool<ScheduledExecutorService> expectedPool =
+        SharedResourcePool.forResource(TIMER_SERVICE);
+
+    ScheduledExecutorService expected = expectedPool.getObject();
+    ScheduledExecutorService actual = scheduledExecutorServicePool.getObject();
+    assertSame(expected, actual);
+
+    expectedPool.returnObject(expected);
+    scheduledExecutorServicePool.returnObject(actual);
+  }
+
+  @Test
+  public void scheduledExecutorService_custom() {
+    InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
+    ScheduledExecutorService scheduledExecutorService =
+        new FakeClock().getScheduledExecutorService();
+
+    InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
+    assertSame(builder, builder1);
+
+    InProcessServer server = builder1.buildTransportServer(new ArrayList<Factory>());
+    ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
+        server.getScheduledExecutorServicePool();
+
+    assertSame(scheduledExecutorService, scheduledExecutorServicePool.getObject());
+
+    scheduledExecutorServicePool.returnObject(scheduledExecutorService);
+  }
 }
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java
index 1bf0858..6d196c8 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java
@@ -24,6 +24,7 @@
 import io.grpc.internal.ServerListener;
 import io.grpc.internal.ServerTransport;
 import io.grpc.internal.ServerTransportListener;
+import io.grpc.internal.SharedResourcePool;
 import java.util.Collections;
 import java.util.concurrent.ScheduledExecutorService;
 import org.junit.Test;
@@ -36,7 +37,7 @@
   @Test
   public void getPort_notStarted() throws Exception {
     InProcessServer s =
-        new InProcessServer("name", GrpcUtil.TIMER_SERVICE,
+        new InProcessServer("name", SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
             Collections.<ServerStreamTracer.Factory>emptyList());
 
     Truth.assertThat(s.getPort()).isEqualTo(-1);
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
index 2c1d671..d01ce4c 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
@@ -20,6 +20,7 @@
 import io.grpc.internal.GrpcUtil;
 import io.grpc.internal.InternalServer;
 import io.grpc.internal.ManagedClientTransport;
+import io.grpc.internal.SharedResourcePool;
 import io.grpc.internal.testing.AbstractTransportTest;
 import java.util.List;
 import org.junit.runner.RunWith;
@@ -34,7 +35,9 @@
 
   @Override
   protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
-    return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE, streamTracerFactories);
+    return new InProcessServer(
+        TRANSPORT_NAME,
+        SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), streamTracerFactories);
   }
 
   @Override