Re-implement jdk8 and Guava future builders on top o AbstractCoroutine in order to properly handle parallel decomposition
* Get rid of cancel(cause)
* Improve test coverage for cancellation and exception handling
Fixes #751
diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
index ceb6932..04f6c61 100644
--- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
+++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
@@ -7,6 +7,7 @@
import com.google.common.util.concurrent.*
import kotlinx.coroutines.*
import java.util.concurrent.*
+import java.util.concurrent.CancellationException
import kotlin.coroutines.*
/**
@@ -37,23 +38,40 @@
): ListenableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = newCoroutineContext(context)
- val job = Job(newContext[Job])
- val future = ListenableFutureCoroutine<T>(newContext + job)
- job.cancelFutureOnCompletion(future)
- start(block, receiver = future, completion = future) // use the specified start strategy
+ val future = SettableFuture.create<T>()
+ val coroutine = ListenableFutureCoroutine(newContext, future)
+ future.addCallback(coroutine, MoreExecutors.directExecutor())
+ coroutine.start(start, coroutine, block)
return future
}
private class ListenableFutureCoroutine<T>(
- override val context: CoroutineContext
-) : AbstractFuture<T>(), Continuation<T>, CoroutineScope {
- override val coroutineContext: CoroutineContext get() = context
- override fun resumeWith(result: Result<T>) {
- result
- .onSuccess { set(it) }
- .onFailure { setException(it) }
+ context: CoroutineContext,
+ private val completion: SettableFuture<T>
+) : AbstractCoroutine<T>(context), FutureCallback<T> {
+
+ /*
+ * We register coroutine as callback to the future this coroutine completes.
+ * But when future is cancelled externally, we'd like to cancel coroutine,
+ * so we register on failure handler for this purpose
+ */
+ override fun onSuccess(result: T?) {
+ // Do nothing
}
- override fun interruptTask() { context[Job]!!.cancel() }
+
+ override fun onFailure(t: Throwable) {
+ if (t is CancellationException) {
+ cancel()
+ }
+ }
+
+ override fun onCompleted(value: T) {
+ completion.set(value)
+ }
+
+ override fun onCompletedExceptionally(exception: Throwable) {
+ completion.setException(exception)
+ }
}
/**
diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
index 719c102..cd19002 100644
--- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
+++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
@@ -13,6 +13,7 @@
import org.junit.Test
import java.io.*
import java.util.concurrent.*
+import kotlin.reflect.*
import kotlin.test.assertFailsWith
class ListenableFutureTest : TestBase() {
@@ -257,6 +258,57 @@
}
}
+ @Test
+ fun testChildException() = runTest {
+ val result = future(Dispatchers.Unconfined) {
+ // child crashes
+ launch { throw TestException("FAIL") }
+ 42
+ }
+
+ result.checkFutureException<TestException>()
+ }
+
+ @Test
+ fun testExceptionAggregation() = runTest {
+ val result = future(Dispatchers.Unconfined) {
+ // child crashes
+ launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
+ launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
+ throw TestException()
+ }
+
+ expect(1)
+ result.checkFutureException<TestException>(TestException1::class, TestException2::class)
+ yield()
+ finish(2) // we are not cancelled
+ }
+
+ @Test
+ fun testExternalCancellation() = runTest {
+ val future = future(Dispatchers.Unconfined) {
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ expect(2)
+ }
+ }
+
+ yield()
+ expect(1)
+ future.cancel(true)
+ finish(3)
+ }
+
+ private inline fun <reified T: Throwable> ListenableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
+ val e = assertFailsWith<ExecutionException> { get() }
+ val cause = e.cause!!
+ assertTrue(cause is T)
+ for ((index, clazz) in suppressed.withIndex()) {
+ assertTrue(clazz.isInstance(cause.suppressed[index]))
+ }
+ }
+
private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture<Int> {
val latch = CountDownLatch(1)
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
index 5305f6d..9a6f245 100644
--- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
@@ -37,22 +37,28 @@
) : CompletableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = this.newCoroutineContext(context)
- val job = Job(newContext[Job])
- val future = CompletableFutureCoroutine<T>(newContext + job)
- job.cancelFutureOnCompletion(future)
- future.whenComplete { _, exception -> job.cancel(exception) }
- start(block, receiver = future, completion = future) // use the specified start strategy
+ val future = CompletableFuture<T>()
+ val coroutine = CompletableFutureCoroutine(newContext, future)
+ future.whenComplete(coroutine) // Cancel coroutine if future was completed externally
+ coroutine.start(start, coroutine, block)
return future
}
private class CompletableFutureCoroutine<T>(
- override val context: CoroutineContext
-) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {
- override val coroutineContext: CoroutineContext get() = context
- override fun resumeWith(result: Result<T>) {
- result
- .onSuccess { complete(it) }
- .onFailure { completeExceptionally(it) }
+ context: CoroutineContext,
+ private val completion: CompletableFuture<T>
+) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
+
+ override fun accept(value: T?, exception: Throwable?) {
+ cancel()
+ }
+
+ override fun onCompleted(value: T) {
+ completion.complete(value)
+ }
+
+ override fun onCompletedExceptionally(exception: Throwable) {
+ completion.completeExceptionally(exception)
}
}
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
index 792cdcf..e970729 100644
--- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -15,6 +15,7 @@
import java.util.function.*
import kotlin.concurrent.*
import kotlin.coroutines.*
+import kotlin.reflect.*
import kotlin.test.assertFailsWith
class FutureTest : TestBase() {
@@ -369,6 +370,56 @@
return future
}
+ @Test
+ fun testChildException() = runTest {
+ val result = future(Dispatchers.Unconfined) {
+ // child crashes
+ launch { throw TestException("FAIL") }
+ 42
+ }
+
+ result.checkFutureException<TestException>()
+ }
+
+ @Test
+ fun testExceptionAggregation() = runTest {
+ val result = future(Dispatchers.Unconfined) {
+ // child crashes
+ launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
+ launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
+ throw TestException()
+ }
+
+ expect(1)
+ result.checkFutureException<TestException>(TestException1::class, TestException2::class)
+ yield()
+ finish(2) // we are not cancelled
+ }
+
+ @Test
+ fun testExternalCompletion() = runTest {
+ expect(1)
+ val result = future(Dispatchers.Unconfined) {
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ expect(2)
+ }
+ }
+
+ result.complete(Unit)
+ finish(3)
+ }
+
+ private inline fun <reified T: Throwable> CompletableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
+ val e = assertFailsWith<ExecutionException> { get() }
+ val cause = e.cause!!
+ assertTrue(cause is T)
+ for ((index, clazz) in suppressed.withIndex()) {
+ assertTrue(clazz.isInstance(cause.suppressed[index]))
+ }
+ }
+
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
wrapper {