Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2017 JetBrains s.r.o. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 17 | package kotlinx.coroutines.experimental.future |
| 18 | |
| 19 | import kotlinx.coroutines.experimental.* |
Roman Elizarov | a4b5693 | 2018-03-13 17:59:48 +0300 | [diff] [blame] | 20 | import java.util.concurrent.* |
| 21 | import java.util.function.* |
| 22 | import kotlin.coroutines.experimental.* |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 23 | |
/**
 * Launches a new coroutine and exposes its outcome as a [CompletableFuture].
 * The builder is conceptually similar to [CompletableFuture.supplyAsync]; when no [context] is passed,
 * [DefaultDispatcher] is used.
 *
 * Cancelling the returned future, or completing it in any other way, cancels the running coroutine.
 *
 * The [context] of the new coroutine may be given explicitly.
 * See [CoroutineDispatcher] for the standard context implementations provided by `kotlinx.coroutines`.
 * The [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
 * of the parent coroutine may be passed as well, in which case the [Job] of the new coroutine becomes
 * a child of the parent coroutine's job.
 * The parent job can also be supplied explicitly through the [parent] parameter.
 *
 * When the context contains neither a dispatcher nor any other [ContinuationInterceptor],
 * [DefaultDispatcher] is used.
 *
 * The coroutine is scheduled for execution immediately by default.
 * Another strategy can be selected with the `start` parameter; see [CoroutineStart] for details.
 * [CoroutineStart.LAZY] is rejected with [IllegalArgumentException], because the `CompletableFuture`
 * framework offers no corresponding capability.
 *
 * See [newCoroutineContext] for the debugging facilities available to newly created coroutines.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
 * @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
 * @param block the coroutine code.
 * @throws IllegalArgumentException if [start] is a lazy start option.
 */
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    onCompletion: CompletionHandler? = null,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val baseContext = newCoroutineContext(context, parent)
    // A dedicated job for this coroutine, parented to whatever job the resolved context carries.
    val coroutineJob = Job(baseContext[Job])
    val result = CompletableFutureCoroutine<T>(baseContext + coroutineJob)
    // Link both directions: job completion is propagated to the future ...
    coroutineJob.cancelFutureOnCompletion(result)
    // ... and any completion of the future (including cancellation) cancels the job.
    result.whenComplete { _, cause -> coroutineJob.cancel(cause) }
    onCompletion?.let { coroutineJob.invokeOnCompletion(handler = it) }
    // The future object doubles as the coroutine scope (receiver) and as the completion continuation.
    start(block, receiver = result, completion = result)
    return result
}
| 70 | |
/**
 * Hidden overload without the `onCompletion` parameter, retained so that previously compiled callers keep linking.
 * Forwards to the primary [future] builder with no completion handler.
 *
 * @suppress **Deprecated**: Binary compatibility
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    // onCompletion is spelled out so the call can only bind to the primary overload.
    return future(
        context = context,
        start = start,
        parent = parent,
        onCompletion = null,
        block = block
    )
}
| 79 | |
/**
 * Hidden overload without the `parent` and `onCompletion` parameters, retained so that previously compiled
 * callers keep linking. Forwards to the primary [future] builder with no parent job and no completion handler.
 *
 * @suppress **Deprecated**: Binary compatibility
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    // parent/onCompletion are spelled out so the call can only bind to the primary overload.
    return future(
        context = context,
        start = start,
        parent = null,
        onCompletion = null,
        block = block
    )
}
| 88 | |
/**
 * A [CompletableFuture] that is simultaneously the [CoroutineScope] the coroutine body runs in and the
 * [Continuation] that receives the body's final outcome: a normal result completes the future,
 * a failure completes it exceptionally.
 *
 * The supplied [context] is expected to contain a [Job]; [isActive] dereferences it unconditionally.
 */
