blob: b9644251e65093f37978473c946989aaad9b8750 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental.future
2
3import kotlinx.coroutines.experimental.*
4import java.util.concurrent.CompletableFuture
5import kotlin.coroutines.Continuation
6import kotlin.coroutines.CoroutineContext
7import kotlin.coroutines.startCoroutine
8
/**
 * Starts a new coroutine and returns its result as an implementation of [CompletableFuture].
 * This coroutine builder uses [CommonPool] context by default and is conceptually similar to
 * [CompletableFuture.supplyAsync].
 *
 * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
 * The [context] for the new coroutine must include [CoroutineDispatcher] element.
 * The specified context is added to the context of the parent running coroutine (if any) inside which this function
 * is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
 *
 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
 *
 * @param context additional context elements for the new coroutine; combined with [CommonPool].
 * @param block the suspending code whose result (or failure) completes the returned future.
 * @return the future that is completed by the coroutine started from [block].
 */
public fun <T> future(context: CoroutineContext = CommonPool, block: suspend () -> T): CompletableFuture<T> {
    val coroutineContext = newCoroutineContext(CommonPool + context)
    // The new job is created with the job found in the combined context (if any) as its parent.
    val job = Job(coroutineContext[Job])
    return CompletableFutureCoroutine<T>(coroutineContext + job).also { result ->
        // Link both directions before the coroutine starts:
        // job completion cancels the future, and future completion cancels the job.
        job.cancelFutureOnCompletion(result)
        result.whenComplete { _, exception -> job.cancel(exception) }
        // The future itself serves as the completion continuation of the coroutine.
        block.startCoroutine(result)
    }
}
29
/**
 * Converts this deferred value to an instance of [CompletableFuture].
 * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
 *
 * When this deferred completes, the returned future is completed with the value obtained from
 * `getCompleted()`, or completed exceptionally with whatever [Throwable] that call throws.
 *
 * @return a future that mirrors the outcome of this deferred value.
 */
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    // Completion (including cancellation) of the future is propagated back to this deferred.
    future.whenComplete { _, exception -> cancel(exception) }
    onCompletion {
        try {
            future.complete(getCompleted())
        } catch (exception: Throwable) {
            // Catch Throwable rather than Exception: if the failure is an Error it must still
            // complete the future, otherwise the future would be left incomplete forever.
            future.completeExceptionally(exception)
        }
    }
    return future
}
46
/**
 * Awaits for completion of the future without blocking a thread. This suspending function is cancellable.
 * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
 * immediately resumes with [CancellationException].
 *
 * If the future is already done, the result is returned (or the failure thrown) without suspending.
 * A failure reported by `get()` as [java.util.concurrent.ExecutionException] is unwrapped to its cause, so that
 * the already-done path does not add a wrapper that the suspending path does not add.
 *
 * @return the value the future completed with.
 */
public suspend fun <T> CompletableFuture<T>.await(): T {
    // Quick check if already complete (avoids extra object creation and suspension).
    if (isDone) {
        try {
            return get()
        } catch (e: java.util.concurrent.ExecutionException) {
            // get() wraps the failure in ExecutionException; rethrow the cause itself
            // (falling back to the wrapper only if no cause is attached).
            throw e.cause ?: e
        }
    }
    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
        val completionFuture = whenComplete { result, exception ->
            if (exception == null) {
                // The future has been completed normally.
                cont.resume(result)
            } else {
                // The future has completed with an exception.
                cont.resumeWithException(exception)
            }
        }
        // Cancel the dependent stage once the continuation completes (e.g. is cancelled).
        cont.cancelFutureOnCompletion(completionFuture)
        Unit
    }
}
64
/**
 * A [CompletableFuture] that also acts as the completion [Continuation] of a coroutine:
 * resuming it with a value completes the future normally, and resuming it with an exception
 * completes the future exceptionally.
 *
 * @property context the coroutine context reported by this continuation.
 */
private class CompletableFutureCoroutine<T>(
    override val context: CoroutineContext
) : CompletableFuture<T>(), Continuation<T> {

    /** Completes this future with [value]. */
    override fun resume(value: T) {
        complete(value)
    }

    /** Completes this future exceptionally with [exception]. */
    override fun resumeWithException(exception: Throwable) {
        completeExceptionally(exception)
    }
}