Fix await/asDeferred for MinimalState implementations (#2457)
Fixes #2456
diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
index f7fdba5..bb5f426 100644
--- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
@@ -105,16 +105,19 @@
}
/**
- * Converts this completion stage to an instance of [Deferred].
- * When this completion stage is an instance of [Future], then it is cancelled when
- * the resulting deferred is cancelled.
+ * Converts this [CompletionStage] to an instance of [Deferred].
+ *
+ * The [CompletableFuture] that corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
+ * is cancelled when the resulting deferred is cancelled.
*/
+@Suppress("DeferredIsResult")
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
+ val future = toCompletableFuture() // retrieve the future
// Fast path if already completed
- if (this is Future<*> && isDone()){
+ if (future.isDone) {
return try {
@Suppress("UNCHECKED_CAST")
- CompletableDeferred(get() as T)
+ CompletableDeferred(future.get() as T)
} catch (e: Throwable) {
// unwrap original cause from ExecutionException
val original = (e as? ExecutionException)?.cause ?: e
@@ -132,25 +135,28 @@
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
}
}
- if (this is Future<*>) result.cancelFutureOnCompletion(this)
+ result.cancelFutureOnCompletion(future)
return result
}
/**
- * Awaits for completion of the completion stage without blocking a thread.
+ * Awaits for completion of [CompletionStage] without blocking a thread.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
- * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture].
- * If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead.
+ *
+ * This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that
+ * corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
+ * is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead.
*/
public suspend fun <T> CompletionStage<T>.await(): T {
+ val future = toCompletableFuture() // retrieve the future
// fast path when CompletableFuture is already done (does not suspend)
- if (this is Future<*> && isDone()) {
+ if (future.isDone) {
try {
- @Suppress("UNCHECKED_CAST")
- return get() as T
+ @Suppress("UNCHECKED_CAST", "BlockingMethodInNonBlockingContext")
+ return future.get() as T
} catch (e: ExecutionException) {
throw e.cause ?: e // unwrap original cause from ExecutionException
}
@@ -160,8 +166,7 @@
val consumer = ContinuationConsumer(cont)
whenComplete(consumer)
cont.invokeOnCancellation {
- // mayInterruptIfRunning is not used
- (this as? CompletableFuture<T>)?.cancel(false)
+ future.cancel(false)
consumer.cont = null // shall clear reference to continuation to aid GC
}
}
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
index f75c967..998aaa0 100644
--- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -490,4 +490,81 @@
}
}
}
+
+ /**
+ * https://github.com/Kotlin/kotlinx.coroutines/issues/2456
+ */
+ @Test
+ fun testCompletedStageAwait() = runTest {
+ val stage = CompletableFuture.completedStage("OK")
+ assertEquals("OK", stage.await())
+ }
+
+ /**
+ * https://github.com/Kotlin/kotlinx.coroutines/issues/2456
+ */
+ @Test
+ fun testCompletedStageAsDeferredAwait() = runTest {
+ val stage = CompletableFuture.completedStage("OK")
+ val deferred = stage.asDeferred()
+ assertEquals("OK", deferred.await())
+ }
+
+ @Test
+ fun testCompletedStateThenApplyAwait() = runTest {
+ expect(1)
+ val cf = CompletableFuture<String>()
+ launch {
+ expect(3)
+ cf.complete("O")
+ }
+ expect(2)
+ val stage = cf.thenApply { it + "K" }
+ assertEquals("OK", stage.await())
+ finish(4)
+ }
+
+ @Test
+ fun testCompletedStateThenApplyAwaitCancel() = runTest {
+ expect(1)
+ val cf = CompletableFuture<String>()
+ launch {
+ expect(3)
+ cf.cancel(false)
+ }
+ expect(2)
+ val stage = cf.thenApply { it + "K" }
+ assertFailsWith<CancellationException> { stage.await() }
+ finish(4)
+ }
+
+ @Test
+ fun testCompletedStateThenApplyAsDeferredAwait() = runTest {
+ expect(1)
+ val cf = CompletableFuture<String>()
+ launch {
+ expect(3)
+ cf.complete("O")
+ }
+ expect(2)
+ val stage = cf.thenApply { it + "K" }
+ val deferred = stage.asDeferred()
+ assertEquals("OK", deferred.await())
+ finish(4)
+ }
+
+ @Test
+ fun testCompletedStateThenApplyAsDeferredAwaitCancel() = runTest {
+ expect(1)
+ val cf = CompletableFuture<String>()
+ expect(2)
+ val stage = cf.thenApply { it + "K" }
+ val deferred = stage.asDeferred()
+ launch {
+ expect(3)
+ deferred.cancel() // cancel the deferred!
+ }
+ assertFailsWith<CancellationException> { stage.await() }
+ finish(4)
+ }
}