blob: d476990ef84e34ab41ef3f0a450a8b728511bad6 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarov3754f952017-01-18 20:47:54 +030017package kotlinx.coroutines.experimental.future
18
19import kotlinx.coroutines.experimental.*
Roman Elizarova4b56932018-03-13 17:59:48 +030020import java.util.concurrent.*
21import java.util.function.*
22import kotlin.coroutines.experimental.*
Roman Elizarov3754f952017-01-18 20:47:54 +030023
/**
 * Starts a new coroutine and exposes its result as a [CompletableFuture].
 *
 * A fresh [Job] is created for the coroutine; its parent is the job (if any) found in the context
 * produced by [newCoroutineContext] from [context] and [parent]. The job and the returned future are
 * linked both ways: the future is cancelled when the job completes, and the job is cancelled
 * (with the future's exception, if any) when the future completes for any reason.
 *
 * Lazy start is not supported: passing a [start] option whose `isLazy` is `true` fails with
 * [IllegalArgumentException] before anything is created.
 *
 * See [newCoroutineContext] for details on how the resulting context is built.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param parent explicitly specified parent job; it is passed to [newCoroutineContext] together with [context].
 * @param onCompletion optional completion handler registered on the coroutine's job
 *        (see [Job.invokeOnCompletion]).
 * @param block the coroutine code.
 * @return the future that is completed with the result (or the failure) of [block].
 */
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    onCompletion: CompletionHandler? = null,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val combinedContext = newCoroutineContext(context, parent)
    val coroutineJob = Job(combinedContext[Job])
    // The future doubles as the coroutine's scope (receiver) and as its completion continuation.
    val result = CompletableFutureCoroutine<T>(combinedContext + coroutineJob)
    // Job finished -> cancel the future (a no-op when the future is already complete).
    coroutineJob.cancelFutureOnCompletion(result)
    // Future finished (normally, exceptionally or by cancellation) -> cancel the job.
    result.whenComplete { _, cause -> coroutineJob.cancel(cause) }
    onCompletion?.let { coroutineJob.invokeOnCompletion(handler = it) }
    // Launch the block using the requested start strategy.
    start(block, receiver = result, completion = result)
    return result
}
70
/**
 * Hidden overload without the `onCompletion` parameter; retained for binary compatibility only.
 * Forwards to the primary `future` builder with no completion handler.
 *
 * @suppress **Deprecated**: Binary compatibility
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    return future(
        context = context,
        start = start,
        parent = parent,
        onCompletion = null,
        block = block
    )
}
79
/**
 * Hidden overload without the `parent` and `onCompletion` parameters; retained for binary
 * compatibility only. Forwards to the primary `future` builder with no explicit parent job
 * and no completion handler.
 *
 * @suppress **Deprecated**: Binary compatibility
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    return future(
        context = context,
        start = start,
        parent = null,
        onCompletion = null,
        block = block
    )
}
88
/**
 * A [CompletableFuture] that also acts as the completion [Continuation] and the [CoroutineScope]
 * of the coroutine started by the `future` builder: resuming the continuation completes the future.
 *
 * @param context the coroutine context; [isActive] reads the [Job] element from it.
 */
