Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 1 | /* |
Roman Elizarov | 1f74a2d | 2018-06-29 19:19:45 +0300 | [diff] [blame] | 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.future |
| 6 | |
| 7 | import kotlinx.coroutines.* |
Roman Elizarov | a4b5693 | 2018-03-13 17:59:48 +0300 | [diff] [blame] | 8 | import java.util.concurrent.* |
| 9 | import java.util.function.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 10 | import kotlin.coroutines.* |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 11 | |
| 12 | /** |
Roman Elizarov | b170819 | 2017-12-22 12:14:05 +0300 | [diff] [blame] | 13 | * Starts new coroutine and returns its result as an implementation of [CompletableFuture]. |
Roman Elizarov | d528e3e | 2017-01-23 15:40:05 +0300 | [diff] [blame] | 14 | * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed. |
Roman Elizarov | c0d559b | 2017-09-28 14:27:05 +0300 | [diff] [blame] | 15 | * |
Roman Elizarov | 592de52 | 2018-09-10 20:06:38 +0300 | [diff] [blame] | 16 | * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. |
Roman Elizarov | dc29b07 | 2018-09-11 18:42:03 +0300 | [diff] [blame] | 17 | * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. |
Roman Elizarov | 592de52 | 2018-09-10 20:06:38 +0300 | [diff] [blame] | 18 | * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden |
| 19 | * with corresponding [coroutineContext] element. |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 20 | * |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 21 | * By default, the coroutine is immediately scheduled for execution. |
| 22 | * Other options can be specified via `start` parameter. See [CoroutineStart] for details. |
| 23 | * A value of [CoroutineStart.LAZY] is not supported |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 24 | * (since `CompletableFuture` framework does not provide the corresponding capability) and |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 25 | * produces [IllegalArgumentException]. |
| 26 | * |
Roman Elizarov | 592de52 | 2018-09-10 20:06:38 +0300 | [diff] [blame] | 27 | * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine. |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 28 | * |
Roman Elizarov | dc29b07 | 2018-09-11 18:42:03 +0300 | [diff] [blame] | 29 | * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. |
Roman Elizarov | c0d559b | 2017-09-28 14:27:05 +0300 | [diff] [blame] | 30 | * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. |
| 31 | * @param block the coroutine code. |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 32 | */ |
Roman Elizarov | 592de52 | 2018-09-10 20:06:38 +0300 | [diff] [blame] | 33 | public fun <T> CoroutineScope.future( |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 34 | context: CoroutineContext = EmptyCoroutineContext, |
| 35 | start: CoroutineStart = CoroutineStart.DEFAULT, |
| 36 | block: suspend CoroutineScope.() -> T |
| 37 | ) : CompletableFuture<T> { |
| 38 | require(!start.isLazy) { "$start start is not supported" } |
| 39 | val newContext = this.newCoroutineContext(context) |
Vsevolod Tolstopyatov | 2bac00f | 2018-10-25 14:06:33 +0300 | [diff] [blame] | 40 | val future = CompletableFuture<T>() |
| 41 | val coroutine = CompletableFutureCoroutine(newContext, future) |
| 42 | future.whenComplete(coroutine) // Cancel coroutine if future was completed externally |
| 43 | coroutine.start(start, coroutine, block) |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 44 | return future |
| 45 | } |
| 46 | |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 47 | private class CompletableFutureCoroutine<T>( |
Vsevolod Tolstopyatov | 2bac00f | 2018-10-25 14:06:33 +0300 | [diff] [blame] | 48 | context: CoroutineContext, |
| 49 | private val completion: CompletableFuture<T> |
| 50 | ) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> { |
| 51 | |
| 52 | override fun accept(value: T?, exception: Throwable?) { |
| 53 | cancel() |
| 54 | } |
| 55 | |
| 56 | override fun onCompleted(value: T) { |
| 57 | completion.complete(value) |
| 58 | } |
| 59 | |
| 60 | override fun onCompletedExceptionally(exception: Throwable) { |
Vsevolod Tolstopyatov | 5639304 | 2018-10-25 14:15:48 +0300 | [diff] [blame] | 61 | if (!completion.completeExceptionally(exception)) { |
| 62 | handleCoroutineException(parentContext, exception, this) |
| 63 | } |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 64 | } |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 65 | } |
| 66 | |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 67 | /** |
| 68 | * Converts this deferred value to the instance of [CompletableFuture]. |
| 69 | * The deferred value is cancelled when the resulting future is cancelled or otherwise completed. |
| 70 | */ |
Roman Elizarov | f7a7f7b | 2017-03-09 12:08:36 +0300 | [diff] [blame] | 71 | public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> { |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 72 | val future = CompletableFuture<T>() |
| 73 | future.whenComplete { _, exception -> cancel(exception) } |
Roman Elizarov | e780347 | 2017-02-16 09:52:31 +0300 | [diff] [blame] | 74 | invokeOnCompletion { |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 75 | try { |
Roman Elizarov | c581454 | 2017-01-19 10:19:06 +0300 | [diff] [blame] | 76 | future.complete(getCompleted()) |
Vsevolod Tolstopyatov | a334c4e | 2018-08-06 11:04:24 +0300 | [diff] [blame] | 77 | } catch (t: Throwable) { |
| 78 | future.completeExceptionally(t) |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 79 | } |
| 80 | } |
| 81 | return future |
| 82 | } |
| 83 | |
| 84 | /** |
Roman Elizarov | eb4f9be | 2018-03-01 22:41:08 +0300 | [diff] [blame] | 85 | * Converts this completion stage to an instance of [Deferred]. |
| 86 | * When this completion stage is an instance of [Future], then it is cancelled when |
| 87 | * the resulting deferred is cancelled. |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 88 | */ |
| 89 | public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> { |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 90 | // Fast path if already completed |
| 91 | if (this is Future<*> && isDone()){ |
| 92 | return try { |
| 93 | @Suppress("UNCHECKED_CAST") |
| 94 | CompletableDeferred(get() as T) |
Roman Elizarov | 19c1f2e | 2018-03-01 22:15:58 +0300 | [diff] [blame] | 95 | } catch (e: Throwable) { |
| 96 | // unwrap original cause from ExecutionException |
| 97 | val original = (e as? ExecutionException)?.cause ?: e |
Ohad Shai | 8f1c728 | 2018-12-27 10:58:47 +0200 | [diff] [blame^] | 98 | CompletableDeferred<T>().also { it.completeExceptionally(original) } |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 99 | } |
| 100 | } |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 101 | val result = CompletableDeferred<T>() |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 102 | whenComplete { value, exception -> |
| 103 | if (exception == null) { |
| 104 | result.complete(value) |
| 105 | } else { |
Ohad Shai | 8f1c728 | 2018-12-27 10:58:47 +0200 | [diff] [blame^] | 106 | result.completeExceptionally(exception) |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 107 | } |
| 108 | } |
Roman Elizarov | eb4f9be | 2018-03-01 22:41:08 +0300 | [diff] [blame] | 109 | if (this is Future<*>) result.cancelFutureOnCompletion(this) |
Jonathan Cornaz | 7bd2c50 | 2018-02-28 15:59:21 +0100 | [diff] [blame] | 110 | return result |
| 111 | } |
| 112 | |
| 113 | /** |
Roman Elizarov | eb4f9be | 2018-03-01 22:41:08 +0300 | [diff] [blame] | 114 | * Awaits for completion of the completion stage without blocking a thread. |
| 115 | * |
Roman Elizarov | be4cae3 | 2017-02-15 17:57:02 +0300 | [diff] [blame] | 116 | * This suspending function is cancellable. |
Roman Elizarov | d82b3a9 | 2017-06-23 21:52:08 +0300 | [diff] [blame] | 117 | * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 118 | * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. |
Vsevolod Tolstopyatov | 8d6c1a9 | 2018-09-25 13:50:30 +0300 | [diff] [blame] | 119 | * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture]. |
| 120 | * If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead. |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 121 | */ |
Roman Elizarov | eb4f9be | 2018-03-01 22:41:08 +0300 | [diff] [blame] | 122 | public suspend fun <T> CompletionStage<T>.await(): T { |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 123 | // fast path when CompletableFuture is already done (does not suspend) |
Roman Elizarov | eb4f9be | 2018-03-01 22:41:08 +0300 | [diff] [blame] | 124 | if (this is Future<*> && isDone()) { |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 125 | try { |
Vsevolod Tolstopyatov | 87f2faa | 2018-04-30 22:53:02 +0300 | [diff] [blame] | 126 | @Suppress("UNCHECKED_CAST") |
Roman Elizarov | eb4f9be | 2018-03-01 22:41:08 +0300 | [diff] [blame] | 127 | return get() as T |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 128 | } catch (e: ExecutionException) { |
| 129 | throw e.cause ?: e // unwrap original cause from ExecutionException |
Roman Elizarov | ee89344 | 2017-01-19 14:56:21 +0300 | [diff] [blame] | 130 | } |
Roman Elizarov | ee89344 | 2017-01-19 14:56:21 +0300 | [diff] [blame] | 131 | } |
Roman Elizarov | fc6461f | 2017-05-16 18:48:17 +0300 | [diff] [blame] | 132 | // slow path -- suspend |
Roman Elizarov | ee89344 | 2017-01-19 14:56:21 +0300 | [diff] [blame] | 133 | return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 134 | val consumer = ContinuationConsumer(cont) |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 135 | whenComplete(consumer) |
Vsevolod Tolstopyatov | 80a2947 | 2018-04-17 16:02:02 +0300 | [diff] [blame] | 136 | cont.invokeOnCancellation { |
Vsevolod Tolstopyatov | 8d6c1a9 | 2018-09-25 13:50:30 +0300 | [diff] [blame] | 137 | // mayInterruptIfRunning is not used |
| 138 | (this as? CompletableFuture<T>)?.cancel(false) |
| 139 | consumer.cont = null // shall clear reference to continuation to aid GC |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 140 | } |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 141 | } |
Roman Elizarov | ee89344 | 2017-01-19 14:56:21 +0300 | [diff] [blame] | 142 | } |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 143 | |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 144 | private class ContinuationConsumer<T>( |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 145 | @Volatile @JvmField var cont: Continuation<T>? |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 146 | ) : BiConsumer<T?, Throwable?> { |
Roman Elizarov | 63e779c | 2017-07-11 14:42:48 +0300 | [diff] [blame] | 147 | @Suppress("UNCHECKED_CAST") |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 148 | override fun accept(result: T?, exception: Throwable?) { |
Roman Elizarov | 489cac2 | 2017-05-17 12:18:28 +0300 | [diff] [blame] | 149 | val cont = this.cont ?: return // atomically read current value unless null |
Vsevolod Tolstopyatov | 1cbe8f0 | 2018-06-05 18:13:51 +0300 | [diff] [blame] | 150 | if (exception == null) { |
| 151 | // the future has been completed normally |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 152 | cont.resume(result as T) |
Vsevolod Tolstopyatov | 1cbe8f0 | 2018-06-05 18:13:51 +0300 | [diff] [blame] | 153 | } else { |
| 154 | // the future has completed with an exception, unwrap it to provide consistent view of .await() result and to propagate only original exception |
| 155 | cont.resumeWithException((exception as? CompletionException)?.cause ?: exception) |
| 156 | } |
Roman Elizarov | 3fa4bec | 2017-05-16 21:05:43 +0300 | [diff] [blame] | 157 | } |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 158 | } |