core: Channel uses transport's ScheduledExecutorService

Coupled with the similar change on server-side, this removes the need for a
thread when using Netty. For InProcess and OkHttp, it would allow us to let the
user to provide the scheduler for tests or application-wide thread sharing.
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 9355e2d..937edf7 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -22,7 +22,10 @@
 import io.grpc.internal.AbstractManagedChannelImplBuilder;
 import io.grpc.internal.ClientTransportFactory;
 import io.grpc.internal.ConnectionClientTransport;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.SharedResourceHolder;
 import java.net.SocketAddress;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Builder for a channel that issues in-process requests. Clients identify the in-process server by
@@ -87,6 +90,8 @@
   @Internal
   static final class InProcessClientTransportFactory implements ClientTransportFactory {
     private final String name;
+    private final ScheduledExecutorService timerService =
+        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
 
     private boolean closed;
 
@@ -104,9 +109,17 @@
     }
 
     @Override
+    public ScheduledExecutorService getScheduledExecutorService() {
+      return timerService;
+    }
+
+    @Override
     public void close() {
+      if (closed) {
+        return;
+      }
       closed = true;
-      // Do nothing.
+      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
     }
   }
 }
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index a624ccd..a61de4e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -325,7 +325,6 @@
         buildTransportFactory(),
         // TODO(carl-mastrangelo): Allow clients to pass this in
         new ExponentialBackoffPolicy.Provider(),
-        SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
         SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
         GrpcUtil.STOPWATCH_SUPPLIER,
         getEffectiveInterceptors());
diff --git a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
index 559f5cc..ca7e83b 100644
--- a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
@@ -27,6 +27,7 @@
 import io.grpc.SecurityLevel;
 import java.net.SocketAddress;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.Nullable;
 
 final class CallCredentialsApplyingTransportFactory implements ClientTransportFactory {
@@ -47,6 +48,11 @@
   }
 
   @Override
+  public ScheduledExecutorService getScheduledExecutorService() {
+    return delegate.getScheduledExecutorService();
+  }
+
+  @Override
   public void close() {
     delegate.close();
   }
diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
index beee360..6568ffa 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
@@ -18,6 +18,7 @@
 
 import java.io.Closeable;
 import java.net.SocketAddress;
+import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.Nullable;
 
 /** Pre-configured factory for creating {@link ConnectionClientTransport} instances. */
@@ -32,6 +33,18 @@
       @Nullable String userAgent);
 
   /**
+   * Returns an executor for scheduling provided by the transport. The service should be configured
+   * to allow cancelled scheduled runnables to be GCed.
+   *
+   * <p>The executor should not be used after the factory has been closed. The caller should ensure
+   * any outstanding tasks are cancelled before the factory is closed. However, it is a
+   * <a href="https://github.com/grpc/grpc-java/issues/1981">known issue</a> that ClientCallImpl may
+   * use this executor after close, so implementations should not go out of their way to prevent
+   * usage.
+   */
+  ScheduledExecutorService getScheduledExecutorService();
+
+  /**
    * Releases any resources.
    *
    * <p>After this method has been called, it's no longer valid to call
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index f33dc97..bbfbdb0 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -54,7 +54,6 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -100,17 +99,10 @@
   private final DecompressorRegistry decompressorRegistry;
   private final CompressorRegistry compressorRegistry;
 
-  private final ObjectPool<ScheduledExecutorService> timerServicePool;
   private final Supplier<Stopwatch> stopwatchSupplier;
   /** The timout before entering idle mode. */
   private final long idleTimeoutMillis;
 
-  /**
-   * Executor that runs deadline timers for requests.
-   */
-  // Must be assigned from channelExecutor
-  private volatile ScheduledExecutorService scheduledExecutor;
-
   private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
 
   private final BackoffPolicy.Provider backoffPolicyProvider;
@@ -320,7 +312,7 @@
     }
     cancelIdleTimer();
     idleModeTimer = new IdleModeTimer();
