Merge
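
The sketch below is not part of the patch. It is a minimal illustration, assuming a JDK that already contains this change (Java 8 or later), of three behaviors the diff introduces or documents: thenCompose accepting a function that returns any CompletionStage, whenComplete passing the source result through to the returned future, and get() versus join() reporting the same underlying cause through different wrapper exceptions. The class name CompletionStageSketch and its helper methods are hypothetical names chosen for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; none of these names appear in the patch.
public class CompletionStageSketch {

    // Declared to return CompletionStage (not CompletableFuture) to show
    // that thenCompose now accepts the wider return type.
    static CompletionStage<Integer> lengthOf(String s) {
        return CompletableFuture.completedFuture(s.length());
    }

    static int composeWithStage() {
        return CompletableFuture.completedFuture("merge")
                .thenCompose(CompletionStageSketch::lengthOf)
                .join();
    }

    // whenComplete runs the action, then completes the returned future
    // with the same result as the source.
    static String whenCompletePassesResultThrough() {
        StringBuilder seen = new StringBuilder();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<String> dst =
                src.whenComplete((value, ex) -> seen.append(value));
        src.complete("ok");
        return seen + " -> " + dst.join();
    }

    // A dependent of an exceptionally completed future holds a
    // CompletionException; join() throws it directly, while get()
    // rewraps its cause in an ExecutionException.
    static String getVersusJoin() {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> dep = src.thenApply(String::trim);

        String viaJoin = "no exception";
        try {
            dep.join();
        } catch (CompletionException e) {
            viaJoin = "CompletionException:"
                    + e.getCause().getClass().getSimpleName();
        }

        String viaGet = "no exception";
        try {
            dep.get();
        } catch (ExecutionException e) {
            viaGet = "ExecutionException:"
                    + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        }
        return viaJoin + " | " + viaGet;
    }

    public static void main(String[] args) {
        System.out.println(composeWithStage());
        System.out.println(whenCompletePassesResultThrough());
        System.out.println(getVersusJoin());
    }
}
```

The example only uses public methods, so it does not exercise the package-private execAsync helper or the fallback to a new Thread when the common pool parallelism is at most one.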
diff --git a/src/share/classes/java/util/concurrent/CompletableFuture.java b/src/share/classes/java/util/concurrent/CompletableFuture.java
index 7926f2e..5e2fb13 100644
--- a/src/share/classes/java/util/concurrent/CompletableFuture.java
+++ b/src/share/classes/java/util/concurrent/CompletableFuture.java
@@ -48,13 +48,16 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
/**
* A {@link Future} that may be explicitly completed (setting its
- * value and status), and may include dependent functions and actions
- * that trigger upon its completion.
+ * value and status), and may be used as a {@link CompletionStage},
+ * supporting dependent functions and actions that trigger upon its
+ * completion.
*
* <p>When two or more threads attempt to
* {@link #complete complete},
@@ -62,64 +65,50 @@
* {@link #cancel cancel}
* a CompletableFuture, only one of them succeeds.
*
- * <p>Methods are available for adding dependents based on
- * user-provided Functions, Consumers, or Runnables. The appropriate
- * form to use depends on whether actions require arguments and/or
- * produce results. Completion of a dependent action will trigger the
- * completion of another CompletableFuture. Actions may also be
- * triggered after either or both the current and another
- * CompletableFuture complete. Multiple CompletableFutures may also
- * be grouped as one using {@link #anyOf(CompletableFuture...)} and
- * {@link #allOf(CompletableFuture...)}.
+ * <p>In addition to these and related methods for directly
+ * manipulating status and results, CompletableFuture implements
+ * interface {@link CompletionStage} with the following policies: <ul>
*
- * <p>CompletableFutures themselves do not execute asynchronously.
- * However, actions supplied for dependent completions of another
- * CompletableFuture may do so, depending on whether they are provided
- * via one of the <em>async</em> methods (that is, methods with names
- * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
- * methods provide a way to commence asynchronous processing of an
- * action using either a given {@link Executor} or by default the
- * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
+ * <li>Actions supplied for dependent completions of
+ * <em>non-async</em> methods may be performed by the thread that
+ * completes the current CompletableFuture, or by any other caller of
+ * a completion method.</li>
+ *
+ * <li>All <em>async</em> methods without an explicit Executor
+ * argument are performed using the {@link ForkJoinPool#commonPool()}
+ * (unless it does not support a parallelism level of at least two, in
+ * which case, a new Thread is used). To simplify monitoring,
* debugging, and tracking, all generated asynchronous tasks are
- * instances of the marker interface {@link AsynchronousCompletionTask}.
+ * instances of the marker interface {@link
+ * AsynchronousCompletionTask}. </li>
*
- * <p>Actions supplied for dependent completions of <em>non-async</em>
- * methods may be performed by the thread that completes the current
- * CompletableFuture, or by any other caller of these methods. There
- * are no guarantees about the order of processing completions unless
- * constrained by these methods.
+ * <li>All CompletionStage methods are implemented independently of
+ * other public methods, so the behavior of one method is not impacted
+ * by overrides of others in subclasses. </li> </ul>
*
- * <p>Since (unlike {@link FutureTask}) this class has no direct
+ * <p>CompletableFuture also implements {@link Future} with the following
+ * policies: <ul>
+ *
+ * <li>Since (unlike {@link FutureTask}) this class has no direct
* control over the computation that causes it to be completed,
- * cancellation is treated as just another form of exceptional completion.
- * Method {@link #cancel cancel} has the same effect as
- * {@code completeExceptionally(new CancellationException())}.
+ * cancellation is treated as just another form of exceptional
+ * completion. Method {@link #cancel cancel} has the same effect as
+ * {@code completeExceptionally(new CancellationException())}. Method
+ * {@link #isCompletedExceptionally} can be used to determine if a
+ * CompletableFuture completed in any exceptional fashion.</li>
*
- * <p>Upon exceptional completion (including cancellation), or when a
- * completion entails an additional computation which terminates
- * abruptly with an (unchecked) exception or error, then all of their
- * dependent completions (and their dependents in turn) generally act
- * as {@code completeExceptionally} with a {@link CompletionException}
- * holding that exception as its cause. However, the {@link
- * #exceptionally exceptionally} and {@link #handle handle}
- * completions <em>are</em> able to handle exceptional completions of
- * the CompletableFutures they depend on.
- *
- * <p>In case of exceptional completion with a CompletionException,
+ * <li>In case of exceptional completion with a CompletionException,
* methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
* {@link ExecutionException} with the same cause as held in the
- * corresponding CompletionException. However, in these cases,
- * methods {@link #join()} and {@link #getNow} throw the
- * CompletionException, which simplifies usage.
- *
- * <p>Arguments used to pass a completion result (that is, for parameters
- * of type {@code T}) may be null, but passing a null value for any other
- * parameter will result in a {@link NullPointerException} being thrown.
+ * corresponding CompletionException. To simplify usage in most
+ * contexts, this class also defines methods {@link #join()} and
+ * {@link #getNow} that instead throw the CompletionException directly
+ * in these cases.</li> </ul>
*
* @author Doug Lea
* @since 1.8
*/
-public class CompletableFuture<T> implements Future<T> {
+public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
/*
* Overview:
@@ -438,6 +427,19 @@
public final void run() { exec(); }
}
+ /**
+ * Starts the given async task using the given executor, unless
+ * the executor is ForkJoinPool.commonPool and its parallelism is
+ * at most one, in which case starts a new thread.
+ */
+ static void execAsync(Executor e, Async r) {
+ if (e == ForkJoinPool.commonPool() &&
+ ForkJoinPool.getCommonPoolParallelism() <= 1)
+ new Thread(r).start();
+ else
+ e.execute(r);
+ }
+
static final class AsyncRun extends Async {
final Runnable fn;
final CompletableFuture<Void> dst;
@@ -538,13 +540,13 @@
static final class AsyncAccept<T> extends Async {
final T arg;
final Consumer<? super T> fn;
- final CompletableFuture<Void> dst;
+ final CompletableFuture<?> dst;
AsyncAccept(T arg, Consumer<? super T> fn,
- CompletableFuture<Void> dst) {
+ CompletableFuture<?> dst) {
this.arg = arg; this.fn = fn; this.dst = dst;
}
public final boolean exec() {
- CompletableFuture<Void> d; Throwable ex;
+ CompletableFuture<?> d; Throwable ex;
if ((d = this.dst) != null && d.result == null) {
try {
fn.accept(arg);
@@ -563,14 +565,14 @@
final T arg1;
final U arg2;
final BiConsumer<? super T,? super U> fn;
- final CompletableFuture<Void> dst;
+ final CompletableFuture<?> dst;
AsyncAcceptBoth(T arg1, U arg2,
BiConsumer<? super T,? super U> fn,
- CompletableFuture<Void> dst) {
+ CompletableFuture<?> dst) {
this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
}
public final boolean exec() {
- CompletableFuture<Void> d; Throwable ex;
+ CompletableFuture<?> d; Throwable ex;
if ((d = this.dst) != null && d.result == null) {
try {
fn.accept(arg1, arg2);
@@ -587,10 +589,10 @@
static final class AsyncCompose<T,U> extends Async {
final T arg;
- final Function<? super T, CompletableFuture<U>> fn;
+ final Function<? super T, ? extends CompletionStage<U>> fn;
final CompletableFuture<U> dst;
AsyncCompose(T arg,
- Function<? super T, CompletableFuture<U>> fn,
+ Function<? super T, ? extends CompletionStage<U>> fn,
CompletableFuture<U> dst) {
this.arg = arg; this.fn = fn; this.dst = dst;
}
@@ -598,7 +600,8 @@
CompletableFuture<U> d, fr; U u; Throwable ex;
if ((d = this.dst) != null && d.result == null) {
try {
- fr = fn.apply(arg);
+ CompletionStage<U> cs = fn.apply(arg);
+ fr = (cs == null) ? null : cs.toCompletableFuture();
ex = (fr == null) ? new NullPointerException() : null;
} catch (Throwable rex) {
ex = rex;
@@ -626,6 +629,33 @@
private static final long serialVersionUID = 5232453952276885070L;
}
+ static final class AsyncWhenComplete<T> extends Async {
+ final T arg1;
+ final Throwable arg2;
+ final BiConsumer<? super T,? super Throwable> fn;
+ final CompletableFuture<T> dst;
+ AsyncWhenComplete(T arg1, Throwable arg2,
+ BiConsumer<? super T,? super Throwable> fn,
+ CompletableFuture<T> dst) {
+ this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
+ }
+ public final boolean exec() {
+ CompletableFuture<T> d;
+ if ((d = this.dst) != null && d.result == null) {
+ Throwable ex = arg2;
+ try {
+ fn.accept(arg1, ex);
+ } catch (Throwable rex) {
+ if (ex == null)
+ ex = rex;
+ }
+ d.internalComplete(arg1, ex);
+ }
+ return true;
+ }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
/* ------------- Completions -------------- */
/**
@@ -680,7 +710,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncApply<T,U>(t, fn, dst));
+ execAsync(e, new AsyncApply<T,U>(t, fn, dst));
else
u = fn.apply(t);
} catch (Throwable rex) {
@@ -697,11 +727,11 @@
static final class ThenAccept<T> extends Completion {
final CompletableFuture<? extends T> src;
final Consumer<? super T> fn;
- final CompletableFuture<Void> dst;
+ final CompletableFuture<?> dst;
final Executor executor;
ThenAccept(CompletableFuture<? extends T> src,
Consumer<? super T> fn,
- CompletableFuture<Void> dst,
+ CompletableFuture<?> dst,
Executor executor) {
this.src = src; this.fn = fn; this.dst = dst;
this.executor = executor;
@@ -709,7 +739,7 @@
public final void run() {
final CompletableFuture<? extends T> a;
final Consumer<? super T> fn;
- final CompletableFuture<Void> dst;
+ final CompletableFuture<?> dst;
Object r; T t; Throwable ex;
if ((dst = this.dst) != null &&
(fn = this.fn) != null &&
@@ -729,7 +759,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncAccept<T>(t, fn, dst));
+ execAsync(e, new AsyncAccept<T>(t, fn, dst));
else
fn.accept(t);
} catch (Throwable rex) {
@@ -773,7 +803,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncRun(fn, dst));
+ execAsync(e, new AsyncRun(fn, dst));
else
fn.run();
} catch (Throwable rex) {
@@ -839,7 +869,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
+ execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
else
v = fn.apply(t, u);
} catch (Throwable rex) {
@@ -904,7 +934,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
+ execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
else
fn.accept(t, u);
} catch (Throwable rex) {
@@ -956,7 +986,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncRun(fn, dst));
+ execAsync(e, new AsyncRun(fn, dst));
else
fn.run();
} catch (Throwable rex) {
@@ -1042,7 +1072,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncApply<T,U>(t, fn, dst));
+ execAsync(e, new AsyncApply<T,U>(t, fn, dst));
else
u = fn.apply(t);
} catch (Throwable rex) {
@@ -1095,7 +1125,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncAccept<T>(t, fn, dst));
+ execAsync(e, new AsyncAccept<T>(t, fn, dst));
else
fn.accept(t);
} catch (Throwable rex) {
@@ -1143,7 +1173,7 @@
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncRun(fn, dst));
+ execAsync(e, new AsyncRun(fn, dst));
else
fn.run();
} catch (Throwable rex) {
@@ -1226,6 +1256,54 @@
private static final long serialVersionUID = 5232453952276885070L;
}
+ static final class WhenCompleteCompletion<T> extends Completion {
+ final CompletableFuture<? extends T> src;
+ final BiConsumer<? super T, ? super Throwable> fn;
+ final CompletableFuture<T> dst;
+ final Executor executor;
+ WhenCompleteCompletion(CompletableFuture<? extends T> src,
+ BiConsumer<? super T, ? super Throwable> fn,
+ CompletableFuture<T> dst,
+ Executor executor) {
+ this.src = src; this.fn = fn; this.dst = dst;
+ this.executor = executor;
+ }
+ public final void run() {
+ final CompletableFuture<? extends T> a;
+ final BiConsumer<? super T, ? super Throwable> fn;
+ final CompletableFuture<T> dst;
+ Object r; T t; Throwable ex;
+ if ((dst = this.dst) != null &&
+ (fn = this.fn) != null &&
+ (a = this.src) != null &&
+ (r = a.result) != null &&
+ compareAndSet(0, 1)) {
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ Executor e = executor;
+ Throwable dx = null;
+ try {
+ if (e != null)
+ execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
+ else
+ fn.accept(t, ex);
+ } catch (Throwable rex) {
+ dx = rex;
+ }
+ if (e == null || dx != null)
+ dst.internalComplete(t, ex != null ? ex : dx);
+ }
+ }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
static final class ThenCopy<T> extends Completion {
final CompletableFuture<?> src;
final CompletableFuture<T> dst;
@@ -1286,10 +1364,13 @@
final CompletableFuture<? extends T> src;
final BiFunction<? super T, Throwable, ? extends U> fn;
final CompletableFuture<U> dst;
+ final Executor executor;
HandleCompletion(CompletableFuture<? extends T> src,
BiFunction<? super T, Throwable, ? extends U> fn,
- CompletableFuture<U> dst) {
+ CompletableFuture<U> dst,
+ Executor executor) {
this.src = src; this.fn = fn; this.dst = dst;
+ this.executor = executor;
}
public final void run() {
final CompletableFuture<? extends T> a;
@@ -1310,13 +1391,19 @@
@SuppressWarnings("unchecked") T tr = (T) r;
t = tr;
}
- U u = null; Throwable dx = null;
+ Executor e = executor;
+ U u = null;
+ Throwable dx = null;
try {
- u = fn.apply(t, ex);
+ if (e != null)
+ execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
+ else
+ u = fn.apply(t, ex);
} catch (Throwable rex) {
dx = rex;
}
- dst.internalComplete(u, dx);
+ if (e == null || dx != null)
+ dst.internalComplete(u, dx);
}
}
private static final long serialVersionUID = 5232453952276885070L;
@@ -1324,11 +1411,11 @@
static final class ThenCompose<T,U> extends Completion {
final CompletableFuture<? extends T> src;
- final Function<? super T, CompletableFuture<U>> fn;
+ final Function<? super T, ? extends CompletionStage<U>> fn;
final CompletableFuture<U> dst;
final Executor executor;
ThenCompose(CompletableFuture<? extends T> src,
- Function<? super T, CompletableFuture<U>> fn,
+ Function<? super T, ? extends CompletionStage<U>> fn,
CompletableFuture<U> dst,
Executor executor) {
this.src = src; this.fn = fn; this.dst = dst;
@@ -1336,7 +1423,7 @@
}
public final void run() {
final CompletableFuture<? extends T> a;
- final Function<? super T, CompletableFuture<U>> fn;
+ final Function<? super T, ? extends CompletionStage<U>> fn;
final CompletableFuture<U> dst;
Object r; T t; Throwable ex; Executor e;
if ((dst = this.dst) != null &&
@@ -1358,10 +1445,12 @@
boolean complete = false;
if (ex == null) {
if ((e = executor) != null)
- e.execute(new AsyncCompose<T,U>(t, fn, dst));
+ execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
else {
try {
- if ((c = fn.apply(t)) == null)
+ CompletionStage<U> cs = fn.apply(t);
+ c = (cs == null) ? null : cs.toCompletableFuture();
+ if (c == null)
ex = new NullPointerException();
} catch (Throwable rex) {
ex = rex;
@@ -1401,6 +1490,619 @@
private static final long serialVersionUID = 5232453952276885070L;
}
+ // Implementations of stage methods with (plain, async, Executor) forms
+
+ private <U> CompletableFuture<U> doThenApply
+ (Function<? super T,? extends U> fn,
+ Executor e) {
+ if (fn == null) throw new NullPointerException();
+ CompletableFuture<U> dst = new CompletableFuture<U>();
+ ThenApply<T,U> d = null;
+ Object r;
+ if ((r = result) == null) {
+ CompletionNode p = new CompletionNode
+ (d = new ThenApply<T,U>(this, fn, dst, e));
+ while ((r = result) == null) {
+ if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ break;
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ U u = null;
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncApply<T,U>(t, fn, dst));
+ else
+ u = fn.apply(t);
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(u, ex);
+ }
+ helpPostComplete();
+ return dst;
+ }
+
+ private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
+ Executor e) {
+ if (fn == null) throw new NullPointerException();
+ CompletableFuture<Void> dst = new CompletableFuture<Void>();
+ ThenAccept<T> d = null;
+ Object r;
+ if ((r = result) == null) {
+ CompletionNode p = new CompletionNode
+ (d = new ThenAccept<T>(this, fn, dst, e));
+ while ((r = result) == null) {
+ if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ break;
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncAccept<T>(t, fn, dst));
+ else
+ fn.accept(t);
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ return dst;
+ }
+
+ private CompletableFuture<Void> doThenRun(Runnable action,
+ Executor e) {
+ if (action == null) throw new NullPointerException();
+ CompletableFuture<Void> dst = new CompletableFuture<Void>();
+ ThenRun d = null;
+ Object r;
+ if ((r = result) == null) {
+ CompletionNode p = new CompletionNode
+ (d = new ThenRun(this, action, dst, e));
+ while ((r = result) == null) {
+ if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ break;
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ Throwable ex;
+ if (r instanceof AltResult)
+ ex = ((AltResult)r).ex;
+ else
+ ex = null;
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncRun(action, dst));
+ else
+ action.run();
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ return dst;
+ }
+
+ private <U,V> CompletableFuture<V> doThenCombine
+ (CompletableFuture<? extends U> other,
+ BiFunction<? super T,? super U,? extends V> fn,
+ Executor e) {
+ if (other == null || fn == null) throw new NullPointerException();
+ CompletableFuture<V> dst = new CompletableFuture<V>();
+ ThenCombine<T,U,V> d = null;
+ Object r, s = null;
+ if ((r = result) == null || (s = other.result) == null) {
+ d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
+ CompletionNode q = null, p = new CompletionNode(d);
+ while ((r == null && (r = result) == null) ||
+ (s == null && (s = other.result) == null)) {
+ if (q != null) {
+ if (s != null ||
+ UNSAFE.compareAndSwapObject
+ (other, COMPLETIONS, q.next = other.completions, q))
+ break;
+ }
+ else if (r != null ||
+ UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p)) {
+ if (s != null)
+ break;
+ q = new CompletionNode(d);
+ }
+ }
+ }
+ if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; U u; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ if (ex != null)
+ u = null;
+ else if (s instanceof AltResult) {
+ ex = ((AltResult)s).ex;
+ u = null;
+ }
+ else {
+ @SuppressWarnings("unchecked") U us = (U) s;
+ u = us;
+ }
+ V v = null;
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
+ else
+ v = fn.apply(t, u);
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(v, ex);
+ }
+ helpPostComplete();
+ other.helpPostComplete();
+ return dst;
+ }
+
+ private <U> CompletableFuture<Void> doThenAcceptBoth
+ (CompletableFuture<? extends U> other,
+ BiConsumer<? super T,? super U> fn,
+ Executor e) {
+ if (other == null || fn == null) throw new NullPointerException();
+ CompletableFuture<Void> dst = new CompletableFuture<Void>();
+ ThenAcceptBoth<T,U> d = null;
+ Object r, s = null;
+ if ((r = result) == null || (s = other.result) == null) {
+ d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
+ CompletionNode q = null, p = new CompletionNode(d);
+ while ((r == null && (r = result) == null) ||
+ (s == null && (s = other.result) == null)) {
+ if (q != null) {
+ if (s != null ||
+ UNSAFE.compareAndSwapObject
+ (other, COMPLETIONS, q.next = other.completions, q))
+ break;
+ }
+ else if (r != null ||
+ UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p)) {
+ if (s != null)
+ break;
+ q = new CompletionNode(d);
+ }
+ }
+ }
+ if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; U u; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ if (ex != null)
+ u = null;
+ else if (s instanceof AltResult) {
+ ex = ((AltResult)s).ex;
+ u = null;
+ }
+ else {
+ @SuppressWarnings("unchecked") U us = (U) s;
+ u = us;
+ }
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
+ else
+ fn.accept(t, u);
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ other.helpPostComplete();
+ return dst;
+ }
+
+ private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
+ Runnable action,
+ Executor e) {
+ if (other == null || action == null) throw new NullPointerException();
+ CompletableFuture<Void> dst = new CompletableFuture<Void>();
+ RunAfterBoth d = null;
+ Object r, s = null;
+ if ((r = result) == null || (s = other.result) == null) {
+ d = new RunAfterBoth(this, other, action, dst, e);
+ CompletionNode q = null, p = new CompletionNode(d);
+ while ((r == null && (r = result) == null) ||
+ (s == null && (s = other.result) == null)) {
+ if (q != null) {
+ if (s != null ||
+ UNSAFE.compareAndSwapObject
+ (other, COMPLETIONS, q.next = other.completions, q))
+ break;
+ }
+ else if (r != null ||
+ UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p)) {
+ if (s != null)
+ break;
+ q = new CompletionNode(d);
+ }
+ }
+ }
+ if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
+ Throwable ex;
+ if (r instanceof AltResult)
+ ex = ((AltResult)r).ex;
+ else
+ ex = null;
+ if (ex == null && (s instanceof AltResult))
+ ex = ((AltResult)s).ex;
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncRun(action, dst));
+ else
+ action.run();
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ other.helpPostComplete();
+ return dst;
+ }
+
+ private <U> CompletableFuture<U> doApplyToEither
+ (CompletableFuture<? extends T> other,
+ Function<? super T, U> fn,
+ Executor e) {
+ if (other == null || fn == null) throw new NullPointerException();
+ CompletableFuture<U> dst = new CompletableFuture<U>();
+ ApplyToEither<T,U> d = null;
+ Object r;
+ if ((r = result) == null && (r = other.result) == null) {
+ d = new ApplyToEither<T,U>(this, other, fn, dst, e);
+ CompletionNode q = null, p = new CompletionNode(d);
+ while ((r = result) == null && (r = other.result) == null) {
+ if (q != null) {
+ if (UNSAFE.compareAndSwapObject
+ (other, COMPLETIONS, q.next = other.completions, q))
+ break;
+ }
+ else if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ q = new CompletionNode(d);
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ U u = null;
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncApply<T,U>(t, fn, dst));
+ else
+ u = fn.apply(t);
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(u, ex);
+ }
+ helpPostComplete();
+ other.helpPostComplete();
+ return dst;
+ }
+
+ private CompletableFuture<Void> doAcceptEither
+ (CompletableFuture<? extends T> other,
+ Consumer<? super T> fn,
+ Executor e) {
+ if (other == null || fn == null) throw new NullPointerException();
+ CompletableFuture<Void> dst = new CompletableFuture<Void>();
+ AcceptEither<T> d = null;
+ Object r;
+ if ((r = result) == null && (r = other.result) == null) {
+ d = new AcceptEither<T>(this, other, fn, dst, e);
+ CompletionNode q = null, p = new CompletionNode(d);
+ while ((r = result) == null && (r = other.result) == null) {
+ if (q != null) {
+ if (UNSAFE.compareAndSwapObject
+ (other, COMPLETIONS, q.next = other.completions, q))
+ break;
+ }
+ else if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ q = new CompletionNode(d);
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncAccept<T>(t, fn, dst));
+ else
+ fn.accept(t);
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ other.helpPostComplete();
+ return dst;
+ }
+
+ private CompletableFuture<Void> doRunAfterEither
+ (CompletableFuture<?> other,
+ Runnable action,
+ Executor e) {
+ if (other == null || action == null) throw new NullPointerException();
+ CompletableFuture<Void> dst = new CompletableFuture<Void>();
+ RunAfterEither d = null;
+ Object r;
+ if ((r = result) == null && (r = other.result) == null) {
+ d = new RunAfterEither(this, other, action, dst, e);
+ CompletionNode q = null, p = new CompletionNode(d);
+ while ((r = result) == null && (r = other.result) == null) {
+ if (q != null) {
+ if (UNSAFE.compareAndSwapObject
+ (other, COMPLETIONS, q.next = other.completions, q))
+ break;
+ }
+ else if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ q = new CompletionNode(d);
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ Throwable ex;
+ if (r instanceof AltResult)
+ ex = ((AltResult)r).ex;
+ else
+ ex = null;
+ if (ex == null) {
+ try {
+ if (e != null)
+ execAsync(e, new AsyncRun(action, dst));
+ else
+ action.run();
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ other.helpPostComplete();
+ return dst;
+ }
+
+ private <U> CompletableFuture<U> doThenCompose
+ (Function<? super T, ? extends CompletionStage<U>> fn,
+ Executor e) {
+ if (fn == null) throw new NullPointerException();
+ CompletableFuture<U> dst = null;
+ ThenCompose<T,U> d = null;
+ Object r;
+ if ((r = result) == null) {
+ dst = new CompletableFuture<U>();
+ CompletionNode p = new CompletionNode
+ (d = new ThenCompose<T,U>(this, fn, dst, e));
+ while ((r = result) == null) {
+ if (UNSAFE.compareAndSwapObject
+ (this, COMPLETIONS, p.next = completions, p))
+ break;
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ if (ex == null) {
+ if (e != null) {
+ if (dst == null)
+ dst = new CompletableFuture<U>();
+ execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
+ }
+ else {
+ try {
+ CompletionStage<U> cs = fn.apply(t);
+ if (cs == null ||
+ (dst = cs.toCompletableFuture()) == null)
+ ex = new NullPointerException();
+ } catch (Throwable rex) {
+ ex = rex;
+ }
+ }
+ }
+ if (dst == null)
+ dst = new CompletableFuture<U>();
+ if (e == null || ex != null)
+ dst.internalComplete(null, ex);
+ }
+ helpPostComplete();
+ dst.helpPostComplete();
+ return dst;
+ }
+
+ private CompletableFuture<T> doWhenComplete
+ (BiConsumer<? super T, ? super Throwable> fn,
+ Executor e) {
+ if (fn == null) throw new NullPointerException();
+ CompletableFuture<T> dst = new CompletableFuture<T>();
+ WhenCompleteCompletion<T> d = null;
+ Object r;
+ if ((r = result) == null) {
+ CompletionNode p =
+ new CompletionNode(d = new WhenCompleteCompletion<T>
+ (this, fn, dst, e));
+ while ((r = result) == null) {
+ if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
+ p.next = completions, p))
+ break;
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ Throwable dx = null;
+ try {
+ if (e != null)
+ execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
+ else
+ fn.accept(t, ex);
+ } catch (Throwable rex) {
+ dx = rex;
+ }
+ if (e == null || dx != null)
+ dst.internalComplete(t, ex != null ? ex : dx);
+ }
+ helpPostComplete();
+ return dst;
+ }
+
+ private <U> CompletableFuture<U> doHandle
+ (BiFunction<? super T, Throwable, ? extends U> fn,
+ Executor e) {
+ if (fn == null) throw new NullPointerException();
+ CompletableFuture<U> dst = new CompletableFuture<U>();
+ HandleCompletion<T,U> d = null;
+ Object r;
+ if ((r = result) == null) {
+ CompletionNode p =
+ new CompletionNode(d = new HandleCompletion<T,U>
+ (this, fn, dst, e));
+ while ((r = result) == null) {
+ if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
+ p.next = completions, p))
+ break;
+ }
+ }
+ if (r != null && (d == null || d.compareAndSet(0, 1))) {
+ T t; Throwable ex;
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ t = null;
+ }
+ else {
+ ex = null;
+ @SuppressWarnings("unchecked") T tr = (T) r;
+ t = tr;
+ }
+ U u = null;
+ Throwable dx = null;
+ try {
+ if (e != null)
+ execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
+ else {
+ u = fn.apply(t, ex);
+ dx = null;
+ }
+ } catch (Throwable rex) {
+ dx = rex;
+ u = null;
+ }
+ if (e == null || dx != null)
+ dst.internalComplete(u, dx);
+ }
+ helpPostComplete();
+ return dst;
+ }
+
+
// public methods
/**
@@ -1416,13 +2118,13 @@
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
+ * @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
if (supplier == null) throw new NullPointerException();
CompletableFuture<U> f = new CompletableFuture<U>();
- ForkJoinPool.commonPool().
- execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
+ execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
return f;
}
@@ -1434,6 +2136,7 @@
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
+ * @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
@@ -1441,7 +2144,7 @@
if (executor == null || supplier == null)
throw new NullPointerException();
CompletableFuture<U> f = new CompletableFuture<U>();
- executor.execute(new AsyncSupply<U>(supplier, f));
+ execAsync(executor, new AsyncSupply<U>(supplier, f));
return f;
}
@@ -1457,8 +2160,7 @@
public static CompletableFuture<Void> runAsync(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
CompletableFuture<Void> f = new CompletableFuture<Void>();
- ForkJoinPool.commonPool().
- execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
+ execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
return f;
}
@@ -1477,7 +2179,7 @@
if (executor == null || runnable == null)
throw new NullPointerException();
CompletableFuture<Void> f = new CompletableFuture<Void>();
- executor.execute(new AsyncRun(runnable, f));
+ execAsync(executor, new AsyncRun(runnable, f));
return f;
}
@@ -1486,6 +2188,7 @@
* the given value.
*
* @param value the value
+ * @param <U> the type of the value
* @return the completed CompletableFuture
*/
public static <U> CompletableFuture<U> completedFuture(U value) {
@@ -1657,60 +2360,18 @@
return triggered;
}
- /**
- * Returns a new CompletableFuture that is completed
- * when this CompletableFuture completes, with the result of the
- * given function of this CompletableFuture's result.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied function throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @return the new CompletableFuture
- */
- public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
+ // CompletionStage methods
+
+ public <U> CompletableFuture<U> thenApply
+ (Function<? super T,? extends U> fn) {
return doThenApply(fn, null);
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when this CompletableFuture completes, with the result of the
- * given function of this CompletableFuture's result from a
- * task running in the {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied function throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @return the new CompletableFuture
- */
public <U> CompletableFuture<U> thenApplyAsync
(Function<? super T,? extends U> fn) {
return doThenApply(fn, ForkJoinPool.commonPool());
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when this CompletableFuture completes, with the result of the
- * given function of this CompletableFuture's result from a
- * task running in the given executor.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied function throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
public <U> CompletableFuture<U> thenApplyAsync
(Function<? super T,? extends U> fn,
Executor executor) {
@@ -1718,1149 +2379,228 @@
return doThenApply(fn, executor);
}
- private <U> CompletableFuture<U> doThenApply
- (Function<? super T,? extends U> fn,
- Executor e) {
- if (fn == null) throw new NullPointerException();
- CompletableFuture<U> dst = new CompletableFuture<U>();
- ThenApply<T,U> d = null;
- Object r;
- if ((r = result) == null) {
- CompletionNode p = new CompletionNode
- (d = new ThenApply<T,U>(this, fn, dst, e));
- while ((r = result) == null) {
- if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- break;
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- T t; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- U u = null;
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncApply<T,U>(t, fn, dst));
- else
- u = fn.apply(t);
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(u, ex);
- }
- helpPostComplete();
- return dst;
+ public CompletableFuture<Void> thenAccept
+ (Consumer<? super T> action) {
+ return doThenAccept(action, null);
}
- /**
- * Returns a new CompletableFuture that is completed
- * when this CompletableFuture completes, after performing the given
- * action with this CompletableFuture's result.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied action throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
- return doThenAccept(block, null);
+ public CompletableFuture<Void> thenAcceptAsync
+ (Consumer<? super T> action) {
+ return doThenAccept(action, ForkJoinPool.commonPool());
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when this CompletableFuture completes, after performing the given
- * action with this CompletableFuture's result from a task running
- * in the {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied action throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
- return doThenAccept(block, ForkJoinPool.commonPool());
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when this CompletableFuture completes, after performing the given
- * action with this CompletableFuture's result from a task running
- * in the given executor.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied action throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
- Executor executor) {
+ public CompletableFuture<Void> thenAcceptAsync
+ (Consumer<? super T> action,
+ Executor executor) {
if (executor == null) throw new NullPointerException();
- return doThenAccept(block, executor);
+ return doThenAccept(action, executor);
}
- private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
- Executor e) {
- if (fn == null) throw new NullPointerException();
- CompletableFuture<Void> dst = new CompletableFuture<Void>();
- ThenAccept<T> d = null;
- Object r;
- if ((r = result) == null) {
- CompletionNode p = new CompletionNode
- (d = new ThenAccept<T>(this, fn, dst, e));
- while ((r = result) == null) {
- if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- break;
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- T t; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncAccept<T>(t, fn, dst));
- else
- fn.accept(t);
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when this CompletableFuture completes, after performing the given
- * action.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied action throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> thenRun(Runnable action) {
+ public CompletableFuture<Void> thenRun
+ (Runnable action) {
return doThenRun(action, null);
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when this CompletableFuture completes, after performing the given
- * action from a task running in the {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied action throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> thenRunAsync(Runnable action) {
+ public CompletableFuture<Void> thenRunAsync
+ (Runnable action) {
return doThenRun(action, ForkJoinPool.commonPool());
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when this CompletableFuture completes, after performing the given
- * action from a task running in the given executor.
- *
- * <p>If this CompletableFuture completes exceptionally, or the
- * supplied action throws an exception, then the returned
- * CompletableFuture completes exceptionally with a
- * CompletionException holding the exception as its cause.
- *
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> thenRunAsync(Runnable action,
- Executor executor) {
+ public CompletableFuture<Void> thenRunAsync
+ (Runnable action,
+ Executor executor) {
if (executor == null) throw new NullPointerException();
return doThenRun(action, executor);
}
- private CompletableFuture<Void> doThenRun(Runnable action,
- Executor e) {
- if (action == null) throw new NullPointerException();
- CompletableFuture<Void> dst = new CompletableFuture<Void>();
- ThenRun d = null;
- Object r;
- if ((r = result) == null) {
- CompletionNode p = new CompletionNode
- (d = new ThenRun(this, action, dst, e));
- while ((r = result) == null) {
- if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- break;
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- Throwable ex;
- if (r instanceof AltResult)
- ex = ((AltResult)r).ex;
- else
- ex = null;
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncRun(action, dst));
- else
- action.run();
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when both this and the other given CompletableFuture complete,
- * with the result of the given function of the results of the two
- * CompletableFutures.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied function throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @return the new CompletableFuture
- */
public <U,V> CompletableFuture<V> thenCombine
- (CompletableFuture<? extends U> other,
+ (CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
- return doThenCombine(other, fn, null);
+ return doThenCombine(other.toCompletableFuture(), fn, null);
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when both this and the other given CompletableFuture complete,
- * with the result of the given function of the results of the two
- * CompletableFutures from a task running in the
- * {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied function throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @return the new CompletableFuture
- */
public <U,V> CompletableFuture<V> thenCombineAsync
- (CompletableFuture<? extends U> other,
+ (CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
- return doThenCombine(other, fn, ForkJoinPool.commonPool());
+ return doThenCombine(other.toCompletableFuture(), fn,
+ ForkJoinPool.commonPool());
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when both this and the other given CompletableFuture complete,
- * with the result of the given function of the results of the two
- * CompletableFutures from a task running in the given executor.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied function throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
public <U,V> CompletableFuture<V> thenCombineAsync
- (CompletableFuture<? extends U> other,
+ (CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor) {
if (executor == null) throw new NullPointerException();
- return doThenCombine(other, fn, executor);
+ return doThenCombine(other.toCompletableFuture(), fn, executor);
}
- private <U,V> CompletableFuture<V> doThenCombine
- (CompletableFuture<? extends U> other,
- BiFunction<? super T,? super U,? extends V> fn,
- Executor e) {
- if (other == null || fn == null) throw new NullPointerException();
- CompletableFuture<V> dst = new CompletableFuture<V>();
- ThenCombine<T,U,V> d = null;
- Object r, s = null;
- if ((r = result) == null || (s = other.result) == null) {
- d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
- CompletionNode q = null, p = new CompletionNode(d);
- while ((r == null && (r = result) == null) ||
- (s == null && (s = other.result) == null)) {
- if (q != null) {
- if (s != null ||
- UNSAFE.compareAndSwapObject
- (other, COMPLETIONS, q.next = other.completions, q))
- break;
- }
- else if (r != null ||
- UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p)) {
- if (s != null)
- break;
- q = new CompletionNode(d);
- }
- }
- }
- if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
- T t; U u; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- if (ex != null)
- u = null;
- else if (s instanceof AltResult) {
- ex = ((AltResult)s).ex;
- u = null;
- }
- else {
- @SuppressWarnings("unchecked") U us = (U) s;
- u = us;
- }
- V v = null;
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
- else
- v = fn.apply(t, u);
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(v, ex);
- }
- helpPostComplete();
- other.helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when both this and the other given CompletableFuture complete,
- * after performing the given action with the results of the two
- * CompletableFutures.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied action throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
public <U> CompletableFuture<Void> thenAcceptBoth
- (CompletableFuture<? extends U> other,
- BiConsumer<? super T, ? super U> block) {
- return doThenAcceptBoth(other, block, null);
+ (CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action) {
+ return doThenAcceptBoth(other.toCompletableFuture(), action, null);
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when both this and the other given CompletableFuture complete,
- * after performing the given action with the results of the two
- * CompletableFutures from a task running in the {@link
- * ForkJoinPool#commonPool()}.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied action throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
public <U> CompletableFuture<Void> thenAcceptBothAsync
- (CompletableFuture<? extends U> other,
- BiConsumer<? super T, ? super U> block) {
- return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
+ (CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action) {
+ return doThenAcceptBoth(other.toCompletableFuture(), action,
+ ForkJoinPool.commonPool());
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when both this and the other given CompletableFuture complete,
- * after performing the given action with the results of the two
- * CompletableFutures from a task running in the given executor.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied action throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
public <U> CompletableFuture<Void> thenAcceptBothAsync
- (CompletableFuture<? extends U> other,
- BiConsumer<? super T, ? super U> block,
+ (CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action,
Executor executor) {
if (executor == null) throw new NullPointerException();
- return doThenAcceptBoth(other, block, executor);
+ return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
}
- private <U> CompletableFuture<Void> doThenAcceptBoth
- (CompletableFuture<? extends U> other,
- BiConsumer<? super T,? super U> fn,
- Executor e) {
- if (other == null || fn == null) throw new NullPointerException();
- CompletableFuture<Void> dst = new CompletableFuture<Void>();
- ThenAcceptBoth<T,U> d = null;
- Object r, s = null;
- if ((r = result) == null || (s = other.result) == null) {
- d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
- CompletionNode q = null, p = new CompletionNode(d);
- while ((r == null && (r = result) == null) ||
- (s == null && (s = other.result) == null)) {
- if (q != null) {
- if (s != null ||
- UNSAFE.compareAndSwapObject
- (other, COMPLETIONS, q.next = other.completions, q))
- break;
- }
- else if (r != null ||
- UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p)) {
- if (s != null)
- break;
- q = new CompletionNode(d);
- }
- }
- }
- if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
- T t; U u; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- if (ex != null)
- u = null;
- else if (s instanceof AltResult) {
- ex = ((AltResult)s).ex;
- u = null;
- }
- else {
- @SuppressWarnings("unchecked") U us = (U) s;
- u = us;
- }
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
- else
- fn.accept(t, u);
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- other.helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when both this and the other given CompletableFuture complete,
- * after performing the given action.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied action throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
- Runnable action) {
- return doRunAfterBoth(other, action, null);
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when both this and the other given CompletableFuture complete,
- * after performing the given action from a task running in the
- * {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied action throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
- Runnable action) {
- return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when both this and the other given CompletableFuture complete,
- * after performing the given action from a task running in the
- * given executor.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, or the supplied action throws an exception,
- * then the returned CompletableFuture completes exceptionally
- * with a CompletionException holding the exception as its cause.
- *
- * @param other the other CompletableFuture
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
- Runnable action,
- Executor executor) {
- if (executor == null) throw new NullPointerException();
- return doRunAfterBoth(other, action, executor);
- }
-
- private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
- Runnable action,
- Executor e) {
- if (other == null || action == null) throw new NullPointerException();
- CompletableFuture<Void> dst = new CompletableFuture<Void>();
- RunAfterBoth d = null;
- Object r, s = null;
- if ((r = result) == null || (s = other.result) == null) {
- d = new RunAfterBoth(this, other, action, dst, e);
- CompletionNode q = null, p = new CompletionNode(d);
- while ((r == null && (r = result) == null) ||
- (s == null && (s = other.result) == null)) {
- if (q != null) {
- if (s != null ||
- UNSAFE.compareAndSwapObject
- (other, COMPLETIONS, q.next = other.completions, q))
- break;
- }
- else if (r != null ||
- UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p)) {
- if (s != null)
- break;
- q = new CompletionNode(d);
- }
- }
- }
- if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
- Throwable ex;
- if (r instanceof AltResult)
- ex = ((AltResult)r).ex;
- else
- ex = null;
- if (ex == null && (s instanceof AltResult))
- ex = ((AltResult)s).ex;
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncRun(action, dst));
- else
- action.run();
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- other.helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when either this or the other given CompletableFuture completes,
- * with the result of the given function of either this or the other
- * CompletableFuture's result.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied function
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @return the new CompletableFuture
- */
- public <U> CompletableFuture<U> applyToEither
- (CompletableFuture<? extends T> other,
- Function<? super T, U> fn) {
- return doApplyToEither(other, fn, null);
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when either this or the other given CompletableFuture completes,
- * with the result of the given function of either this or the other
- * CompletableFuture's result from a task running in the
- * {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied function
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @return the new CompletableFuture
- */
- public <U> CompletableFuture<U> applyToEitherAsync
- (CompletableFuture<? extends T> other,
- Function<? super T, U> fn) {
- return doApplyToEither(other, fn, ForkJoinPool.commonPool());
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when either this or the other given CompletableFuture completes,
- * with the result of the given function of either this or the other
- * CompletableFuture's result from a task running in the
- * given executor.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied function
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param fn the function to use to compute the value of
- * the returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
- public <U> CompletableFuture<U> applyToEitherAsync
- (CompletableFuture<? extends T> other,
- Function<? super T, U> fn,
- Executor executor) {
- if (executor == null) throw new NullPointerException();
- return doApplyToEither(other, fn, executor);
- }
-
- private <U> CompletableFuture<U> doApplyToEither
- (CompletableFuture<? extends T> other,
- Function<? super T, U> fn,
- Executor e) {
- if (other == null || fn == null) throw new NullPointerException();
- CompletableFuture<U> dst = new CompletableFuture<U>();
- ApplyToEither<T,U> d = null;
- Object r;
- if ((r = result) == null && (r = other.result) == null) {
- d = new ApplyToEither<T,U>(this, other, fn, dst, e);
- CompletionNode q = null, p = new CompletionNode(d);
- while ((r = result) == null && (r = other.result) == null) {
- if (q != null) {
- if (UNSAFE.compareAndSwapObject
- (other, COMPLETIONS, q.next = other.completions, q))
- break;
- }
- else if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- q = new CompletionNode(d);
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- T t; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- U u = null;
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncApply<T,U>(t, fn, dst));
- else
- u = fn.apply(t);
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(u, ex);
- }
- helpPostComplete();
- other.helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when either this or the other given CompletableFuture completes,
- * after performing the given action with the result of either this
- * or the other CompletableFuture's result.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied action
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> acceptEither
- (CompletableFuture<? extends T> other,
- Consumer<? super T> block) {
- return doAcceptEither(other, block, null);
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when either this or the other given CompletableFuture completes,
- * after performing the given action with the result of either this
- * or the other CompletableFuture's result from a task running in
- * the {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied action
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> acceptEitherAsync
- (CompletableFuture<? extends T> other,
- Consumer<? super T> block) {
- return doAcceptEither(other, block, ForkJoinPool.commonPool());
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when either this or the other given CompletableFuture completes,
- * after performing the given action with the result of either this
- * or the other CompletableFuture's result from a task running in
- * the given executor.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied action
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param block the action to perform before completing the
- * returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> acceptEitherAsync
- (CompletableFuture<? extends T> other,
- Consumer<? super T> block,
- Executor executor) {
- if (executor == null) throw new NullPointerException();
- return doAcceptEither(other, block, executor);
- }
-
- private CompletableFuture<Void> doAcceptEither
- (CompletableFuture<? extends T> other,
- Consumer<? super T> fn,
- Executor e) {
- if (other == null || fn == null) throw new NullPointerException();
- CompletableFuture<Void> dst = new CompletableFuture<Void>();
- AcceptEither<T> d = null;
- Object r;
- if ((r = result) == null && (r = other.result) == null) {
- d = new AcceptEither<T>(this, other, fn, dst, e);
- CompletionNode q = null, p = new CompletionNode(d);
- while ((r = result) == null && (r = other.result) == null) {
- if (q != null) {
- if (UNSAFE.compareAndSwapObject
- (other, COMPLETIONS, q.next = other.completions, q))
- break;
- }
- else if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- q = new CompletionNode(d);
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- T t; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncAccept<T>(t, fn, dst));
- else
- fn.accept(t);
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- other.helpPostComplete();
- return dst;
- }
-
- /**
- * Returns a new CompletableFuture that is completed
- * when either this or the other given CompletableFuture completes,
- * after performing the given action.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied action
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
- Runnable action) {
- return doRunAfterEither(other, action, null);
- }
-
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when either this or the other given CompletableFuture completes,
- * after performing the given action from a task running in the
- * {@link ForkJoinPool#commonPool()}.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied action
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> runAfterEitherAsync
- (CompletableFuture<?> other,
+ public CompletableFuture<Void> runAfterBoth
+ (CompletionStage<?> other,
Runnable action) {
- return doRunAfterEither(other, action, ForkJoinPool.commonPool());
+ return doRunAfterBoth(other.toCompletableFuture(), action, null);
}
- /**
- * Returns a new CompletableFuture that is asynchronously completed
- * when either this or the other given CompletableFuture completes,
- * after performing the given action from a task running in the
- * given executor.
- *
- * <p>If this and/or the other CompletableFuture complete
- * exceptionally, then the returned CompletableFuture may also do so,
- * with a CompletionException holding one of these exceptions as its
- * cause. No guarantees are made about which result or exception is
- * used in the returned CompletableFuture. If the supplied action
- * throws an exception, then the returned CompletableFuture completes
- * exceptionally with a CompletionException holding the exception as
- * its cause.
- *
- * @param other the other CompletableFuture
- * @param action the action to perform before completing the
- * returned CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the new CompletableFuture
- */
- public CompletableFuture<Void> runAfterEitherAsync
- (CompletableFuture<?> other,
+ public CompletableFuture<Void> runAfterBothAsync
+ (CompletionStage<?> other,
+ Runnable action) {
+ return doRunAfterBoth(other.toCompletableFuture(), action,
+ ForkJoinPool.commonPool());
+ }
+
+ public CompletableFuture<Void> runAfterBothAsync
+ (CompletionStage<?> other,
Runnable action,
Executor executor) {
if (executor == null) throw new NullPointerException();
- return doRunAfterEither(other, action, executor);
+ return doRunAfterBoth(other.toCompletableFuture(), action, executor);
}
- private CompletableFuture<Void> doRunAfterEither
- (CompletableFuture<?> other,
+
+ public <U> CompletableFuture<U> applyToEither
+ (CompletionStage<? extends T> other,
+ Function<? super T, U> fn) {
+ return doApplyToEither(other.toCompletableFuture(), fn, null);
+ }
+
+ public <U> CompletableFuture<U> applyToEitherAsync
+ (CompletionStage<? extends T> other,
+ Function<? super T, U> fn) {
+ return doApplyToEither(other.toCompletableFuture(), fn,
+ ForkJoinPool.commonPool());
+ }
+
+ public <U> CompletableFuture<U> applyToEitherAsync
+ (CompletionStage<? extends T> other,
+ Function<? super T, U> fn,
+ Executor executor) {
+ if (executor == null) throw new NullPointerException();
+ return doApplyToEither(other.toCompletableFuture(), fn, executor);
+ }
+
+ public CompletableFuture<Void> acceptEither
+ (CompletionStage<? extends T> other,
+ Consumer<? super T> action) {
+ return doAcceptEither(other.toCompletableFuture(), action, null);
+ }
+
+ public CompletableFuture<Void> acceptEitherAsync
+ (CompletionStage<? extends T> other,
+ Consumer<? super T> action) {
+ return doAcceptEither(other.toCompletableFuture(), action,
+ ForkJoinPool.commonPool());
+ }
+
+ public CompletableFuture<Void> acceptEitherAsync
+ (CompletionStage<? extends T> other,
+ Consumer<? super T> action,
+ Executor executor) {
+ if (executor == null) throw new NullPointerException();
+ return doAcceptEither(other.toCompletableFuture(), action, executor);
+ }
+
+ public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
+ Runnable action) {
+ return doRunAfterEither(other.toCompletableFuture(), action, null);
+ }
+
+ public CompletableFuture<Void> runAfterEitherAsync
+ (CompletionStage<?> other,
+ Runnable action) {
+ return doRunAfterEither(other.toCompletableFuture(), action,
+ ForkJoinPool.commonPool());
+ }
+
+ public CompletableFuture<Void> runAfterEitherAsync
+ (CompletionStage<?> other,
Runnable action,
- Executor e) {
- if (other == null || action == null) throw new NullPointerException();
- CompletableFuture<Void> dst = new CompletableFuture<Void>();
- RunAfterEither d = null;
- Object r;
- if ((r = result) == null && (r = other.result) == null) {
- d = new RunAfterEither(this, other, action, dst, e);
- CompletionNode q = null, p = new CompletionNode(d);
- while ((r = result) == null && (r = other.result) == null) {
- if (q != null) {
- if (UNSAFE.compareAndSwapObject
- (other, COMPLETIONS, q.next = other.completions, q))
- break;
- }
- else if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- q = new CompletionNode(d);
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- Throwable ex;
- if (r instanceof AltResult)
- ex = ((AltResult)r).ex;
- else
- ex = null;
- if (ex == null) {
- try {
- if (e != null)
- e.execute(new AsyncRun(action, dst));
- else
- action.run();
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- other.helpPostComplete();
- return dst;
+ Executor executor) {
+ if (executor == null) throw new NullPointerException();
+ return doRunAfterEither(other.toCompletableFuture(), action, executor);
}
- /**
- * Returns a CompletableFuture that upon completion, has the same
- * value as produced by the given function of the result of this
- * CompletableFuture.
- *
- * <p>If this CompletableFuture completes exceptionally, then the
- * returned CompletableFuture also does so, with a
- * CompletionException holding this exception as its cause.
- * Similarly, if the computed CompletableFuture completes
- * exceptionally, then so does the returned CompletableFuture.
- *
- * @param fn the function returning a new CompletableFuture
- * @return the CompletableFuture
- */
public <U> CompletableFuture<U> thenCompose
- (Function<? super T, CompletableFuture<U>> fn) {
+ (Function<? super T, ? extends CompletionStage<U>> fn) {
return doThenCompose(fn, null);
}
- /**
- * Returns a CompletableFuture that upon completion, has the same
- * value as that produced asynchronously using the {@link
- * ForkJoinPool#commonPool()} by the given function of the result
- * of this CompletableFuture.
- *
- * <p>If this CompletableFuture completes exceptionally, then the
- * returned CompletableFuture also does so, with a
- * CompletionException holding this exception as its cause.
- * Similarly, if the computed CompletableFuture completes
- * exceptionally, then so does the returned CompletableFuture.
- *
- * @param fn the function returning a new CompletableFuture
- * @return the CompletableFuture
- */
public <U> CompletableFuture<U> thenComposeAsync
- (Function<? super T, CompletableFuture<U>> fn) {
+ (Function<? super T, ? extends CompletionStage<U>> fn) {
return doThenCompose(fn, ForkJoinPool.commonPool());
}
- /**
- * Returns a CompletableFuture that upon completion, has the same
- * value as that produced asynchronously using the given executor
- * by the given function of this CompletableFuture.
- *
- * <p>If this CompletableFuture completes exceptionally, then the
- * returned CompletableFuture also does so, with a
- * CompletionException holding this exception as its cause.
- * Similarly, if the computed CompletableFuture completes
- * exceptionally, then so does the returned CompletableFuture.
- *
- * @param fn the function returning a new CompletableFuture
- * @param executor the executor to use for asynchronous execution
- * @return the CompletableFuture
- */
public <U> CompletableFuture<U> thenComposeAsync
- (Function<? super T, CompletableFuture<U>> fn,
+ (Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
if (executor == null) throw new NullPointerException();
return doThenCompose(fn, executor);
}
- private <U> CompletableFuture<U> doThenCompose
- (Function<? super T, CompletableFuture<U>> fn,
- Executor e) {
- if (fn == null) throw new NullPointerException();
- CompletableFuture<U> dst = null;
- ThenCompose<T,U> d = null;
- Object r;
- if ((r = result) == null) {
- dst = new CompletableFuture<U>();
- CompletionNode p = new CompletionNode
- (d = new ThenCompose<T,U>(this, fn, dst, e));
- while ((r = result) == null) {
- if (UNSAFE.compareAndSwapObject
- (this, COMPLETIONS, p.next = completions, p))
- break;
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- T t; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- if (ex == null) {
- if (e != null) {
- if (dst == null)
- dst = new CompletableFuture<U>();
- e.execute(new AsyncCompose<T,U>(t, fn, dst));
- }
- else {
- try {
- if ((dst = fn.apply(t)) == null)
- ex = new NullPointerException();
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- }
- if (dst == null)
- dst = new CompletableFuture<U>();
- if (e == null || ex != null)
- dst.internalComplete(null, ex);
- }
- helpPostComplete();
- dst.helpPostComplete();
- return dst;
+ public CompletableFuture<T> whenComplete
+ (BiConsumer<? super T, ? super Throwable> action) {
+ return doWhenComplete(action, null);
}
+ public CompletableFuture<T> whenCompleteAsync
+ (BiConsumer<? super T, ? super Throwable> action) {
+ return doWhenComplete(action, ForkJoinPool.commonPool());
+ }
+
+ public CompletableFuture<T> whenCompleteAsync
+ (BiConsumer<? super T, ? super Throwable> action,
+ Executor executor) {
+ if (executor == null) throw new NullPointerException();
+ return doWhenComplete(action, executor);
+ }
+
+ public <U> CompletableFuture<U> handle
+ (BiFunction<? super T, Throwable, ? extends U> fn) {
+ return doHandle(fn, null);
+ }
+
+ public <U> CompletableFuture<U> handleAsync
+ (BiFunction<? super T, Throwable, ? extends U> fn) {
+ return doHandle(fn, ForkJoinPool.commonPool());
+ }
+
+ public <U> CompletableFuture<U> handleAsync
+ (BiFunction<? super T, Throwable, ? extends U> fn,
+ Executor executor) {
+ if (executor == null) throw new NullPointerException();
+ return doHandle(fn, executor);
+ }
+
+ /**
+ * Returns this CompletableFuture.
+ *
+ * @return this CompletableFuture
+ */
+ public CompletableFuture<T> toCompletableFuture() {
+ return this;
+ }
+
+ // not in interface CompletionStage
+
/**
* Returns a new CompletableFuture that is completed when this
* CompletableFuture completes, with the result of the given
@@ -2868,6 +2608,8 @@
* completion when it completes exceptionally; otherwise, if this
* CompletableFuture completes normally, then the returned
* CompletableFuture also completes normally with the same value.
+ * Note: More flexible versions of this functionality are
+ * available using methods {@code whenComplete} and {@code handle}.
*
* @param fn the function to use to compute the value of the
* returned CompletableFuture if this CompletableFuture completed
@@ -2882,7 +2624,8 @@
Object r;
if ((r = result) == null) {
CompletionNode p =
- new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
+ new CompletionNode(d = new ExceptionCompletion<T>
+ (this, fn, dst));
while ((r = result) == null) {
if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
p.next = completions, p))
@@ -2910,59 +2653,6 @@
return dst;
}
- /**
- * Returns a new CompletableFuture that is completed when this
- * CompletableFuture completes, with the result of the given
- * function of the result and exception of this CompletableFuture's
- * completion. The given function is invoked with the result (or
- * {@code null} if none) and the exception (or {@code null} if none)
- * of this CompletableFuture when complete.
- *
- * @param fn the function to use to compute the value of the
- * returned CompletableFuture
- * @return the new CompletableFuture
- */
- public <U> CompletableFuture<U> handle
- (BiFunction<? super T, Throwable, ? extends U> fn) {
- if (fn == null) throw new NullPointerException();
- CompletableFuture<U> dst = new CompletableFuture<U>();
- HandleCompletion<T,U> d = null;
- Object r;
- if ((r = result) == null) {
- CompletionNode p =
- new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
- while ((r = result) == null) {
- if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
- p.next = completions, p))
- break;
- }
- }
- if (r != null && (d == null || d.compareAndSet(0, 1))) {
- T t; Throwable ex;
- if (r instanceof AltResult) {
- ex = ((AltResult)r).ex;
- t = null;
- }
- else {
- ex = null;
- @SuppressWarnings("unchecked") T tr = (T) r;
- t = tr;
- }
- U u; Throwable dx;
- try {
- u = fn.apply(t, ex);
- dx = null;
- } catch (Throwable rex) {
- dx = rex;
- u = null;
- }
- dst.internalComplete(u, dx);
- }
- helpPostComplete();
- return dst;
- }
-
-
/* ------------- Arbitrary-arity constructions -------------- */
/*
@@ -3215,6 +2905,21 @@
}
/**
+ * Returns {@code true} if this CompletableFuture completed
+ * exceptionally, in any way. Possible causes include
+ * cancellation, explicit invocation of {@code
+ * completeExceptionally}, and abrupt termination of a
+ * CompletionStage action.
+ *
+ * @return {@code true} if this CompletableFuture completed
+ * exceptionally
+ */
+ public boolean isCompletedExceptionally() {
+ Object r;
+ return ((r = result) instanceof AltResult) && r != NIL;
+ }
+
+ /**
* Forcibly sets or resets the value subsequently returned by
* method {@link #get()} and related methods, whether or not
* already completed. This method is designed for use only in
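
Reviewer note (not part of the patch): a minimal sketch of how the methods touched above might be exercised once this change is applied. The class name CompletionStageSketch and its helpers describe, observe and firstPlusOne are hypothetical, introduced only for illustration. The calls themselves (handle, whenComplete, applyToEither, toCompletableFuture, isCompletedExceptionally) are the ones declared in this diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical demo class; not part of the patch.
class CompletionStageSketch {

    // Takes the interface type, so any CompletionStage implementation works.
    // handle() receives either the value or the exception and computes a
    // replacement result, so the returned future completes normally.
    static CompletableFuture<String> describe(CompletionStage<Integer> stage) {
        return stage
            .handle((value, ex) -> ex == null ? "ok:" + value
                                              : "failed:" + ex.getMessage())
            .toCompletableFuture();
    }

    // whenComplete() runs the action but preserves the source's outcome.
    static CompletableFuture<Integer> observe(CompletableFuture<Integer> source) {
        return source.whenComplete((value, ex) -> { /* e.g. logging */ });
    }

    // The "either" methods accept any CompletionStage as the other stage.
    // Assumes `done` is already complete; `pending` is never completed,
    // so the result can only come from `done`.
    static int firstPlusOne(CompletableFuture<Integer> done) {
        CompletionStage<Integer> pending = new CompletableFuture<>();
        return done.applyToEither(pending, x -> x + 1).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(42);
        CompletableFuture<Integer> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(ok).join());
        System.out.println(describe(bad).join());
        // The dependent stage stays exceptional after whenComplete().
        System.out.println(observe(bad).isCompletedExceptionally());
        // A normal completion with a null value is not exceptional.
        System.out.println(
            CompletableFuture.completedFuture(null).isCompletedExceptionally());
        System.out.println(firstPlusOne(ok));
    }
}
```

The helper describe is handed a stage that failed directly, so handle sees the original exception. A stage that failed because one of its sources failed would instead see a CompletionException wrapping the cause, as the CompletionStage class documentation below describes.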
diff --git a/src/share/classes/java/util/concurrent/CompletionStage.java b/src/share/classes/java/util/concurrent/CompletionStage.java
new file mode 100644
index 0000000..6de6098
--- /dev/null
+++ b/src/share/classes/java/util/concurrent/CompletionStage.java
@@ -0,0 +1,760 @@
+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * This file is available under and governed by the GNU General Public
+ * License version 2 only, as published by the Free Software Foundation.
+ * However, the following notice accompanied the original version of this
+ * file:
+ *
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
+
+package java.util.concurrent;
+import java.util.function.Supplier;
+import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.BiFunction;
+import java.util.concurrent.Executor;
+
+/**
+ * A stage of a possibly asynchronous computation, that performs an
+ * action or computes a value when another CompletionStage completes.
+ * A stage completes upon termination of its computation, but this may
+ * in turn trigger other dependent stages. The functionality defined
+ * in this interface takes only a few basic forms, which expand out to
+ * a larger set of methods to capture a range of usage styles: <ul>
+ *
+ * <li>The computation performed by a stage may be expressed as a
+ * Function, Consumer, or Runnable (using methods with names including
+ * <em>apply</em>, <em>accept</em>, or <em>run</em>, respectively)
+ * depending on whether it requires arguments and/or produces results.
+ * For example, {@code stage.thenApply(x -> square(x)).thenAccept(x ->
+ * System.out.print(x)).thenRun(() -> System.out.println())}. An
+ * additional form (<em>compose</em>) applies functions of stages
+ * themselves, rather than their results. </li>
+ *
+ * <li> One stage's execution may be triggered by completion of a
+ * single stage, or both of two stages, or either of two stages.
+ * Dependencies on a single stage are arranged using methods with
+ * prefix <em>then</em>. Those triggered by completion of
+ * <em>both</em> of two stages may <em>combine</em> their results or
+ * effects, using correspondingly named methods. Those triggered by
+ * <em>either</em> of two stages make no guarantees about which of the
+ * results or effects are used for the dependent stage's
+ * computation.</li>
+ *
+ * <li> Dependencies among stages control the triggering of
+ * computations, but do not otherwise guarantee any particular
+ * ordering. Additionally, execution of a new stage's computations may
+ * be arranged in any of three ways: default execution, default
+ * asynchronous execution (using methods with suffix <em>async</em>
+ * that employ the stage's default asynchronous execution facility),
+ * or custom (via a supplied {@link Executor}). The execution
+ * properties of default and async modes are specified by
+ * CompletionStage implementations, not this interface. Methods with
+ * explicit Executor arguments may have arbitrary execution
+ * properties, and might not even support concurrent execution, but
+ * are arranged for processing in a way that accommodates asynchrony.</li>
+ *
+ * <li> Two method forms support processing whether the triggering
+ * stage completed normally or exceptionally: Method {@link
+ * #whenComplete whenComplete} allows injection of an action
+ * regardless of outcome, otherwise preserving the outcome in its
+ * completion. Method {@link #handle handle} additionally allows the
+ * stage to compute a replacement result that may enable further
+ * processing by other dependent stages. In all other cases, if a
+ * stage's computation terminates abruptly with an (unchecked)
+ * exception or error, then all dependent stages requiring its
+ * completion complete exceptionally as well, with a {@link
+ * CompletionException} holding the exception as its cause. If a
+ * stage is dependent on <em>both</em> of two stages, and both
+ * complete exceptionally, then the CompletionException may correspond
+ * to either one of these exceptions. If a stage is dependent on
+ * <em>either</em> of two others, and only one of them completes
+ * exceptionally, no guarantees are made about whether the dependent
+ * stage completes normally or exceptionally. In the case of method
+ * {@code whenComplete}, when the supplied action itself encounters an
+ * exception, then the stage completes exceptionally with this
+ * exception if not already completed exceptionally.</li>
+ *
+ * </ul>
+ *
+ * <p>All methods adhere to the above triggering, execution, and
+ * exceptional completion specifications (which are not repeated in
+ * individual method specifications). Additionally, while arguments
+ * used to pass a completion result (that is, for parameters of type
+ * {@code T}) for methods accepting them may be null, passing a null
+ * value for any other parameter will result in a {@link
+ * NullPointerException} being thrown.
+ *
+ * <p>This interface does not define methods for initially creating,
+ * forcibly completing normally or exceptionally, probing completion
+ * status or results, or awaiting completion of a stage.
+ * Implementations of CompletionStage may provide means of achieving
+ * such effects, as appropriate. Method {@link #toCompletableFuture}
+ * enables interoperability among different implementations of this
+ * interface by providing a common conversion type.
+ *
+ * @author Doug Lea
+ * @since 1.8
+ */
+public interface CompletionStage<T> {
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed with this stage's result as the argument
+ * to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed using this stage's default asynchronous
+ * execution facility, with this stage's result as the argument to
+ * the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> thenApplyAsync
+ (Function<? super T,? extends U> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed using the supplied Executor, with this
+ * stage's result as the argument to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> thenApplyAsync
+ (Function<? super T,? extends U> fn,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed with this stage's result as the argument
+ * to the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> thenAccept(Consumer<? super T> action);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed using this stage's default asynchronous
+ * execution facility, with this stage's result as the argument to
+ * the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed using the supplied Executor, with this
+ * stage's result as the argument to the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
+ Executor executor);
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, executes the given action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> thenRun(Runnable action);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, executes the given action using this stage's default
+ * asynchronous execution facility.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> thenRunAsync(Runnable action);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, executes the given action using the supplied Executor.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> thenRunAsync(Runnable action,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage both complete normally, is executed with the two
+ * results as arguments to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param <U> the type of the other CompletionStage's result
+ * @param <V> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U,V> CompletionStage<V> thenCombine
+ (CompletionStage<? extends U> other,
+ BiFunction<? super T,? super U,? extends V> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage complete normally, is executed using this stage's
+ * default asynchronous execution facility, with the two results
+ * as arguments to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param <U> the type of the other CompletionStage's result
+ * @param <V> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U,V> CompletionStage<V> thenCombineAsync
+ (CompletionStage<? extends U> other,
+ BiFunction<? super T,? super U,? extends V> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage complete normally, is executed using the supplied
+ * executor, with the two results as arguments to the supplied
+ * function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @param <U> the type of the other CompletionStage's result
+ * @param <V> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U,V> CompletionStage<V> thenCombineAsync
+ (CompletionStage<? extends U> other,
+ BiFunction<? super T,? super U,? extends V> fn,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage both complete normally, is executed with the two
+ * results as arguments to the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param <U> the type of the other CompletionStage's result
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<Void> thenAcceptBoth
+ (CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage complete normally, is executed using this stage's
+ * default asynchronous execution facility, with the two results
+ * as arguments to the supplied action.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param <U> the type of the other CompletionStage's result
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<Void> thenAcceptBothAsync
+ (CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage complete normally, is executed using the supplied
+ * executor, with the two results as arguments to the supplied
+ * action.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @param <U> the type of the other CompletionStage's result
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<Void> thenAcceptBothAsync
+ (CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage both complete normally, executes the given action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
+ Runnable action);
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage complete normally, executes the given action using
+ * this stage's default asynchronous execution facility.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
+ Runnable action);
+
+ /**
+ * Returns a new CompletionStage that, when this and the other
+ * given stage complete normally, executes the given action using
+ * the supplied executor.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
+ Runnable action,
+ Executor executor);
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, is executed with the
+ * corresponding result as argument to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> applyToEither
+ (CompletionStage<? extends T> other,
+ Function<? super T, U> fn);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, is executed using this
+ * stage's default asynchronous execution facility, with the
+ * corresponding result as argument to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> applyToEitherAsync
+ (CompletionStage<? extends T> other,
+ Function<? super T, U> fn);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, is executed using the
+ * supplied executor, with the corresponding result as argument to
+ * the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param fn the function to use to compute the value of
+ * the returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> applyToEitherAsync
+ (CompletionStage<? extends T> other,
+ Function<? super T, U> fn,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, is executed with the
+ * corresponding result as argument to the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> acceptEither
+ (CompletionStage<? extends T> other,
+ Consumer<? super T> action);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, is executed using this
+ * stage's default asynchronous execution facility, with the
+ * corresponding result as argument to the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> acceptEitherAsync
+ (CompletionStage<? extends T> other,
+ Consumer<? super T> action);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, is executed using the
+ * supplied executor, with the corresponding result as argument to
+ * the supplied action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> acceptEitherAsync
+ (CompletionStage<? extends T> other,
+ Consumer<? super T> action,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, executes the given action.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
+ Runnable action);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, executes the given action
+ * using this stage's default asynchronous execution facility.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> runAfterEitherAsync
+ (CompletionStage<?> other,
+ Runnable action);
+
+ /**
+ * Returns a new CompletionStage that, when either this or the
+ * other given stage completes normally, executes the given action
+ * using the supplied executor.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param other the other CompletionStage
+ * @param action the action to perform before completing the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @return the new CompletionStage
+ */
+ public CompletionStage<Void> runAfterEitherAsync
+ (CompletionStage<?> other,
+ Runnable action,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed with this stage's result as the argument
+ * to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param fn the function returning a new CompletionStage
+ * @param <U> the type of the returned CompletionStage's result
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> thenCompose
+ (Function<? super T, ? extends CompletionStage<U>> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed using this stage's default asynchronous
+ * execution facility, with this stage's result as the argument to
+ * the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param fn the function returning a new CompletionStage
+ * @param <U> the type of the returned CompletionStage's result
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> thenComposeAsync
+ (Function<? super T, ? extends CompletionStage<U>> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * normally, is executed using the supplied Executor, with this
+ * stage's result as the argument to the supplied function.
+ *
+ * See the {@link CompletionStage} documentation for rules
+ * covering exceptional completion.
+ *
+ * @param fn the function returning a new CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @param <U> the type of the returned CompletionStage's result
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> thenComposeAsync
+ (Function<? super T, ? extends CompletionStage<U>> fn,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * exceptionally, is executed with this stage's exception as the
+ * argument to the supplied function. Otherwise, if this stage
+ * completes normally, then the returned stage also completes
+ * normally with the same value.
+ *
+ * @param fn the function to use to compute the value of the
+ * returned CompletionStage if this CompletionStage completed
+ * exceptionally
+ * @return the new CompletionStage
+ */
+ public CompletionStage<T> exceptionally
+ (Function<Throwable, ? extends T> fn);
+
+ /**
+ * Returns a new CompletionStage with the same result or exception
+ * as this stage, and when this stage completes, executes the
+ * given action with the result (or {@code null} if none) and the
+ * exception (or {@code null} if none) of this stage.
+ *
+ * @param action the action to perform
+ * @return the new CompletionStage
+ */
+ public CompletionStage<T> whenComplete
+ (BiConsumer<? super T, ? super Throwable> action);
+
+ /**
+ * Returns a new CompletionStage with the same result or exception
+ * as this stage, and when this stage completes, executes the
+ * given action using this stage's default asynchronous execution
+ * facility, with the result (or {@code null} if none) and the
+ * exception (or {@code null} if none) of this stage as
+ * arguments.
+ *
+ * @param action the action to perform
+ * @return the new CompletionStage
+ */
+ public CompletionStage<T> whenCompleteAsync
+ (BiConsumer<? super T, ? super Throwable> action);
+
+ /**
+ * Returns a new CompletionStage with the same result or exception
+ * as this stage, and when this stage completes, executes the
+ * given action using the supplied Executor, with the result (or
+ * {@code null} if none) and the exception (or {@code null} if
+ * none) of this stage as arguments.
+ *
+ * @param action the action to perform
+ * @param executor the executor to use for asynchronous execution
+ * @return the new CompletionStage
+ */
+ public CompletionStage<T> whenCompleteAsync
+ (BiConsumer<? super T, ? super Throwable> action,
+ Executor executor);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * either normally or exceptionally, is executed with this stage's
+ * result and exception as arguments to the supplied function.
+ * The given function is invoked with the result (or {@code null}
+ * if none) and the exception (or {@code null} if none) of this
+ * stage when complete as arguments.
+ *
+ * @param fn the function to use to compute the value of the
+ * returned CompletionStage
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> handle
+ (BiFunction<? super T, Throwable, ? extends U> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * either normally or exceptionally, is executed using this stage's
+ * default asynchronous execution facility, with this stage's
+ * result and exception as arguments to the supplied function.
+ * The given function is invoked with the result (or {@code null}
+ * if none) and the exception (or {@code null} if none) of this
+ * stage when complete as arguments.
+ *
+ * @param fn the function to use to compute the value of the
+ * returned CompletionStage
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> handleAsync
+ (BiFunction<? super T, Throwable, ? extends U> fn);
+
+ /**
+ * Returns a new CompletionStage that, when this stage completes
+ * either normally or exceptionally, is executed using the
+ * supplied executor, with this stage's result and exception as
+ * arguments to the supplied function. The given function is
+ * invoked with the result (or {@code null} if none) and the
+ * exception (or {@code null} if none) of this stage when complete
+ * as arguments.
+ *
+ * @param fn the function to use to compute the value of the
+ * returned CompletionStage
+ * @param executor the executor to use for asynchronous execution
+ * @param <U> the function's return type
+ * @return the new CompletionStage
+ */
+ public <U> CompletionStage<U> handleAsync
+ (BiFunction<? super T, Throwable, ? extends U> fn,
+ Executor executor);
+
+ /**
+ * Returns a {@link CompletableFuture} maintaining the same
+ * completion properties as this stage. If this stage is already a
+ * CompletableFuture, this method may return this stage itself.
+ * Otherwise, invocation of this method may be equivalent in
+ * effect to {@code thenApply(x -> x)}, but returning an instance
+ * of type {@code CompletableFuture}. A CompletionStage
+ * implementation that does not choose to interoperate with others
+ * may throw {@code UnsupportedOperationException}.
+ *
+ * @return the CompletableFuture
+ * @throws UnsupportedOperationException if this implementation
+ * does not interoperate with CompletableFuture
+ */
+ public CompletableFuture<T> toCompletableFuture();
+
+}
diff --git a/test/ProblemList.txt b/test/ProblemList.txt
index 595dfb1..7dcb991 100644
--- a/test/ProblemList.txt
+++ b/test/ProblemList.txt
@@ -370,12 +370,6 @@
# Filed 6772009
java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java generic-all
-# 8020435
-java/util/concurrent/CompletableFuture/Basic.java generic-all
-
-# 8020291
-java/util/Random/RandomStreamTest.java generic-all
-
# 7041639, Solaris DSA keypair generation bug
java/util/TimeZone/TimeZoneDatePermissionCheck.sh solaris-all
diff --git a/test/java/util/concurrent/CompletableFuture/Basic.java b/test/java/util/concurrent/CompletableFuture/Basic.java
index ec6909d..f623389 100644
--- a/test/java/util/concurrent/CompletableFuture/Basic.java
+++ b/test/java/util/concurrent/CompletableFuture/Basic.java
@@ -34,6 +34,8 @@
/*
* @test
* @bug 8005696
+ * @run main Basic
+ * @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 Basic
* @summary Basic tests for CompletableFuture
* @author Chris Hegarty
*/