core: Channel uses transport's ScheduledExecutorService
Coupled with the similar change on server-side, this removes the need for a
thread when using Netty. For InProcess and OkHttp, it would allow us to let the
user to provide the scheduler for tests or application-wide thread sharing.
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 9355e2d..937edf7 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -22,7 +22,10 @@
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.SharedResourceHolder;
import java.net.SocketAddress;
+import java.util.concurrent.ScheduledExecutorService;
/**
* Builder for a channel that issues in-process requests. Clients identify the in-process server by
@@ -87,6 +90,8 @@
@Internal
static final class InProcessClientTransportFactory implements ClientTransportFactory {
private final String name;
+ private final ScheduledExecutorService timerService =
+ SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private boolean closed;
@@ -104,9 +109,17 @@
}
@Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return timerService;
+ }
+
+ @Override
public void close() {
+ if (closed) {
+ return;
+ }
closed = true;
- // Do nothing.
+ SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timerService);
}
}
}
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index a624ccd..a61de4e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -325,7 +325,6 @@
buildTransportFactory(),
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
- SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors());
diff --git a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
index 559f5cc..ca7e83b 100644
--- a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
@@ -27,6 +27,7 @@
import io.grpc.SecurityLevel;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
final class CallCredentialsApplyingTransportFactory implements ClientTransportFactory {
@@ -47,6 +48,11 @@
}
@Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return delegate.getScheduledExecutorService();
+ }
+
+ @Override
public void close() {
delegate.close();
}
diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
index beee360..6568ffa 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
@@ -18,6 +18,7 @@
import java.io.Closeable;
import java.net.SocketAddress;
+import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/** Pre-configured factory for creating {@link ConnectionClientTransport} instances. */
@@ -32,6 +33,18 @@
@Nullable String userAgent);
/**
+ * Returns an executor for scheduling provided by the transport. The service should be configured
+ * to allow cancelled scheduled runnables to be GCed.
+ *
+ * <p>The executor should not be used after the factory has been closed. The caller should ensure
+ * any outstanding tasks are cancelled before the factory is closed. However, it is a
+ * <a href="https://github.com/grpc/grpc-java/issues/1981">known issue</a> that ClientCallImpl may
+ * use this executor after close, so implementations should not go out of their way to prevent
+ * usage.
+ */
+ ScheduledExecutorService getScheduledExecutorService();
+
+ /**
* Releases any resources.
*
* <p>After this method has been called, it's no longer valid to call
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index f33dc97..bbfbdb0 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -54,7 +54,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -100,17 +99,10 @@
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
- private final ObjectPool<ScheduledExecutorService> timerServicePool;
private final Supplier<Stopwatch> stopwatchSupplier;
/** The timout before entering idle mode. */
private final long idleTimeoutMillis;
- /**
- * Executor that runs deadline timers for requests.
- */
- // Must be assigned from channelExecutor
- private volatile ScheduledExecutorService scheduledExecutor;
-
private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
private final BackoffPolicy.Provider backoffPolicyProvider;
@@ -320,7 +312,7 @@
}
cancelIdleTimer();
idleModeTimer = new IdleModeTimer();
- idleModeTimerFuture = scheduledExecutor.schedule(
+ idleModeTimerFuture = transportFactory.getScheduledExecutorService().schedule(
new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
@@ -372,7 +364,6 @@
AbstractManagedChannelImplBuilder<?> builder,
ClientTransportFactory clientTransportFactory,
BackoffPolicy.Provider backoffPolicyProvider,
- ObjectPool<ScheduledExecutorService> timerServicePool,
ObjectPool<? extends Executor> oobExecutorPool,
Supplier<Stopwatch> stopwatchSupplier,
List<ClientInterceptor> interceptors) {
@@ -391,8 +382,6 @@
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
- this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool");
- this.scheduledExecutor = checkNotNull(timerServicePool.getObject(), "timerService");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;
@@ -553,7 +542,7 @@
executor,
callOptions,
transportProvider,
- scheduledExecutor)
+ terminated ? null : transportFactory.getScheduledExecutorService())
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry);
}
@@ -578,7 +567,6 @@
terminated = true;
terminatedLatch.countDown();
executorPool.returnObject(executor);
- scheduledExecutor = timerServicePool.returnObject(scheduledExecutor);
// Release the transport factory so that it can deallocate any resources.
transportFactory.close();
}
@@ -623,14 +611,12 @@
EquivalentAddressGroup addressGroup, Attributes attrs) {
checkNotNull(addressGroup, "addressGroup");
checkNotNull(attrs, "attrs");
- ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor;
- checkState(scheduledExecutorCopy != null,
- "scheduledExecutor is already cleared. Looks like you are calling this method after "
- + "you've already shut down");
+ // TODO(ejona): can we be even stricter? Like loadBalancer == null?
+ checkState(!terminated, "Channel is terminated");
final SubchannelImpl subchannel = new SubchannelImpl(attrs);
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority(), userAgent, backoffPolicyProvider, transportFactory,
- scheduledExecutorCopy, stopwatchSupplier, channelExecutor,
+ transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
new InternalSubchannel.Callback() {
// All callbacks are run in channelExecutor
@Override
@@ -712,15 +698,14 @@
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
- ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor;
- checkState(scheduledExecutorCopy != null,
- "scheduledExecutor is already cleared. Looks like you are calling this method after "
- + "you've already shut down");
+ // TODO(ejona): can we be even stricter? Like terminating?
+ checkState(!terminated, "Channel is terminated");
final OobChannel oobChannel = new OobChannel(
- authority, oobExecutorPool, scheduledExecutorCopy, channelExecutor);
+ authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
+ channelExecutor);
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory,
- scheduledExecutorCopy, stopwatchSupplier, channelExecutor,
+ transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
// All callback methods are run from channelExecutor
new InternalSubchannel.Callback() {
@Override
@@ -895,7 +880,6 @@
} else {
shutdownRequested = true;
}
- ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor;
// Add a delay to shutdown to deal with the race between 1) a transport being picked and
// newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
// because of address change, or because LoadBalancer is shutdown by Channel entering idle
@@ -904,8 +888,8 @@
//
// TODO(zhangkun83): consider a better approach
// (https://github.com/grpc/grpc-java/issues/2562).
- if (!terminating && scheduledExecutorCopy != null) {
- delayedShutdownTask = scheduledExecutorCopy.schedule(
+ if (!terminating) {
+ delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
new LogExceptionRunnable(
new Runnable() {
@Override
@@ -916,13 +900,8 @@
return;
}
}
- // Two possible ways to get here:
- //
- // 1. terminating == true: no more real streams will be created, it's safe and also desirable
- // to shutdown timely.
- //
- // 2. scheduledExecutor == null: possible only when Channel has already been terminated.
- // Though may not be necessary, we'll do it anyway.
+ // When terminating == true, no more real streams will be created. It's safe and also
+ // desirable to shutdown timely.
subchannel.shutdown();
}
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
index c553c05..a340954 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
@@ -57,7 +57,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
@@ -91,8 +90,6 @@
.build();
private final List<EquivalentAddressGroup> servers = Lists.newArrayList();
- private final ObjectPool<ScheduledExecutorService> timerServicePool =
- new FixedObjectPool<ScheduledExecutorService>(timer.getScheduledExecutorService());
private final ObjectPool<Executor> executorPool =
new FixedObjectPool<Executor>(executor.getScheduledExecutorService());
private final ObjectPool<Executor> oobExecutorPool =
@@ -116,6 +113,8 @@
when(mockNameResolverFactory
.newNameResolver(any(URI.class), any(Attributes.class)))
.thenReturn(mockNameResolver);
+ when(mockTransportFactory.getScheduledExecutorService())
+ .thenReturn(timer.getScheduledExecutorService());
class Builder extends AbstractManagedChannelImplBuilder<Builder> {
Builder(String target) {
@@ -139,7 +138,7 @@
builder.executorPool = executorPool;
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
- timerServicePool, oobExecutorPool, timer.getStopwatchSupplier(),
+ oobExecutorPool, timer.getStopwatchSupplier(),
Collections.<ClientInterceptor>emptyList());
newTransports = TestUtils.captureTransports(mockTransportFactory);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index a9e2365..59dc883 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -81,7 +81,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -157,8 +156,6 @@
@Mock
private ClientCall.Listener<Integer> mockCallListener5;
@Mock
- private ObjectPool<ScheduledExecutorService> timerServicePool;
- @Mock
private ObjectPool<Executor> executorPool;
@Mock
private ObjectPool<Executor> oobExecutorPool;
@@ -203,7 +200,7 @@
builder.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
- timerServicePool, oobExecutorPool, timer.getStopwatchSupplier(), interceptors);
+ oobExecutorPool, timer.getStopwatchSupplier(), interceptors);
if (requestConnection) {
// Force-exit the initial idle-mode
@@ -222,7 +219,8 @@
expectedUri = new URI(target);
when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
transports = TestUtils.captureTransports(mockTransportFactory);
- when(timerServicePool.getObject()).thenReturn(timer.getScheduledExecutorService());
+ when(mockTransportFactory.getScheduledExecutorService())
+ .thenReturn(timer.getScheduledExecutorService());
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
}
@@ -265,15 +263,12 @@
public void shutdownWithNoTransportsEverCreated() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
verify(executorPool).getObject();
- verify(timerServicePool).getObject();
verify(executorPool, never()).returnObject(anyObject());
- verify(timerServicePool, never()).returnObject(anyObject());
verifyNoMoreInteractions(mockTransportFactory);
channel.shutdown();
assertTrue(channel.isShutdown());
assertTrue(channel.isTerminated());
verify(executorPool).returnObject(executor.getScheduledExecutorService());
- verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
}
@Test
@@ -296,7 +291,6 @@
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
createChannel(nameResolverFactory, NO_INTERCEPTOR);
verify(executorPool).getObject();
- verify(timerServicePool).getObject();
ClientStream mockStream = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
Metadata headers = new Metadata();
@@ -327,7 +321,8 @@
// First RPC, will be pending
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
- verifyNoMoreInteractions(mockTransportFactory);
+ verify(mockTransportFactory).newClientTransport(
+ any(SocketAddress.class), any(String.class), any(String.class));
call.start(mockCallListener, headers);
verify(mockTransport, never())
@@ -405,15 +400,14 @@
transportListener.transportShutdown(Status.UNAVAILABLE);
assertFalse(channel.isTerminated());
verify(executorPool, never()).returnObject(anyObject());
- verify(timerServicePool, never()).returnObject(anyObject());
transportListener.transportTerminated();
assertTrue(channel.isTerminated());
verify(executorPool).returnObject(executor.getScheduledExecutorService());
- verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
verifyNoMoreInteractions(oobExecutorPool);
+ verify(mockTransportFactory).newClientTransport(
+ any(SocketAddress.class), any(String.class), any(String.class));
verify(mockTransportFactory).close();
- verifyNoMoreInteractions(mockTransportFactory);
verify(mockTransport, atLeast(0)).getLogId();
verifyNoMoreInteractions(mockTransport);
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 3683c62..2fc7a38 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -46,6 +46,7 @@
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
@@ -498,6 +499,11 @@
}
@Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return group;
+ }
+
+ @Override
public void close() {
if (closed) {
return;
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index b60cca0..0f0ae2a 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -46,6 +46,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
@@ -382,6 +383,8 @@
private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
+ private final ScheduledExecutorService timeoutService =
+ SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private boolean closed;
private OkHttpTransportFactory(Executor executor,
@@ -446,11 +449,17 @@
}
@Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return timeoutService;
+ }
+
+ @Override
public void close() {
if (closed) {
return;
}
closed = true;
+ SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
if (usingSharedExecutor) {
SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor);