blob: 974e246283162751b6ceb037a15902440d413d83 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.guava
import com.google.common.util.concurrent.*
import com.google.common.util.concurrent.internal.*
import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.coroutines.*
/**
* Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
*
* The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws
* [IllegalArgumentException], because Futures don't have a way to start lazily.
*
* The created coroutine is cancelled when the resulting future completes successfully, fails, or
* is cancelled.
*
* `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
* added/overlaid by passing [context].
*
* If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
* member, [Dispatchers.Default] is used.
*
* The parent job is inherited from this [CoroutineScope], and can be overridden by passing
* a [Job] in [context].
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
* facilities.
*
* Note that the error and cancellation semantics of [future] are _subtly different_ than
* [asListenableFuture]'s. See [ListenableFutureCoroutine] for details.
*
* @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param block the code to execute.
*/
public fun <T> CoroutineScope.future(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = newCoroutineContext(context)
val future = SettableFuture.create<T>()
val coroutine = ListenableFutureCoroutine(newContext, future)
future.addListener(
coroutine,
MoreExecutors.directExecutor())
coroutine.start(start, coroutine, block)
// Return hides the SettableFuture. This should prevent casting.
return object: ListenableFuture<T> by future {}
}
/**
* Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
*
* Completion is non-atomic between the two promises.
*
* Cancellation is propagated bidirectionally.
*
* When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
* complete the returned `Deferred` with the same value or exception. This will succeed, barring a
* race with cancellation of the `Deferred`.
*
* When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
* it will cancel the returned `Deferred`.
*
* When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the
* cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
* `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
* will complete with a different outcome than `this` `ListenableFuture`.
*/
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
/* This method creates very specific behaviour as it entangles the `Deferred` and
* `ListenableFuture`. This behaviour is the best discovered compromise between the possible
* states and interface contracts of a `Future` and the states of a `Deferred`. The specific
* behaviour is described here.
*
* When `this` `ListenableFuture` is successfully cancelled - meaning
* `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
* `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
* `Deferred` will always be put into its "cancelling" state and (barring uncooperative
* cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
* cancelled.
*
* When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
* called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
* cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
* terminal state.
*
* The returned `Deferred` may receive and suppress the `true` return value from
* `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
* This is unavoidable, so make sure no idempotent cancellation work is performed by a
* reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
* cancellation was from the `Deferred` representation of the task.
*
* This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
* semantics. See `Job` for a description of coroutine cancellation semantics.
*/
// First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
// Exception by using the same instance the Future created.
if (this is InternalFutureFailureAccess) {
val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
if (t != null) {
return CompletableDeferred<T>().also {
it.completeExceptionally(t)
}
}
}
// Second, try the fast path for a completed Future. The Future is known to be done, so get()
// will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
// getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
// handle interruption.
if (isDone) {
return try {
CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
} catch (e: CancellationException) {
CompletableDeferred<T>().also { it.cancel(e) }
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future. Anything else showing up here indicates a very fundamental bug in a
// Future implementation.
CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
}
}
// Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
val deferred = CompletableDeferred<T>()
Futures.addCallback(this, object : FutureCallback<T> {
override fun onSuccess(result: T?) {
// Here we work with flexible types, so we unchecked cast to trick the type system
@Suppress("UNCHECKED_CAST")
deferred.complete(result as T)
}
override fun onFailure(t: Throwable) {
deferred.completeExceptionally(t)
}
}, MoreExecutors.directExecutor())
// ... And cancel the Future when the deferred completes. Since the return type of this method
// is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
// completion handler runs before the Future is completed, the Deferred must have been
// cancelled and should propagate its cancellation. If it runs after the Future is completed,
// this is a no-op.
deferred.invokeOnCompletion {
cancel(false)
}
return deferred
}
/**
* Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
*
* [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
* fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
* [Exception].
*
* If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
* state - a serious fundamental bug.
*/
private fun ExecutionException.nonNullCause(): Throwable {
return this.cause!!
}
/**
* Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
*
* Completion is non-atomic between the two promises.
*
* When either promise successfully completes, it will attempt to synchronously complete its
* counterpart with the same value. This will succeed barring a race with cancellation.
*
* When either promise completes with an Exception, it will attempt to synchronously complete its
* counterpart with the same Exception. This will succeed barring a race with cancellation.
*
* Cancellation is propagated bidirectionally.
*
* When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
* [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
* the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
* non-cancelled terminal state.
*
* When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
* completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
* cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
* the returned `Future` will always _eventually_ reach its cancelled state when either promise is
* successfully cancelled, for their different meanings of "successfully cancelled".
*
* This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
* semantics. See [Job] for a description of coroutine cancellation semantics. See
* [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
* corner cases of this method.
*/
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
val outerFuture = OuterFuture<T>(this)
outerFuture.afterInit()
return outerFuture
}
/**
* Awaits completion of `this` [ListenableFuture] without blocking a thread.
*
* This suspend function is cancellable.
*
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
*
* This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
* If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
* [kotlinx.coroutines.NonCancellable].
*
*/
public suspend fun <T> ListenableFuture<T>.await(): T {
try {
if (isDone) return Uninterruptibles.getUninterruptibly(this)
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future, other than CancellationException. Cancellation is propagated upward so that
// the coroutine running this suspend function may process it.
// Any other Exception showing up here indicates a very fundamental bug in a
// Future implementation.
throw e.nonNullCause()
}
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
addListener(
ToContinuation(this, cont),
MoreExecutors.directExecutor())
cont.invokeOnCancellation {
cancel(false)
}
}
}
/**
* Propagates the outcome of [futureToObserve] to [continuation] on completion.
*
* Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
* and fails, the cause of the Future will be propagated without a wrapping
* [ExecutionException] when thrown.
*/
private class ToContinuation<T>(
val futureToObserve: ListenableFuture<T>,
val continuation: CancellableContinuation<T>
): Runnable {
override fun run() {
if (futureToObserve.isCancelled) {
continuation.cancel()
} else {
try {
continuation.resumeWith(
Result.success(Uninterruptibles.getUninterruptibly(futureToObserve)))
} catch (e: ExecutionException) {
// ExecutionException is the only kind of exception that can be thrown from a gotten
// Future. Anything else showing up here indicates a very fundamental bug in a
// Future implementation.
continuation.resumeWithException(e.nonNullCause())
}
}
}
}
/**
* An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
* completion.
*
* The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback.
* See [run] for details. Both types are implemented by this object to save an allocation.
*/
private class ListenableFutureCoroutine<T>(
context: CoroutineContext,
private val future: SettableFuture<T>
) : AbstractCoroutine<T>(context), Runnable {
/**
* When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if
* [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if
* and only if its `isCancelled()` method returns true.
*
* Any error that occurs after successfully cancelling a [ListenableFuture]
* created by submitting the returned object as a [Runnable] to an `Executor` will be passed
* to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit
* it to return an error after it is successfully cancelled.
*
* By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully
* cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to
* the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the
* cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that
* the [Deferred] pointing to the task will be used to observe any error outcome occurring after
* cancellation.
*
* This may be counterintuitive, but it maintains the error and cancellation contracts of both
* the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
* to the same running task.
*/
override fun run() {
if (future.isCancelled) {
cancel()
}
}
override fun onCompleted(value: T) {
future.set(value)
}
// TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation?
override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!future.setException(cause) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
handleCoroutineException(context, cause)
}
}
}
/**
* A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with
* it.
*
* This setup allows the returned [ListenableFuture] to maintain the following properties:
*
* - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
* and [isCancelled] methods
* - Cancellation propagation both to and from [Deferred]
* - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
* with different concrete implementations of [ListenableFuture]
* - Fully correct cancellation and listener happens-after obeying [Future] and
* [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
* The best way to be correct, especially given the fun corner cases from
* [AsyncFuture.setAsync], is to just use an [AsyncFuture].
* - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture]
* around its input [deferred] as a state engine to establish happens-after-completion. This
* could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the
* cost of the implementation's readability.
*/
private class OuterFuture<T>(private val deferred: Deferred<T>): ListenableFuture<T> {
val innerFuture = DeferredListenableFuture(deferred)
// Adding the listener after initialization resolves partial construction hairpin problem.
//
// This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may
// have completed earlier if it got cancelled! See DeferredListenableFuture.
fun afterInit() {
deferred.invokeOnCompletion {
innerFuture.complete()
}
}
/**
* Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
* [Job.isCancelled].
*
* When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate
* [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class.
*
* See [DeferredListenableFuture.cancel].
*/
override fun isCancelled(): Boolean {
// This expression ensures that isCancelled() will *never* return true when isDone() returns false.
// In the case that the deferred has completed with cancellation, completing `this`, its
// reaching the "cancelled" state with a cause of CancellationException is treated as the
// same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and
// this Future hasn't itself been successfully cancelled, the Future will return
// isCancelled() == false. This is the only discovered way to reconcile the two different
// cancellation contracts.
return isDone
&& (innerFuture.isCancelled
|| deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException)
}
/**
* Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that
* Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes
* happens-after ordering for completion of the [Deferred] input to [OuterFuture].
*
* `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after
* contract of [Future] to be correctly followed. If this method were to directly use
* _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this
* [ListenableFuture] is created from might be in an incomplete state when used by `get()`.
*/
override fun get(): T {
return getInternal(innerFuture.get())
}
/** See [get()]. */
override fun get(timeout: Long, unit: TimeUnit): T {
return getInternal(innerFuture.get(timeout, unit))
}
/** See [get()]. */
private fun getInternal(deferred: Deferred<T>): T {
if (deferred.isCancelled) {
val exception = deferred.getCompletionExceptionOrNull()
if (exception is kotlinx.coroutines.CancellationException) {
throw exception
} else {
throw ExecutionException(exception)
}
} else {
return deferred.getCompleted()
}
}
override fun addListener(listener: Runnable, executor: Executor) {
innerFuture.addListener(listener, executor)
}
override fun isDone(): Boolean {
return innerFuture.isDone
}
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
return innerFuture.cancel(mayInterruptIfRunning)
}
}
/**
* Holds a delegate deferred, and serves as a state machine for [Future] cancellation.
*
* [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
* cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to
* _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when
* returned.
*/
private class DeferredListenableFuture<T>(
private val deferred: Deferred<T>
) : AbstractFuture<Deferred<T>>() {
fun complete() {
set(deferred)
}
/**
* Tries to cancel the task. This is fundamentally racy.
*
* For any given call to `cancel()`, if [deferred] is already completed, the call will complete
* this Future with it, and fail to cancel. Otherwise, the
* call to `cancel()` will try to cancel this Future: if and only if cancellation of this
* succeeds, [deferred] will have its [Deferred.cancel] called.
*
* This arrangement means that [deferred] _might not successfully cancel_, if the race resolves
* in a particular way. [deferred] may also be in its "cancelling" state while this
* ListenableFuture is complete and cancelled.
*
* [OuterFuture] collaborates with this class to present a more cohesive picture and ensure
* that certain combinations of cancelled/cancelling states can't be observed.
*/
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
return if (super.cancel(mayInterruptIfRunning)) {
deferred.cancel()
true
} else {
false
}
}
}