blob: 24f596f81131b1747f0d794063d8f25f92a4ffa1 [file] [log] [blame]
* This file is a modified version of
* which contained the following notice:
* Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the
* public domain, as explained at
* Rationale for copying:
* Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected
* customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of
* AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods.
import static;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
* Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute}
* method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods
* create {@link ListenableFutureTask} instances and pass them to {@link #execute}.
* <p>In addition to {@link #execute}, subclasses must implement all methods related to shutdown and
* termination.
* @author Doug Lea
abstract class AbstractListeningExecutorService implements ListeningExecutorService {
@Override public ListenableFuture<?> submit(Runnable task) {
ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null);
return ftask;
@Override public <T> ListenableFuture<T> submit(Runnable task, T result) {
ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result);
return ftask;
@Override public <T> ListenableFuture<T> submit(Callable<T> task) {
ListenableFutureTask<T> ftask = ListenableFutureTask.create(task);
return ftask;
* the main mechanics of invokeAny.
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
int ntasks = tasks.size();
checkArgument(ntasks > 0);
List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
} else if (active == 0) {
} else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null) {
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
} else {
f = ecs.take();
if (f != null) {
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
if (ee == null) {
ee = new ExecutionException(null);
throw ee;
} finally {
for (Future<T> f : futures) {
@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
throw new AssertionError();
@Override public <T> T invokeAny(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
@Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null) {
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
ListenableFutureTask<T> f = ListenableFutureTask.create(t);
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
done = true;
return futures;
} finally {
if (!done) {
for (Future<T> f : futures) {
@Override public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null) {
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
long lastTime = System.nanoTime();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
Iterator<Future<T>> it = futures.iterator();
while (it.hasNext()) {
execute((Runnable) (;
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
return futures;
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0) {
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
done = true;
return futures;
} finally {
if (!done) {
for (Future<T> f : futures) {