private class CompletableFutureCoroutine<T>(
    override val context: CoroutineContext
) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {

    // The scope's context is exactly the continuation's context.
    override val coroutineContext: CoroutineContext
        get() = context

    override val isActive: Boolean
        get() {
            val job = context[Job]!!
            return job.isActive
        }

    override fun resume(value: T) {
        complete(value)
    }

    override fun resumeWithException(exception: Throwable) {
        completeExceptionally(exception)
    }
}
| 97 | |
/**
 * Converts this deferred value to the instance of [CompletableFuture].
 * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
 *
 * When this deferred completes, its outcome is transferred to the future: a value completes it normally,
 * and any failure rethrown by [Deferred.getCompleted] completes it exceptionally.
 *
 * @return a future that mirrors the outcome of this deferred.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    // future -> deferred: any completion of the future (including cancellation) cancels the deferred
    future.whenComplete { _, exception -> cancel(exception) }
    // deferred -> future: transfer the final outcome once it is available
    invokeOnCompletion {
        try {
            future.complete(getCompleted())
        } catch (failure: Throwable) {
            // Catch Throwable, not just Exception: a deferred that failed with an Error would otherwise
            // let it escape this handler and leave the future incomplete forever.
            future.completeExceptionally(failure)
        }
    }
    return future
}
| 114 | |
/**
 * Converts this completion stage to an instance of [Deferred].
 * When this completion stage is an instance of [Future], then it is cancelled when
 * the resulting deferred is cancelled.
 *
 * A failure is reported with its original cause on both paths: [ExecutionException] is unwrapped when the
 * stage is already done, and [CompletionException] is unwrapped when the failure arrives through the
 * completion callback, so the exception seen by the caller does not depend on timing.
 *
 * @return a deferred that mirrors the outcome of this completion stage.
 */
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    // Fast path if already completed
    if (this is Future<*> && isDone()) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    // Slow path -- complete the deferred from the stage's completion callback
    val result = CompletableDeferred<T>()
    whenComplete { value, exception ->
        if (exception == null) {
            result.complete(value)
        } else {
            // Dependent stages wrap the failure in CompletionException; unwrap it to match the fast path
            val original = (exception as? CompletionException)?.cause ?: exception
            result.completeExceptionally(original)
        }
    }
    // Propagate completion/cancellation of the deferred back to the underlying future, when there is one
    if (this is Future<*>) result.cancelFutureOnCompletion(this)
    return result
}
| 143 | |
/**
 * Awaits completion of the future without blocking a thread.
 * Delegates to the [CompletionStage] flavour of `await`.
 *
 * @suppress **Deprecated**: For binary compatibility only
 */
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
public suspend fun <T> CompletableFuture<T>.await(): T {
    // Widen the static type so that the CompletionStage extension is the one being called.
    val stage = this as CompletionStage<T>
    return stage.await()
}
| 152 | |
/**
 * Awaits completion of the completion stage without blocking a thread.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * stops waiting for the completion stage and immediately resumes with [CancellationException].
 *
 * Note that `CompletionStage` implementations do not support prompt removal of installed listeners, so after
 * cancellation of this wait a few small objects remain in the stage's stack of completion actions until the
 * stage itself completes. Care is taken, however, to clear the reference to the waiting coroutine, so that
 * its memory can be released even if the completion stage never completes.
 *
 * @return the value the completion stage completed with.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    // Fast path: an already-done Future yields its result synchronously, without suspending.
    if (this is Future<*> && isDone()) {
        @Suppress("UNCHECKED_CAST")
        return try {
            get() as T
        } catch (wrapped: ExecutionException) {
            // Surface the original failure rather than the ExecutionException wrapper.
            throw wrapped.cause ?: wrapped
        }
    }
    // Slow path: suspend until the stage invokes the registered listener.
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val listener = ContinuationConsumer(continuation)
        whenComplete(listener)
        // The listener cannot be unregistered, so on cancellation just drop its continuation reference.
        continuation.invokeOnCancellation {
            listener.cont = null
        }
    }
}
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 184 | |
/**
 * Completion listener that resumes a suspended continuation with the outcome of a completion stage.
 *
 * [cont] is volatile and is set to `null` by the cancellation handler of `await`; once cleared,
 * a later completion of the stage is ignored and the continuation is no longer referenced from here.
 */
private class ContinuationConsumer<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiConsumer<T?, Throwable?> {
    @Suppress("UNCHECKED_CAST")
    override fun accept(result: T?, exception: Throwable?) {
        val cont = this.cont ?: return // atomically read current value unless null
        if (exception == null) {
            // the future has been completed normally
            cont.resume(result as T)
        } else {
            // The future has completed with an exception. Dependent stages wrap the failure in
            // CompletionException; unwrap it so the suspended path reports the same original
            // exception as the already-done fast path of await().
            cont.resumeWithException((exception as? CompletionException)?.cause ?: exception)
        }
    }
}
Roman Elizarov | fc6461f | 2017-05-16 18:48:17 +0300 | [diff] [blame] | 197 | |
| 198 | // --------------------------------------- DEPRECATED APIs --------------------------------------- |
| 199 | // We keep it only for backwards compatibility with old versions of this integration library. |
| 200 | // Do not copy when using this file as an example for other integrations. |
| 201 | |
/**
 * Converts this deferred value to the instance of [CompletableFuture] by delegating to [asCompletableFuture].
 * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
 * @suppress: **Deprecated**: Renamed to [asCompletableFuture]
 */
@Deprecated(
    "Renamed to `asCompletableFuture`",
    replaceWith = ReplaceWith("asCompletableFuture()")
)
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> {
    return asCompletableFuture()
}
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 210 | |
/**
 * Hidden legacy overload that accepts a receiver-less suspending block.
 * Adapts the block to the scoped form and forwards to the primary [future] builder.
 *
 * @suppress **Deprecated**
 */
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
@Deprecated("Use the other version. This one is for binary compatibility only.", level = DeprecationLevel.HIDDEN)
public fun <T> future(
    context: CoroutineContext = CommonPool,
    block: suspend () -> T
): CompletableFuture<T> {
    // Wrap the plain block so it fits the CoroutineScope-receiver signature; the scope itself is unused.
    val scopedBlock: suspend CoroutineScope.() -> T = { block() }
    return future(context = context, block = scopedBlock)
}