blob: 04f6c61f5090bd22eee3bbfd3d855e199af4c057 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.guava
import com.google.common.util.concurrent.*
import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.coroutines.*
/**
 * Starts a new coroutine and returns its result as an implementation of [ListenableFuture].
 * The running coroutine is cancelled when the resulting future is cancelled.
 *
 * The coroutine context is inherited from the receiver [CoroutineScope]; additional context elements
 * can be supplied through the [context] argument.
 * If the resulting context has neither a dispatcher nor any other [ContinuationInterceptor],
 * [Dispatchers.Default] is used.
 * The parent job is inherited from the [CoroutineScope] as well, but it can be overridden
 * with a corresponding element in [context].
 *
 * By default, the coroutine is immediately scheduled for execution.
 * Other options can be selected via the `start` parameter; see [CoroutineStart] for details.
 * [CoroutineStart.LAZY] is not supported (the `ListenableFuture` framework has no corresponding capability)
 * and results in an [IllegalArgumentException].
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of the debugging
 * facilities that are available for a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 * @return a future that is completed with the value or the failure of [block].
 * @throws IllegalArgumentException if [start] is a lazy start option.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
    // A lazily started coroutine cannot be represented by a ListenableFuture, so reject it up front.
    require(!start.isLazy) { "$start start is not supported" }
    val combinedContext = newCoroutineContext(context)
    return SettableFuture.create<T>().also { result ->
        val coroutine = ListenableFutureCoroutine(combinedContext, result)
        // The coroutine listens to the very future it completes: its onFailure handler
        // reacts to a cancelled future by cancelling the coroutine.
        result.addCallback(coroutine, MoreExecutors.directExecutor())
        coroutine.start(start, coroutine, block)
    }
}
/**
 * Coroutine that publishes its outcome to [completion] and, at the same time, acts as a
 * [FutureCallback] so it can be registered on that same future.
 */
private class ListenableFutureCoroutine<T>(
    context: CoroutineContext,
    private val completion: SettableFuture<T>
) : AbstractCoroutine<T>(context), FutureCallback<T> {

    // ---- coroutine -> future: forward the coroutine's outcome ----

    override fun onCompleted(value: T) {
        completion.set(value)
    }

    override fun onCompletedExceptionally(exception: Throwable) {
        completion.setException(exception)
    }

    // ---- future -> coroutine ----
    // This coroutine is registered as a callback on the future it completes.
    // Only the failure path matters: when the future is cancelled externally,
    // the coroutine has to be cancelled as well.

    override fun onSuccess(result: T?) = Unit // nothing to do

    override fun onFailure(t: Throwable) {
        // Failures other than cancellation are ignored here.
        if (t !is CancellationException) return
        cancel()
    }
}
/**
 * Converts this deferred value to an instance of [ListenableFuture].
 *
 * The returned future is a wrapper around this [Deferred]: it is completed with the deferred's
 * result or failure, and cancellation of the future is forwarded to the deferred by the wrapper.
 *
 * @return a [ListenableFuture] backed by this deferred.
 */
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
    return DeferredListenableFuture(this)
}
/**
 * [ListenableFuture] view over a [Deferred].
 *
 * The future is completed from the deferred's completion handler: with the completed value on
 * success, or with whatever exception `getCompleted()` throws otherwise.
 * Successfully cancelling the future cancels the underlying [deferred], regardless of the
 * `mayInterruptIfRunning` flag.
 */
private class DeferredListenableFuture<T>(
    private val deferred: Deferred<T>
) : AbstractFuture<T>() {
    init {
        deferred.invokeOnCompletion {
            try {
                set(deferred.getCompleted())
            } catch (t: Throwable) {
                // getCompleted() throws when the deferred failed or was cancelled.
                setException(t)
            }
        }
    }

    /**
     * Cancels this future and, if that succeeded, the underlying [deferred].
     *
     * `AbstractFuture` invokes [interruptTask] only for `cancel(true)`, so the
     * non-interrupting case has to be forwarded explicitly here; otherwise
     * `cancel(false)` would leave the deferred running.
     */
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        val cancelled = super.cancel(mayInterruptIfRunning)
        // For cancel(true) the deferred has already been cancelled through interruptTask().
        if (cancelled && !mayInterruptIfRunning) deferred.cancel()
        return cancelled
    }

    /** Invoked by `AbstractFuture` on a successful `cancel(true)`; cancels the underlying [deferred]. */
    override fun interruptTask() { deferred.cancel() }
}
/**
 * Converts this listenable future to an instance of [Deferred].
 *
 * If the future is already done, an already-completed deferred is returned; a failure wrapped in an
 * [ExecutionException] is unwrapped to its original cause first.
 * Otherwise the returned deferred is completed from a callback registered on this future.
 * A future that completes with `null` (for a nullable `T`) completes the deferred with `null`.
 *
 * The future is cancelled (without interruption) when the resulting deferred completes,
 * in particular when it is cancelled; this is a no-op if the future is already done.
 *
 * @return a [Deferred] reflecting the outcome of this future.
 */
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
    // Fast path if already completed
    if (isDone) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    val deferred = CompletableDeferred<T>()
    Futures.addCallback(this, object : FutureCallback<T> {
        @Suppress("UNCHECKED_CAST")
        override fun onSuccess(result: T?) {
            // `result` is legitimately null when T is a nullable type, so cast instead of asserting
            // non-null: `result!!` would throw inside the callback and leave the deferred incomplete.
            deferred.complete(result as T)
        }
        override fun onFailure(t: Throwable) {
            deferred.completeExceptionally(t)
        }
    }, MoreExecutors.directExecutor())
    // Propagate completion (including cancellation) of the deferred back to the future.
    deferred.invokeOnCompletion { cancel(false) }
    return deferred
}
/**
 * Suspends until this future completes, without blocking a thread, and returns its result.
 *
 * If the future is already done, its outcome is returned or thrown immediately; a failure wrapped in
 * an [ExecutionException] is rethrown as its original cause.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * stops waiting for the future and immediately resumes with
 * [CancellationException][kotlinx.coroutines.CancellationException].
 *
 * This method is intended for one-shot futures, so the future is cancelled as well when the coroutine
 * is cancelled. If cancelling the future is undesired, use `future.asDeferred().await()` instead.
 */
public suspend fun <T> ListenableFuture<T>.await(): T {
    // Fast path: the future has already finished, so no suspension is needed.
    if (isDone) {
        try {
            return get() as T
        } catch (failure: ExecutionException) {
            throw failure.cause ?: failure // unwrap original cause from ExecutionException
        }
    }
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val listener = ContinuationCallback(continuation)
        Futures.addCallback(this, listener, MoreExecutors.directExecutor())
        continuation.invokeOnCancellation {
            this@await.cancel(false)
            // Drop the continuation so the future's callback no longer references it.
            listener.cont = null
        }
    }
}
/**
 * [FutureCallback] that resumes the continuation held in [cont] with the future's outcome.
 *
 * [cont] is a mutable volatile field so that it can be set to `null` from outside;
 * once it is `null`, both callbacks do nothing.
 */
private class ContinuationCallback<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : FutureCallback<T> {
    @Suppress("UNCHECKED_CAST")
    override fun onSuccess(result: T?) {
        val target = cont ?: return
        target.resume(result as T)
    }

    override fun onFailure(t: Throwable) {
        val target = cont ?: return
        target.resumeWithException(t)
    }
}