blob: 6c59a08c88c9315480d2d9ad3c5647adbfc7b708 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.future
6
7import kotlinx.coroutines.*
Roman Elizarova4b56932018-03-13 17:59:48 +03008import java.util.concurrent.*
9import java.util.function.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +030010import kotlin.coroutines.*
Roman Elizarov3754f952017-01-18 20:47:54 +030011
/**
 * Launches a new coroutine and exposes its outcome as a [CompletableFuture].
 * Cancelling the returned future, or completing it from the outside in any other way, cancels the running coroutine.
 *
 * The coroutine context is inherited from the receiver [CoroutineScope]; extra context elements can be supplied
 * through the [context] argument.
 * When the resulting context has neither a dispatcher nor any other [ContinuationInterceptor],
 * [Dispatchers.Default] is used.
 * The parent job is taken from the [CoroutineScope] too, but it may be overridden with the corresponding
 * [context] element.
 *
 * By default the coroutine is scheduled for execution right away.
 * A different behaviour can be requested with the `start` parameter, see [CoroutineStart] for details.
 * [CoroutineStart.LAZY] is rejected with an [IllegalArgumentException], because the `CompletableFuture`
 * framework offers no capability that corresponds to a lazy start.
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of the debugging facilities
 * that are available for a newly created coroutine.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext] for the new coroutine.
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the coroutine code.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val effectiveContext = this.newCoroutineContext(context)
    val result = CompletableFuture<T>()
    val job = CompletableFutureCoroutine(effectiveContext, result)
    // The coroutine is registered as the future's completion callback, so completing the future
    // externally (cancellation included) cancels the coroutine.
    result.whenComplete(job)
    job.start(start, job, block)
    return result
}
46
/**
 * Coroutine whose outcome is forwarded to the given [completion] future.
 *
 * The class also implements [BiConsumer] so that an instance can be registered as a completion callback
 * of a future: whenever that callback fires, the coroutine is cancelled.
 */
private class CompletableFutureCoroutine<T>(
    context: CoroutineContext,
    private val completion: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {

    /** Completion callback of the future: both arguments are ignored, the coroutine is simply cancelled. */
    override fun accept(value: T?, exception: Throwable?) {
        cancel()
    }

    /** Passes the value produced by the coroutine on to the future. */
    override fun onCompleted(value: T) {
        completion.complete(value)
    }

    /**
     * Passes the failure of the coroutine on to the future. When the future does not accept it
     * (`completeExceptionally` returns `false`), the exception is handed to [handleCoroutineException]
     * instead of being dropped.
     */
    override fun onCompletedExceptionally(exception: Throwable) {
        val delivered = completion.completeExceptionally(exception)
        if (delivered) return
        handleCoroutineException(parentContext, exception, this)
    }
}
66
/**
 * Wraps this deferred value into a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels this deferred value.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
    val deferred = this
    val promise = CompletableFuture<T>()
    // Any completion of the future is propagated back to the deferred as a cancellation request.
    promise.whenComplete { _, cause -> deferred.cancel(cause) }
    deferred.invokeOnCompletion {
        try {
            promise.complete(deferred.getCompleted())
        } catch (failure: Throwable) {
            // getCompleted() throws when the deferred did not finish with a value; mirror that in the future.
            promise.completeExceptionally(failure)
        }
    }
    return promise
}
83
/**
 * Converts this completion stage to an instance of [Deferred].
 * When this completion stage is an instance of [Future], then it is cancelled when
 * the resulting deferred is cancelled.
 *
 * A failure of the stage is reported through the deferred with its original cause: the
 * [ExecutionException] thrown by `get()` for an already completed future and the [CompletionException]
 * delivered to completion callbacks are both unwrapped, so the observed exception does not depend on
 * whether the stage had already completed when this function was called.
 */
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    // Fast path if already completed
    if (this is Future<*> && isDone()) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    val result = CompletableDeferred<T>()
    whenComplete { value, exception ->
        if (exception == null) {
            // the stage has completed normally
            result.complete(value)
        } else {
            // the stage has completed with an exception; unwrap CompletionException so that this path
            // reports the same original cause as the fast path above
            result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
        }
    }
    if (this is Future<*>) result.cancelFutureOnCompletion(this)
    return result
}
112
/**
 * Waits for this completion stage to complete without blocking a thread and returns its result,
 * or throws the exception the stage has completed with.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
 * this function stops waiting for the completion stage and immediately resumes with
 * [CancellationException][kotlinx.coroutines.CancellationException].
 * This method is intended to be used with one-shot futures, so on coroutine cancellation the completion stage
 * is cancelled as well if it is an instance of [CompletableFuture].
 * If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    // Fast path: a future that is already done yields its outcome right away, without suspending.
    if (this is Future<*> && isDone()) {
        val value = try {
            get()
        } catch (wrapped: ExecutionException) {
            // Rethrow the original cause instead of the ExecutionException wrapper.
            throw wrapped.cause ?: wrapped
        }
        @Suppress("UNCHECKED_CAST")
        return value as T
    }
    // Slow path: suspend until the stage invokes the registered callback.
    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
        val handler = ContinuationConsumer(cont)
        whenComplete(handler)
        cont.invokeOnCancellation {
            // mayInterruptIfRunning is not used
            (this as? CompletableFuture<T>)?.cancel(false)
            // Drop the reference to the continuation to aid GC.
            handler.cont = null
        }
    }
}
Roman Elizarov3754f952017-01-18 20:47:54 +0300143
/**
 * Completion callback that resumes a continuation with the outcome of a completion stage.
 *
 * The [cont] reference is volatile and may be reset to `null` from the outside; once it is `null`,
 * the callback does nothing.
 */
private class ContinuationConsumer<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiConsumer<T?, Throwable?> {
    @Suppress("UNCHECKED_CAST")
    override fun accept(result: T?, exception: Throwable?) {
        // Read the volatile field exactly once; bail out if the continuation has been cleared.
        val target = this.cont ?: return
        if (exception != null) {
            // Failed stage: unwrap CompletionException so that only the original exception is propagated
            // and .await() gives a consistent view of the result.
            val original = (exception as? CompletionException)?.cause ?: exception
            target.resumeWithException(original)
            return
        }
        // Stage completed normally.
        target.resume(result as T)
    }
}
Roman Elizarov3754f952017-01-18 20:47:54 +0300158}