private class CompletableFutureCoroutine<T>(
    override val context: CoroutineContext
) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {

    /** The scope's context is the very same context the continuation was created with. */
    override val coroutineContext: CoroutineContext
        get() = context

    /** Delegates to the [Job] stored in [context]; throws if the context carries no job. */
    override val isActive: Boolean
        get() = context[Job]!!.isActive

    /** Normal coroutine completion: completes the future with [value]. */
    override fun resume(value: T) {
        complete(value)
    }

    /** Failed coroutine completion: completes the future exceptionally with [exception]. */
    override fun resumeWithException(exception: Throwable) {
        completeExceptionally(exception)
    }
}
97
/**
 * Converts this deferred value to the instance of [CompletableFuture].
 *
 * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
 * When this deferred completes, the future is completed with its value, or completed exceptionally
 * with whatever [Throwable] `getCompleted()` throws.
 *
 * @return a new future mirroring the outcome of this deferred.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    // Future completed from the outside (including cancellation) -> cancel the deferred.
    future.whenComplete { _, exception -> cancel(exception) }
    invokeOnCompletion {
        try {
            future.complete(getCompleted())
        } catch (t: Throwable) {
            // Catch Throwable rather than Exception: a deferred that failed with an Error would
            // otherwise escape this handler and leave the future incomplete forever.
            future.completeExceptionally(t)
        }
    }
    return future
}
114
/**
 * Converts this completion stage to an instance of [Deferred].
 * When this completion stage is an instance of [Future], then it is cancelled when
 * the resulting deferred is cancelled.
 *
 * Failures are reported with their original cause on both paths: the [ExecutionException] thrown by
 * `get()` on the fast path and the [CompletionException] wrapper that may be delivered to
 * `whenComplete` on the slow path are unwrapped to their `cause` when one is present.
 *
 * @return a deferred that completes with the value or the failure of this stage.
 */
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    // Fast path if already completed
    if (this is Future<*> && isDone()) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    // Slow path: complete the deferred from the stage's completion callback
    val result = CompletableDeferred<T>()
    whenComplete { value, exception ->
        if (exception == null) {
            result.complete(value)
        } else {
            // unwrap original cause from CompletionException so that the slow path reports
            // the same exception the fast path would
            result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
        }
    }
    if (this is Future<*>) result.cancelFutureOnCompletion(this)
    return result
}
143
/**
 * Awaits for completion of the future without blocking a thread.
 * Simply forwards to the [CompletionStage] version of `await`.
 *
 * @suppress **Deprecated**: For binary compatibility only
 */
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
public suspend fun <T> CompletableFuture<T>.await(): T {
    // Widen the static type so the call below targets the CompletionStage extension.
    val stage: CompletionStage<T> = this
    return stage.await()
}
152
/**
 * Awaits for completion of the completion stage without blocking a thread.
 *
 * If this stage is a [Future] that is already done, the result is obtained immediately via `get()`
 * without suspending; an [ExecutionException] is unwrapped to its cause when one is present.
 * Otherwise the function suspends through [suspendCancellableCoroutine] and is resumed from a
 * completion action registered with `whenComplete`.
 *
 * The registered completion action cannot be removed from the stage, so after cancellation of the
 * wait it stays attached to the stage. Its reference to the waiting continuation is cleared on
 * cancellation, though, so the action no longer refers to the waiting coroutine.
 *
 * @return the value this stage completed with.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    // Fast path: an already completed future never suspends.
    if (this is Future<*> && isDone()) {
        try {
            @Suppress("UNCHECKED_CAST")
            val value = get() as T
            return value
        } catch (wrapped: ExecutionException) {
            // Surface the original cause rather than the ExecutionException wrapper.
            throw wrapped.cause ?: wrapped
        }
    }
    // Slow path: suspend until the stage invokes the registered listener.
    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
        val listener = ContinuationConsumer(cont)
        whenComplete(listener)
        cont.invokeOnCancellation {
            // Drop the reference to the continuation held by the listener.
            listener.cont = null
        }
    }
}
Roman Elizarov3754f952017-01-18 20:47:54 +0300184
/**
 * Completion action that resumes a suspended continuation with the outcome of a completion stage.
 *
 * @param cont the continuation to resume; it is a volatile, mutable field so that it can be set to
 *        `null` from outside, after which [accept] does nothing.
 */
private class ContinuationConsumer<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiConsumer<T?, Throwable?> {
    /**
     * Resumes the stored continuation with [result] when [exception] is `null`, otherwise with
     * the exception. A [CompletionException] wrapper is unwrapped to its cause (when present).
     */
    @Suppress("UNCHECKED_CAST")
    override fun accept(result: T?, exception: Throwable?) {
        val cont = this.cont ?: return // atomically read current value unless null
        if (exception == null) {
            // the future has been completed normally
            cont.resume(result as T)
        } else {
            // the future has completed with an exception: unwrap CompletionException so the
            // waiter sees the original cause instead of the wrapper
            cont.resumeWithException((exception as? CompletionException)?.cause ?: exception)
        }
    }
}
Roman Elizarovfc6461f2017-05-16 18:48:17 +0300197
198// --------------------------------------- DEPRECATED APIs ---------------------------------------
// These are kept only for backwards compatibility with old versions of this integration library.
// Do not copy them when using this file as an example for other integrations.
201
/**
 * Old name of [asCompletableFuture]; produces exactly the same [CompletableFuture] by delegating to it.
 *
 * @suppress: **Deprecated**: Renamed to [asCompletableFuture]
 */
@Deprecated(
    message = "Renamed to `asCompletableFuture`",
    replaceWith = ReplaceWith("asCompletableFuture()")
)
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> {
    return this.asCompletableFuture()
}
Roman Elizarov3fa4bec2017-05-16 21:05:43 +0300210
/**
 * Hidden overload taking a receiver-less suspending block; kept for binary compatibility only.
 * Wraps [block] into a scope-receiving lambda and forwards to the primary `future` builder.
 *
 * @suppress **Deprecated**
 */
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
@Deprecated("Use the other version. This one is for binary compatibility only.", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = CommonPool,
    block: suspend () -> T
): CompletableFuture<T> {
    return future(context = context, block = { block() })
}