blob: 3cb4cff976eb29bcf63360c26552cb8b45a89b95 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov3754f952017-01-18 20:47:54 +03005package kotlinx.coroutines.experimental.future
6
7import kotlinx.coroutines.experimental.*
Roman Elizarova4b56932018-03-13 17:59:48 +03008import java.util.concurrent.*
9import java.util.function.*
10import kotlin.coroutines.experimental.*
Roman Elizarov3754f952017-01-18 20:47:54 +030011
/**
 * Launches a new coroutine and exposes its outcome as a [CompletableFuture].
 * By default this builder runs in the [DefaultDispatcher] context, which makes it conceptually close to
 * [CompletableFuture.supplyAsync].
 *
 * The coroutine is cancelled as soon as the returned future is cancelled or completed in any other way,
 * and the returned future is cancelled when the coroutine's job completes.
 *
 * The [context] of the new coroutine may be given explicitly.
 * See [CoroutineDispatcher] for the standard context implementations shipped with `kotlinx.coroutines`.
 * When the [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
 * of a parent coroutine is passed, the [Job] of the new coroutine becomes a child of the parent's job.
 * The parent job can also be supplied directly through the [parent] parameter.
 *
 * If the context contains neither a dispatcher nor any other [ContinuationInterceptor], [DefaultDispatcher] is used.
 *
 * The coroutine is scheduled for execution immediately unless another `start` option is given;
 * see [CoroutineStart] for details.
 * [CoroutineStart.LAZY] is rejected with [IllegalArgumentException], because the `CompletableFuture`
 * framework has no corresponding capability.
 *
 * See [newCoroutineContext] for the debugging facilities available to newly created coroutines.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
 * @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
 * @param block the coroutine code.
 * @return the future that is completed with the result (or failure) of [block].
 * @throws IllegalArgumentException if [start] is a lazy start option.
 */
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    onCompletion: CompletionHandler? = null,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val effectiveContext = newCoroutineContext(context, parent)
    // A dedicated job (child of the context's job, if any) ties the coroutine's lifetime to the future.
    val coroutineJob = Job(effectiveContext[Job])
    val result = CompletableFutureCoroutine<T>(effectiveContext + coroutineJob)
    // Job completes -> cancel the future.
    coroutineJob.cancelFutureOnCompletion(result)
    // Future completes (normally, exceptionally or by cancellation) -> cancel the job.
    result.whenComplete { _, cause -> coroutineJob.cancel(cause) }
    onCompletion?.let { handler -> coroutineJob.invokeOnCompletion(handler = handler) }
    // The future serves both as the coroutine scope (receiver) and as its completion continuation.
    start(block, receiver = result, completion = result)
    return result
}
58
/**
 * Overload without the `onCompletion` parameter, retained so that previously compiled callers keep linking.
 * Delegates to the full builder with no completion handler.
 *
 * @suppress **Deprecated**: Binary compatibility
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    return future(context = context, start = start, parent = parent, onCompletion = null, block = block)
}
67
/**
 * Overload without the `parent` and `onCompletion` parameters, retained so that previously compiled
 * callers keep linking. Delegates to the full builder with neither an explicit parent nor a completion handler.
 *
 * @suppress **Deprecated**: Binary compatibility
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    return future(context = context, start = start, parent = null, onCompletion = null, block = block)
}
76
/**
 * A [CompletableFuture] that doubles as the completion [Continuation] and the [CoroutineScope]
 * of the coroutine started by the `future` builder: resuming the continuation completes the future.
 *
 * The supplied [context] must contain a [Job]; [isActive] dereferences it with `!!`.
 */
private class CompletableFutureCoroutine<T>(
    override val context: CoroutineContext
) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {
    /** Scope view of the same context the continuation runs in. */
    override val coroutineContext: CoroutineContext
        get() = context

    /** Mirrors the activity state of the job stored in [context]. */
    override val isActive: Boolean
        get() {
            val job = context[Job]!!
            return job.isActive
        }

    /** Normal coroutine completion: publish the value through the future. */
    override fun resume(value: T) {
        complete(value)
    }

    /** Failed coroutine completion: publish the failure through the future. */
    override fun resumeWithException(exception: Throwable) {
        completeExceptionally(exception)
    }
}
85
/**
 * Converts this deferred value to the instance of [CompletableFuture].
 * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
 *
 * When this deferred completes, the future is completed with its value, or completed exceptionally
 * with whatever [Throwable] retrieving the value produces.
 *
 * @return a new future that mirrors the outcome of this deferred.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    // Future completed (including cancellation) -> cancel the deferred with the same cause.
    future.whenComplete { _, exception -> cancel(exception) }
    invokeOnCompletion {
        try {
            future.complete(getCompleted())
        } catch (t: Throwable) {
            // Catch Throwable rather than Exception: a failure that is not an Exception (e.g. an Error)
            // would otherwise escape this handler and leave the future incomplete forever.
            future.completeExceptionally(t)
        }
    }
    return future
}
102
/**
 * Converts this completion stage to an instance of [Deferred].
 * When this completion stage is an instance of [Future], then it is cancelled when
 * the resulting deferred is cancelled.
 *
 * Failures are reported with their original cause: the [ExecutionException] wrapper is removed when the
 * stage is already done, and the [CompletionException] wrapper is removed when it completes later.
 *
 * @return a deferred that completes with the value or failure of this completion stage.
 */
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    // Fast path if already completed
    if (this is Future<*> && isDone()) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    val result = CompletableDeferred<T>()
    whenComplete { value, exception ->
        if (exception == null) {
            // the stage has completed normally
            result.complete(value)
        } else {
            // Unwrap CompletionException so that the slow path reports the same original exception
            // as the fast path above, regardless of when the stage happens to complete.
            result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
        }
    }
    // Propagate cancellation of the deferred back to the underlying future, when there is one.
    if (this is Future<*>) result.cancelFutureOnCompletion(this)
    return result
}
131
/**
 * Suspends until this future completes, without blocking a thread.
 * Simply forwards to the [CompletionStage] flavour of `await`.
 *
 * @suppress **Deprecated**: For binary compatibility only
 */
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
public suspend fun <T> CompletableFuture<T>.await(): T {
    // Widen the static type so that the call resolves to the CompletionStage extension.
    val stage: CompletionStage<T> = this
    return stage.await()
}
140
/**
 * Suspends until this completion stage completes, without blocking a thread, and returns its result
 * or throws the exception it failed with.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * stops waiting for the completion stage and immediately resumes with [CancellationException].
 *
 * Note that a `CompletionStage` implementation does not support prompt removal of installed listeners,
 * so after cancellation of this wait a few small objects stay in the stage's stack of completion actions
 * until the stage itself completes. The reference to the waiting coroutine is cleared, however, so its
 * memory can be released even if the completion stage never completes.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    // Fast path: an already completed Future yields its outcome right away (does not suspend).
    if (this is Future<*> && isDone()) {
        return try {
            @Suppress("UNCHECKED_CAST")
            val value = get() as T
            value
        } catch (wrapped: ExecutionException) {
            // Surface the original failure instead of the ExecutionException wrapper.
            throw wrapped.cause ?: wrapped
        }
    }
    // Slow path: register a listener on the stage and suspend.
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val listener = ContinuationConsumer(continuation)
        whenComplete(listener)
        continuation.invokeOnCancellation {
            // Drop the reference to the continuation so that it can be collected after cancellation.
            listener.cont = null
        }
    }
}
Roman Elizarov3754f952017-01-18 20:47:54 +0300172
/**
 * Completion listener that resumes the continuation held in [cont] with the outcome of a completion stage.
 *
 * [cont] is a volatile field that may be reset to `null` from outside; once it is `null`,
 * [accept] does nothing.
 */
private class ContinuationConsumer<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiConsumer<T?, Throwable?> {
    @Suppress("UNCHECKED_CAST")
    override fun accept(result: T?, exception: Throwable?) {
        // Single volatile read; bail out when the continuation has been cleared.
        val target = cont ?: return
        if (exception != null) {
            // Failed stage: strip the CompletionException wrapper so that only the original exception propagates.
            val original = (exception as? CompletionException)?.cause ?: exception
            target.resumeWithException(original)
            return
        }
        // Successful stage: hand over the value.
        target.resume(result as T)
    }
}
Roman Elizarovfc6461f2017-05-16 18:48:17 +0300188
189// --------------------------------------- DEPRECATED APIs ---------------------------------------
190// We keep it only for backwards compatibility with old versions of this integration library.
// Do not copy this section when using this file as an example for other integrations.
192
/**
 * Old name of [asCompletableFuture]; produces a [CompletableFuture] view of this deferred value
 * by delegating to it.
 * @suppress: **Deprecated**: Renamed to [asCompletableFuture]
 */
@Deprecated(
    "Renamed to `asCompletableFuture`",
    replaceWith = ReplaceWith("asCompletableFuture()")
)
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> {
    return this.asCompletableFuture()
}
Roman Elizarov3fa4bec2017-05-16 21:05:43 +0300201
/**
 * Legacy overload taking a receiver-less suspending [block]; adapts it to the scoped form and
 * delegates to the main builder.
 *
 * @suppress **Deprecated**
 */
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
@Deprecated("Use the other version. This one is for binary compatibility only.", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = CommonPool,
    block: suspend () -> T
): CompletableFuture<T> {
    return future(context = context, block = { block() })
}