Re-implement jdk8 and Guava future builders on top o AbstractCoroutine in order to properly handle parallel decomposition

  * Get rid of cancel(cause)
  * Improve test coverage for cancellation and exception handling

Fixes #751
diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
index ceb6932..04f6c61 100644
--- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
+++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
@@ -7,6 +7,7 @@
 import com.google.common.util.concurrent.*
 import kotlinx.coroutines.*
 import java.util.concurrent.*
+import java.util.concurrent.CancellationException
 import kotlin.coroutines.*
 
 /**
@@ -37,23 +38,40 @@
 ): ListenableFuture<T> {
     require(!start.isLazy) { "$start start is not supported" }
     val newContext = newCoroutineContext(context)
-    val job = Job(newContext[Job])
-    val future = ListenableFutureCoroutine<T>(newContext + job)
-    job.cancelFutureOnCompletion(future)
-    start(block, receiver = future, completion = future) // use the specified start strategy
+    val future = SettableFuture.create<T>()
+    val coroutine = ListenableFutureCoroutine(newContext, future)
+    future.addCallback(coroutine, MoreExecutors.directExecutor())
+    coroutine.start(start, coroutine, block)
     return future
 }
 
 private class ListenableFutureCoroutine<T>(
-    override val context: CoroutineContext
-) : AbstractFuture<T>(), Continuation<T>, CoroutineScope {
-    override val coroutineContext: CoroutineContext get() = context
-    override fun resumeWith(result: Result<T>) {
-        result
-            .onSuccess { set(it) }
-            .onFailure { setException(it) }
+    context: CoroutineContext,
+    private val completion: SettableFuture<T>
+) : AbstractCoroutine<T>(context), FutureCallback<T> {
+
+    /*
+     * We register coroutine as callback to the future this coroutine completes.
+     * But when future is cancelled externally, we'd like to cancel coroutine,
+     * so we register on failure handler for this purpose
+     */
+    override fun onSuccess(result: T?) {
+        // Do nothing
     }
-    override fun interruptTask() { context[Job]!!.cancel() }
+
+    override fun onFailure(t: Throwable) {
+        if (t is CancellationException) {
+            cancel()
+        }
+    }
+
+    override fun onCompleted(value: T) {
+        completion.set(value)
+    }
+
+    override fun onCompletedExceptionally(exception: Throwable) {
+        completion.setException(exception)
+    }
 }
 
 /**
diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
index 719c102..cd19002 100644
--- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
+++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
@@ -13,6 +13,7 @@
 import org.junit.Test
 import java.io.*
 import java.util.concurrent.*
+import kotlin.reflect.*
 import kotlin.test.assertFailsWith
 
 class ListenableFutureTest : TestBase() {
@@ -257,6 +258,57 @@
         }
     }
 
+    @Test
+    fun testChildException() = runTest {
+        val result = future(Dispatchers.Unconfined) {
+            // child crashes
+            launch { throw TestException("FAIL") }
+            42
+        }
+
+        result.checkFutureException<TestException>()
+    }
+
+    @Test
+    fun testExceptionAggregation() = runTest {
+        val result = future(Dispatchers.Unconfined) {
+            // child crashes
+            launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
+            launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
+            throw TestException()
+        }
+
+        expect(1)
+        result.checkFutureException<TestException>(TestException1::class, TestException2::class)
+        yield()
+        finish(2) // we are not cancelled
+    }
+
+    @Test
+    fun testExternalCancellation() = runTest {
+        val future = future(Dispatchers.Unconfined) {
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                expect(2)
+            }
+        }
+
+        yield()
+        expect(1)
+        future.cancel(true)
+        finish(3)
+    }
+
+    private inline fun <reified T: Throwable> ListenableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
+        val e = assertFailsWith<ExecutionException> { get() }
+        val cause = e.cause!!
+        assertTrue(cause is T)
+        for ((index, clazz) in suppressed.withIndex()) {
+            assertTrue(clazz.isInstance(cause.suppressed[index]))
+        }
+    }
+
     private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture<Int> {
         val latch = CountDownLatch(1)
         val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
index 5305f6d..9a6f245 100644
--- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
@@ -37,22 +37,28 @@
 ) : CompletableFuture<T> {
     require(!start.isLazy) { "$start start is not supported" }
     val newContext = this.newCoroutineContext(context)
-    val job = Job(newContext[Job])
-    val future = CompletableFutureCoroutine<T>(newContext + job)
-    job.cancelFutureOnCompletion(future)
-    future.whenComplete { _, exception -> job.cancel(exception) }
-    start(block, receiver = future, completion = future) // use the specified start strategy
+    val future = CompletableFuture<T>()
+    val coroutine = CompletableFutureCoroutine(newContext, future)
+    future.whenComplete(coroutine) // Cancel coroutine if future was completed externally
+    coroutine.start(start, coroutine, block)
     return future
 }
 
 private class CompletableFutureCoroutine<T>(
-    override val context: CoroutineContext
-) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {
-    override val coroutineContext: CoroutineContext get() = context
-    override fun resumeWith(result: Result<T>) {
-        result
-            .onSuccess { complete(it) }
-            .onFailure { completeExceptionally(it) }
+    context: CoroutineContext,
+    private val completion: CompletableFuture<T>
+) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
+
+    override fun accept(value: T?, exception: Throwable?) {
+        cancel()
+    }
+
+    override fun onCompleted(value: T) {
+        completion.complete(value)
+    }
+
+    override fun onCompletedExceptionally(exception: Throwable) {
+        completion.completeExceptionally(exception)
     }
 }
 
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
index 792cdcf..e970729 100644
--- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -15,6 +15,7 @@
 import java.util.function.*
 import kotlin.concurrent.*
 import kotlin.coroutines.*
+import kotlin.reflect.*
 import kotlin.test.assertFailsWith
 
 class FutureTest : TestBase() {
@@ -369,6 +370,56 @@
         return future
     }
 
+    @Test
+    fun testChildException() = runTest {
+        val result = future(Dispatchers.Unconfined) {
+            // child crashes
+            launch { throw TestException("FAIL") }
+            42
+        }
+
+        result.checkFutureException<TestException>()
+    }
+
+    @Test
+    fun testExceptionAggregation() = runTest {
+        val result = future(Dispatchers.Unconfined) {
+            // child crashes
+            launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
+            launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
+            throw TestException()
+        }
+
+        expect(1)
+        result.checkFutureException<TestException>(TestException1::class, TestException2::class)
+        yield()
+        finish(2) // we are not cancelled
+    }
+
+    @Test
+    fun testExternalCompletion() = runTest {
+        expect(1)
+        val result = future(Dispatchers.Unconfined) {
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                expect(2)
+            }
+        }
+
+        result.complete(Unit)
+        finish(3)
+    }
+
+    private inline fun <reified T: Throwable> CompletableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
+        val e = assertFailsWith<ExecutionException> { get() }
+        val cause = e.cause!!
+        assertTrue(cause is T)
+        for ((index, clazz) in suppressed.withIndex()) {
+            assertTrue(clazz.isInstance(cause.suppressed[index]))
+        }
+    }
+
     private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
         override fun dispatch(context: CoroutineContext, block: Runnable) {
             wrapper {