Require a ScheduledExecutorService to be provided to Context
Before the service would be leaked, because when the scheduled future
was cancelled the scheduler wouldn't be released. Also, Future isn't
powerful enough to signal when we should release when cancelling.
Given the nature of Context, it also seems beneficial to not have it own
threads. Since any caller of withDeadline*() is required to cancel the
Context, it shouldn't be too burdensome for them to manage the lifecycle
of the scheduler.
diff --git a/core/src/main/java/io/grpc/Context.java b/core/src/main/java/io/grpc/Context.java
index 1a770fa..8f2a530 100644
--- a/core/src/main/java/io/grpc/Context.java
+++ b/core/src/main/java/io/grpc/Context.java
@@ -33,16 +33,12 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import io.grpc.internal.SharedResourceHolder;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -114,33 +110,6 @@
private static final Logger LOG = Logger.getLogger(Context.class.getName());
- /**
- * Use a shared resource to retain the {@link ScheduledExecutorService} used to
- * implement deadline based context cancellation. This allows the executor to be
- * shutdown if its not in use thereby allowing Context to be unloaded.
- */
- static final SharedResourceHolder.Resource<ScheduledExecutorService> SCHEDULER =
- new SharedResourceHolder.Resource<ScheduledExecutorService>() {
- private static final String name = "context-scheduler";
- @Override
- public ScheduledExecutorService create() {
- return Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
- .setNameFormat(name + "-%d")
- .setDaemon(true)
- .build());
- }
-
- @Override
- public void close(ScheduledExecutorService instance) {
- instance.shutdown();
- }
-
- @Override
- public String toString() {
- return name;
- }
- };
-
private static final Object[][] EMPTY_ENTRIES = new Object[0][2];
/**
@@ -268,8 +237,9 @@
* a received existing deadline. When establishing a new deadline, {@link #withDeadlineAfter}
* is the better mechanism.
*/
- public CancellableContext withDeadlineNanoTime(long deadlineNanoTime) {
- return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS);
+ public CancellableContext withDeadlineNanoTime(long deadlineNanoTime,
+ ScheduledExecutorService scheduler) {
+ return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS, scheduler);
}
/**
@@ -280,7 +250,7 @@
* <p>Sample usage:
* <pre>
* Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5,
- * TimeUnit.SECONDS);
+ * TimeUnit.SECONDS, scheduler);
* executorService.execute(withDeadline.wrap(new Runnable() {
* public void run() {
* Context current = Context.current();
@@ -291,10 +261,12 @@
* });
* </pre>
*/
- public CancellableContext withDeadlineAfter(long duration, TimeUnit unit) {
+ public CancellableContext withDeadlineAfter(long duration, TimeUnit unit,
+ ScheduledExecutorService scheduler) {
Preconditions.checkArgument(duration >= 0, "duration must be greater than or equal to 0");
Preconditions.checkNotNull(unit, "unit");
- return new CancellableContext(this, unit.toNanos(duration));
+ Preconditions.checkNotNull(scheduler, "scheduler");
+ return new CancellableContext(this, unit.toNanos(duration), scheduler);
}
/**
@@ -611,17 +583,13 @@
/**
* Create a cancellable context that has a deadline.
*/
- private CancellableContext(Context parent, long delayNanos) {
+ private CancellableContext(Context parent, long delayNanos,
+ ScheduledExecutorService scheduler) {
this(parent);
- final ScheduledExecutorService scheduler = SharedResourceHolder.get(SCHEDULER);
scheduledFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
- try {
- cancel(new TimeoutException("context timed out"));
- } finally {
- SharedResourceHolder.release(SCHEDULER, scheduler);
- }
+ cancel(new TimeoutException("context timed out"));
}
}, delayNanos, TimeUnit.NANOSECONDS);
}
diff --git a/core/src/test/java/io/grpc/ContextTest.java b/core/src/test/java/io/grpc/ContextTest.java
index 19853bd..77e52eb 100644
--- a/core/src/test/java/io/grpc/ContextTest.java
+++ b/core/src/test/java/io/grpc/ContextTest.java
@@ -43,8 +43,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.internal.SharedResourceHolder;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -57,9 +55,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,6 +95,7 @@
observed = Context.current();
}
};
+ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
@Before
public void setUp() throws Exception {
@@ -105,6 +104,7 @@
@After
public void tearDown() throws Exception {
+ scheduler.shutdown();
}
@Test
@@ -611,7 +611,7 @@
@Test
public void absoluteDeadlineTriggersAndPropagates() throws Exception {
Context base = Context.current().withDeadlineNanoTime(System.nanoTime()
- + TimeUnit.SECONDS.toNanos(1));
+ + TimeUnit.SECONDS.toNanos(1), scheduler);
Context child = base.withValue(FOOD, "lasagna");
child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled());
@@ -626,12 +626,12 @@
@Test(expected = IllegalArgumentException.class)
public void negativeDeadlineFails() {
- Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS);
+ Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS, scheduler);
}
@Test
public void relativeDeadlineTriggersAndPropagates() throws Exception {
- Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS);
+ Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS, scheduler);
Context child = base.withValue(FOOD, "lasagna");
child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled());
@@ -646,8 +646,8 @@
@Test
public void innerDeadlineCompletesBeforeOuter() throws Exception {
- Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS);
- Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS);
+ Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS, scheduler);
+ Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS, scheduler);
child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled());
assertFalse(child.isCancelled());
@@ -667,48 +667,21 @@
@Test
public void cancellationCancelsScheduledTask() {
- ScheduledExecutorService service = SharedResourceHolder.get(Context.SCHEDULER);
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
try {
- ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
- executor.purge();
assertEquals(0, executor.getQueue().size());
- Context.CancellableContext base = Context.current().withDeadlineAfter(1, TimeUnit.DAYS);
+ Context.CancellableContext base
+ = Context.current().withDeadlineAfter(1, TimeUnit.DAYS, executor);
assertEquals(1, executor.getQueue().size());
base.cancel(null);
executor.purge();
assertEquals(0, executor.getQueue().size());
} finally {
- SharedResourceHolder.release(Context.SCHEDULER, service);
+ executor.shutdown();
}
}
@Test
- public void testScheduler() throws Exception {
- assertEquals("context-scheduler", Context.SCHEDULER.toString());
- ScheduledExecutorService service = Context.SCHEDULER.create();
- try {
- assertFalse(service.isShutdown());
- final AtomicReference<String> threadName = new AtomicReference<String>();
- final AtomicReference<Boolean> threadIsDaemon = new AtomicReference<Boolean>();
- Future<?> ran = service.submit(new Runnable() {
- @Override
- public void run() {
- threadName.set(Thread.currentThread().getName());
- threadIsDaemon.set(Thread.currentThread().isDaemon());
- }
- });
- ran.get();
- assertNotNull(threadName.get());
- assertTrue(threadName.get(), threadName.get().startsWith(Context.SCHEDULER.toString()));
- assertNotNull(threadIsDaemon.get());
- assertTrue(threadIsDaemon.get());
- } finally {
- Context.SCHEDULER.close(service);
- }
- assertTrue(service.isShutdown());
- }
-
- @Test
public void testKeyEqualsHashCode() {
assertTrue(PET.equals(PET));
assertFalse(PET.equals(Context.key("pet")));