Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 1 | package kotlinx.coroutines.experimental.future |
| 2 | |
| 3 | import kotlinx.coroutines.experimental.* |
| 4 | import java.util.concurrent.CompletableFuture |
| 5 | import kotlin.coroutines.Continuation |
| 6 | import kotlin.coroutines.CoroutineContext |
| 7 | import kotlin.coroutines.startCoroutine |
| 8 | |
| 9 | /** |
| 10 | * Starts new coroutine and returns its results an an implementation of [CompletableFuture]. |
| 11 | * This coroutine builder uses [CommonPool] context by default and is conceptually similar to [CompletableFuture.supplyAsync]. |
| 12 | * |
| 13 | * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed |
| 14 | * The [context] for the new coroutine must include [CoroutineDispatcher] element. |
| 15 | * The specified context is added to the context of the parent running coroutine (if any) inside which this function |
| 16 | * is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any). |
| 17 | * |
| 18 | * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine. |
| 19 | */ |
| 20 | public fun <T> future(context: CoroutineContext = CommonPool, block: suspend () -> T): CompletableFuture<T> { |
| 21 | val newContext = newCoroutineContext(CommonPool + context) |
| 22 | val job = Job(newContext[Job]) |
| 23 | val future = CompletableFutureCoroutine<T>(newContext + job) |
| 24 | job.cancelFutureOnCompletion(future) |
| 25 | future.whenComplete { _, exception -> job.cancel(exception) } |
| 26 | block.startCoroutine(future) |
| 27 | return future |
| 28 | } |
| 29 | |
| 30 | /** |
| 31 | * Converts this deferred value to the instance of [CompletableFuture]. |
| 32 | * The deferred value is cancelled when the resulting future is cancelled or otherwise completed. |
| 33 | */ |
| 34 | public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> { |
| 35 | val future = CompletableFuture<T>() |
| 36 | future.whenComplete { _, exception -> cancel(exception) } |
| 37 | onCompletion { |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 38 | try { |
Roman Elizarov | c581454 | 2017-01-19 10:19:06 +0300 | [diff] [blame^] | 39 | future.complete(getCompleted()) |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 40 | } catch (exception: Exception) { |
| 41 | future.completeExceptionally(exception) |
| 42 | } |
| 43 | } |
| 44 | return future |
| 45 | } |
| 46 | |
| 47 | /** |
| 48 | * Awaits for completion of the future without blocking a thread. This suspending function is cancellable. |
| 49 | * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function |
| 50 | * immediately resumes with [CancellationException] . |
| 51 | */ |
| 52 | public suspend fun <T> CompletableFuture<T>.await(): T = |
| 53 | // quick check if already complete (avoid extra object creation) |
| 54 | if (isDone) get() else suspendCancellableCoroutine { cont: CancellableContinuation<T> -> |
| 55 | val completionFuture = whenComplete { result, exception -> |
| 56 | if (exception == null) // the future has been completed normally |
| 57 | cont.resume(result) |
| 58 | else // the future has completed with an exception |
| 59 | cont.resumeWithException(exception) |
| 60 | } |
| 61 | cont.cancelFutureOnCompletion(completionFuture) |
| 62 | Unit |
| 63 | } |
| 64 | |
| 65 | private class CompletableFutureCoroutine<T>( |
| 66 | override val context: CoroutineContext |
| 67 | ) : CompletableFuture<T>(), Continuation<T> { |
| 68 | override fun resume(value: T) { complete(value) } |
| 69 | override fun resumeWithException(exception: Throwable) { completeExceptionally(exception) } |
| 70 | } |