-    idleModeTimerFuture = scheduledExecutor.schedule(
+    idleModeTimerFuture = transportFactory.getScheduledExecutorService().schedule(
         new LogExceptionRunnable(new Runnable() {
             @Override
             public void run() {
@@ -372,7 +364,6 @@
       AbstractManagedChannelImplBuilder<?> builder,
       ClientTransportFactory clientTransportFactory,
       BackoffPolicy.Provider backoffPolicyProvider,
-      ObjectPool<ScheduledExecutorService> timerServicePool,
       ObjectPool<? extends Executor> oobExecutorPool,
       Supplier<Stopwatch> stopwatchSupplier,
       List<ClientInterceptor> interceptors) {
@@ -391,8 +382,6 @@
     this.transportFactory =
         new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
     this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
-    this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool");
-    this.scheduledExecutor = checkNotNull(timerServicePool.getObject(), "timerService");
     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
     if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
       this.idleTimeoutMillis = builder.idleTimeoutMillis;
@@ -553,7 +542,7 @@
           executor,
           callOptions,
           transportProvider,
-          scheduledExecutor)
+          terminated ? null : transportFactory.getScheduledExecutorService())
               .setDecompressorRegistry(decompressorRegistry)
               .setCompressorRegistry(compressorRegistry);
     }
@@ -578,7 +567,6 @@
       terminated = true;
       terminatedLatch.countDown();
       executorPool.returnObject(executor);
-      scheduledExecutor = timerServicePool.returnObject(scheduledExecutor);
       // Release the transport factory so that it can deallocate any resources.
       transportFactory.close();
     }
@@ -623,14 +611,12 @@
         EquivalentAddressGroup addressGroup, Attributes attrs) {
       checkNotNull(addressGroup, "addressGroup");
       checkNotNull(attrs, "attrs");
-      ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor;
-      checkState(scheduledExecutorCopy != null,
-          "scheduledExecutor is already cleared. Looks like you are calling this method after "
-          + "you've already shut down");
+      // TODO(ejona): can we be even stricter? Like loadBalancer == null?
+      checkState(!terminated, "Channel is terminated");
       final SubchannelImpl subchannel = new SubchannelImpl(attrs);
       final InternalSubchannel internalSubchannel = new InternalSubchannel(
             addressGroup, authority(), userAgent, backoffPolicyProvider, transportFactory,
-            scheduledExecutorCopy, stopwatchSupplier, channelExecutor,
+            transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
             new InternalSubchannel.Callback() {
               // All callbacks are run in channelExecutor
               @Override
@@ -712,15 +698,14 @@
 
     @Override
     public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
-      ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor;
-      checkState(scheduledExecutorCopy != null,
-          "scheduledExecutor is already cleared. Looks like you are calling this method after "
-          + "you've already shut down");
+      // TODO(ejona): can we be even stricter? Like terminating?
+      checkState(!terminated, "Channel is terminated");
       final OobChannel oobChannel = new OobChannel(
-          authority, oobExecutorPool, scheduledExecutorCopy, channelExecutor);
+          authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
+          channelExecutor);
       final InternalSubchannel internalSubchannel = new InternalSubchannel(
           addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory,
-          scheduledExecutorCopy, stopwatchSupplier, channelExecutor,
+          transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
           // All callback methods are run from channelExecutor
           new InternalSubchannel.Callback() {
             @Override
@@ -895,7 +880,6 @@
         } else {
           shutdownRequested = true;
         }
-        ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor;
         // Add a delay to shutdown to deal with the race between 1) a transport being picked and
         // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
         // because of address change, or because LoadBalancer is shutdown by Channel entering idle
@@ -904,8 +888,8 @@
         //
         // TODO(zhangkun83): consider a better approach
         // (https://github.com/grpc/grpc-java/issues/2562).
-        if (!terminating && scheduledExecutorCopy != null) {
-          delayedShutdownTask = scheduledExecutorCopy.schedule(
+        if (!terminating) {
+          delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
               new LogExceptionRunnable(
                   new Runnable() {
                     @Override
@@ -916,13 +900,8 @@
           return;
         }
       }
-      // Two possible ways to get here:
-      //
-      // 1. terminating == true: no more real streams will be created, it's safe and also desirable
-      // to shutdown timely.
-      //
-      // 2. scheduledExecutor == null: possible only when Channel has already been terminated.
-      // Though may not be necessary, we'll do it anyway.
+      // When terminating == true, no more real streams will be created. It's safe and also
+      // desirable to shutdown timely.
       subchannel.shutdown();
     }
 
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
index c553c05..a340954 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
@@ -57,7 +57,6 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
@@ -91,8 +90,6 @@
           .build();
 
   private final List<EquivalentAddressGroup> servers = Lists.newArrayList();
-  private final ObjectPool<ScheduledExecutorService> timerServicePool =
-      new FixedObjectPool<ScheduledExecutorService>(timer.getScheduledExecutorService());
   private final ObjectPool<Executor> executorPool =
       new FixedObjectPool<Executor>(executor.getScheduledExecutorService());
   private final ObjectPool<Executor> oobExecutorPool =
@@ -116,6 +113,8 @@
     when(mockNameResolverFactory
         .newNameResolver(any(URI.class), any(Attributes.class)))
         .thenReturn(mockNameResolver);
+    when(mockTransportFactory.getScheduledExecutorService())
+        .thenReturn(timer.getScheduledExecutorService());
 
     class Builder extends AbstractManagedChannelImplBuilder<Builder> {
       Builder(String target) {
@@ -139,7 +138,7 @@
     builder.executorPool = executorPool;
     channel = new ManagedChannelImpl(
         builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
-        timerServicePool, oobExecutorPool, timer.getStopwatchSupplier(),
+        oobExecutorPool, timer.getStopwatchSupplier(),
         Collections.<ClientInterceptor>emptyList());
     newTransports = TestUtils.captureTransports(mockTransportFactory);
 
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index a9e2365..59dc883 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -81,7 +81,6 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -157,8 +156,6 @@
   @Mock
   private ClientCall.Listener<Integer> mockCallListener5;
   @Mock
-  private ObjectPool<ScheduledExecutorService> timerServicePool;
-  @Mock
   private ObjectPool<Executor> executorPool;
   @Mock
   private ObjectPool<Executor> oobExecutorPool;
@@ -203,7 +200,7 @@
     builder.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
     channel = new ManagedChannelImpl(
         builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
-        timerServicePool, oobExecutorPool, timer.getStopwatchSupplier(), interceptors);
+        oobExecutorPool, timer.getStopwatchSupplier(), interceptors);
 
     if (requestConnection) {
       // Force-exit the initial idle-mode
@@ -222,7 +219,8 @@
     expectedUri = new URI(target);
     when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
     transports = TestUtils.captureTransports(mockTransportFactory);
-    when(timerServicePool.getObject()).thenReturn(timer.getScheduledExecutorService());
+    when(mockTransportFactory.getScheduledExecutorService())
+        .thenReturn(timer.getScheduledExecutorService());
     when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
     when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
   }
@@ -265,15 +263,12 @@
   public void shutdownWithNoTransportsEverCreated() {
     createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
     verify(executorPool).getObject();
-    verify(timerServicePool).getObject();
     verify(executorPool, never()).returnObject(anyObject());
-    verify(timerServicePool, never()).returnObject(anyObject());
     verifyNoMoreInteractions(mockTransportFactory);
     channel.shutdown();
     assertTrue(channel.isShutdown());
     assertTrue(channel.isTerminated());
     verify(executorPool).returnObject(executor.getScheduledExecutorService());
-    verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
   }
 
   @Test
@@ -296,7 +291,6 @@
     FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
     createChannel(nameResolverFactory, NO_INTERCEPTOR);
     verify(executorPool).getObject();
-    verify(timerServicePool).getObject();
     ClientStream mockStream = mock(ClientStream.class);
     ClientStream mockStream2 = mock(ClientStream.class);
     Metadata headers = new Metadata();
@@ -327,7 +321,8 @@
 
     // First RPC, will be pending
     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
-    verifyNoMoreInteractions(mockTransportFactory);
+    verify(mockTransportFactory).newClientTransport(
+        any(SocketAddress.class), any(String.class), any(String.class));
     call.start(mockCallListener, headers);
 
     verify(mockTransport, never())
@@ -405,15 +400,14 @@
     transportListener.transportShutdown(Status.UNAVAILABLE);
     assertFalse(channel.isTerminated());
     verify(executorPool, never()).returnObject(anyObject());
-    verify(timerServicePool, never()).returnObject(anyObject());
     transportListener.transportTerminated();
     assertTrue(channel.isTerminated());
     verify(executorPool).returnObject(executor.getScheduledExecutorService());
-    verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
     verifyNoMoreInteractions(oobExecutorPool);
 
+    verify(mockTransportFactory).newClientTransport(
+        any(SocketAddress.class), any(String.class), any(String.class));
     verify(mockTransportFactory).close();
-    verifyNoMoreInteractions(mockTransportFactory);
     verify(mockTransport, atLeast(0)).getLogId();
     verifyNoMoreInteractions(mockTransport);
   }
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 3683c62..2fc7a38 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -46,6 +46,7 @@
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.CheckReturnValue;
 import javax.annotation.Nullable;
@@ -498,6 +499,11 @@
     }
 
     @Override
+    public ScheduledExecutorService getScheduledExecutorService() {
+      return group;
+    }
+
+    @Override
     public void close() {
       if (closed) {
         return;
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index b60cca0..0f0ae2a 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -46,6 +46,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.net.ssl.HostnameVerifier;
@@ -382,6 +383,8 @@
     private final AtomicBackoff keepAliveTimeNanos;
     private final long keepAliveTimeoutNanos;
     private final boolean keepAliveWithoutCalls;
+    private final ScheduledExecutorService timeoutService =
+        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
     private boolean closed;
 
     private OkHttpTransportFactory(Executor executor,
@@ -446,11 +449,17 @@
     }
 
     @Override
+    public ScheduledExecutorService getScheduledExecutorService() {
+      return timeoutService;
+    }
+
+    @Override
     public void close() {
       if (closed) {
         return;
       }
       closed = true;
+      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
 
       if (usingSharedExecutor) {
         SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor);