Make exception handling in AbstractContinuation consistent: always prefer exception thrown from coroutine as exceptional reason, add cancellation cause as suppressed exception

Add workaround to work with suppressed exceptions in tests
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt
index 66d843f..e93eca3 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt
@@ -234,7 +234,12 @@
                     } else {
                         /*
                          * If already cancelled block is resumed with an exception,
-                         * then we should properly merge them to avoid information loss
+                         * then we should properly merge them to avoid information loss.
+                         *
+                         * General rule:
+                         * Thrown exception always becomes a result and cancellation reason
+                         * is added to suppressed exceptions if necessary.
+                         * Basic duplicate/cycles check is performed
                          */
                         val update: CompletedExceptionally
 
@@ -244,15 +249,15 @@
                          * ```
                          * T1: ctxJob.cancel(e1) // -> cancelling
                          * T2:
-                         * withContext(ctx) {
+                         * withContext(ctx, Mode.ATOMIC) {
                          *   // -> resumed with cancellation exception
                          * }
                          * ```
                          */
                         if (proposedUpdate.cause is CancellationException) {
                             // Keep original cancellation cause and try add to suppressed exception from proposed cancel
-                            update = state.cancel
-                            coerceWithCancellation(state, proposedUpdate, update)
+                            update = proposedUpdate
+                            coerceWithException(state, update)
                         } else {
                             /*
                              * Proposed update is exception => transition to terminal state
@@ -295,16 +300,16 @@
         }
     }
 
-    // Coerce current cancelling state with proposed cancellation
-    private fun coerceWithCancellation(state: Cancelling, proposedUpdate: CompletedExceptionally, update: CompletedExceptionally) {
+    // Coerce current cancelling state with proposed exception
+    private fun coerceWithException(state: Cancelling, proposedUpdate: CompletedExceptionally) {
         val originalCancellation = state.cancel
         val originalException = originalCancellation.cause
         val updateCause = proposedUpdate.cause
-        // Cause of proposed update is present and differs from one in current state TODO clashes with await all
+        // Cause of proposed update is present and differs from one in current state
         val isSameCancellation = originalCancellation.cause is CancellationException
                 && originalException.cause === updateCause.cause
-        if (!isSameCancellation && originalException.cause !== updateCause) {
-            update.cause.addSuppressedThrowable(updateCause)
+        if (!isSameCancellation && (originalException.cause !== updateCause)) {
+            proposedUpdate.cause.addSuppressedThrowable(originalException)
         }
     }
 
diff --git a/core/kotlinx-coroutines-core/build.gradle b/core/kotlinx-coroutines-core/build.gradle
index ca4131d..d367e22 100644
--- a/core/kotlinx-coroutines-core/build.gradle
+++ b/core/kotlinx-coroutines-core/build.gradle
@@ -37,11 +37,11 @@
     include '**/*LFTest.*'
 }
 
-
 task jdk16Test(type: Test, dependsOn: [testClasses, checkJdk16]) {
     executable = "$System.env.JDK_16/bin/java"
     exclude '**/*LinearizabilityTest.*'
     exclude '**/*LFTest.*'
+    exclude '**/*CancellableContinuationExceptionHandlingTest.*'
 }
 
 task moreTest(dependsOn: [lockFreedomTest, jdk16Test])
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CancellableContinuationExceptionHandlingTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CancellableContinuationExceptionHandlingTest.kt
new file mode 100644
index 0000000..e728d73
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CancellableContinuationExceptionHandlingTest.kt
@@ -0,0 +1,346 @@
+package kotlinx.coroutines.experimental
+
+import org.junit.Test
+import java.io.*
+import java.nio.channels.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class CancellableContinuationExceptionHandlingTest : TestBase() {
+
+    @Test
+    fun testCancellation() = runTest {
+        /*
+         * Continuation cancelled without cause
+         * Continuation itself throws ISE
+         * Result: ISE with suppressed CancellationException
+         */
+        runCancellation(coroutineContext, null, IllegalStateException()) { e ->
+            assertNull(e.cause)
+            val suppressed = e.suppressed()
+            assertTrue(suppressed.size == 1)
+
+            val cancellation = suppressed[0] as CancellationException
+            assertNull(cancellation.cause)
+            assertTrue(cancellation.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testCancellationWithException() = runTest {
+        /*
+         * Continuation cancelled with IOE
+         * Continuation itself throws ISE
+         * Result: ISE with suppressed CancellationException(IOE)
+         */
+        val cancellationCause = IOException()
+        runCancellation(coroutineContext, cancellationCause, IllegalStateException()) { e ->
+            assertNull(e.cause)
+            val suppressed = e.suppressed()
+            assertTrue(suppressed.size == 1)
+
+            val cancellation = suppressed[0] as CancellationException
+            assertSame(cancellation.cause, cancellationCause)
+            assertTrue(cancellationCause.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testSameException() = runTest {
+        /*
+         * Continuation cancelled with ISE
+         * Continuation itself throws the same ISE
+         * Result: ISE
+         */
+        val cancellationCause = IllegalStateException()
+        runCancellation(coroutineContext, cancellationCause, cancellationCause) { e ->
+            assertNull(e.cause)
+            val suppressed = e.suppressed()
+            assertTrue(suppressed.isEmpty())
+        }
+    }
+
+    @Test
+    fun testSameCancellation() = runTest {
+        /*
+         * Continuation cancelled with CancellationException
+         * Continuation itself throws the same CE
+         * Result: CE
+         */
+        val cancellationCause = CancellationException()
+        runCancellation(coroutineContext, cancellationCause, cancellationCause) { e ->
+            assertNull(e.cause)
+            assertSame(e, cancellationCause)
+            val suppressed = e.suppressed()
+            assertTrue(suppressed.isEmpty())
+        }
+    }
+
+    @Test
+    fun testSameCancellationWithException() = runTest {
+        /*
+         * Continuation cancelled with CancellationException(IOE)
+         * Continuation itself throws the same IOE
+         * Result: IOE
+         */
+        val cancellationCause = CancellationException()
+        val exception = IOException()
+        cancellationCause.initCause(exception)
+        runCancellation(coroutineContext, cancellationCause, exception) { e ->
+            assertNull(e.cause)
+            assertSame(exception, e)
+            assertTrue(e.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testConflictingCancellation() = runTest {
+        /*
+         * Continuation cancelled with ISE
+         * Continuation itself throws CE(IOE)
+         * Result: CE(IOE) with suppressed JCE(ISE)
+         */
+        val cancellationCause = IllegalStateException()
+        val thrown = CancellationException()
+        thrown.initCause(IOException())
+        runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+            assertSame(thrown, e)
+            assertEquals(1, thrown.suppressed().size)
+
+            val suppressed = thrown.suppressed()[0]
+            assertTrue(suppressed is JobCancellationException)
+            assertTrue(suppressed.cause is IllegalStateException)
+        }
+    }
+
+    @Test
+    fun testConflictingCancellation2() = runTest {
+        /*
+         * Continuation cancelled with ISE
+         * Continuation itself throws CE
+         * Result: CE with suppressed JCE(ISE)
+         */
+        val cancellationCause = IllegalStateException()
+        val thrown = CancellationException()
+        runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+            assertSame(thrown, e)
+            assertEquals(1, thrown.suppressed().size)
+
+            val suppressed = thrown.suppressed()[0]
+            assertTrue(suppressed is JobCancellationException)
+            assertTrue(suppressed.cause is IllegalStateException)
+
+        }
+    }
+
+    @Test
+    fun testConflictingCancellation3() = runTest {
+        /*
+         * Continuation cancelled with CE
+         * Continuation itself throws CE
+         * Result: CE
+         */
+        val cancellationCause = CancellationException()
+        val thrown = CancellationException()
+        runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+            assertSame(thrown, e)
+            assertNull(e.cause)
+            assertTrue(e.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testConflictingCancellationWithSameException() = runTest {
+        val cancellationCause = IllegalStateException()
+        val thrown = CancellationException()
+        /*
+         * Continuation cancelled with ISE
+         * Continuation itself throws CE with the same ISE as a cause
+         * Result: CE(ISE)
+         */
+        thrown.initCause(cancellationCause)
+        runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+            assertSame(thrown, e)
+            assertSame(cancellationCause, e.cause)
+            assertEquals(0, thrown.suppressed().size)
+
+        }
+    }
+
+    @Test
+    fun testThrowingCancellation() = runTest {
+        val thrown = CancellationException()
+        runThrowingContinuation(coroutineContext, thrown) { e ->
+            assertSame(thrown, e)
+        }
+    }
+
+    @Test
+    fun testThrowingCancellationWithCause() = runTest {
+        val thrown = CancellationException()
+        thrown.initCause(IOException())
+        runThrowingContinuation(coroutineContext, thrown) { e ->
+            assertSame(thrown, e)
+        }
+    }
+
+    @Test
+    fun testCancel() = runTest {
+        runOnlyCancellation(coroutineContext, null) { e ->
+            assertNull(e.cause)
+            assertTrue(e.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testCancelWithCause() = runTest {
+        val cause = IOException()
+        runOnlyCancellation(coroutineContext, cause) { e ->
+            assertSame(cause, e.cause)
+            assertTrue(e.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testCancelWithCancellationException() = runTest {
+        val cause = CancellationException()
+        runThrowingContinuation(coroutineContext, cause) { e ->
+            assertSame(cause, e)
+            assertNull(e.cause)
+            assertTrue(e.suppressed().isEmpty())
+        }
+    }
+
+    @Test
+    fun testMultipleCancellations() = runTest {
+        var continuation: Continuation<Unit>? = null
+
+        val job = launch(coroutineContext) {
+            try {
+                expect(2)
+                suspendCancellableCoroutine<Unit> { c ->
+                    continuation = c
+                }
+            } catch (e: IOException) {
+                expect(3)
+            }
+        }
+
+        expect(1)
+        yield()
+        continuation!!.resumeWithException(IOException())
+        yield()
+        assertFailsWith<IllegalStateException> { continuation!!.resumeWithException(ClosedChannelException()) }
+        try {
+            job.join()
+        } finally {
+            finish(4)
+        }
+    }
+
+    @Test
+    fun testResumeAndCancel() = runTest {
+        var continuation: Continuation<Unit>? = null
+
+        val job = launch(coroutineContext) {
+            expect(2)
+            suspendCancellableCoroutine<Unit> { c ->
+                continuation = c
+            }
+            expect(3)
+        }
+
+        expect(1)
+        yield()
+        continuation!!.resume(Unit)
+        job.join()
+        assertFailsWith<IllegalStateException> { continuation!!.resumeWithException(ClosedChannelException()) }
+        finish(4)
+    }
+
+    private fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
+        val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
+        return object : CoroutineDispatcher() {
+            override fun dispatch(context: CoroutineContext, block: Runnable) {
+                dispatcher.dispatch(context, block)
+            }
+        }
+    }
+
+    private suspend inline fun <reified T : Exception> CoroutineScope.runCancellation(
+        coroutineContext: CoroutineContext,
+        cancellationCause: Exception?,
+        thrownException: T, exceptionChecker: (T) -> Unit
+    ) {
+
+        expect(1)
+        val job = Job()
+        job.cancel(cancellationCause)
+
+        try {
+            withContext(wrapperDispatcher(coroutineContext) + job, CoroutineStart.ATOMIC) {
+                require(isActive)
+                expect(2)
+                throw thrownException
+            }
+        } catch (e: Exception) {
+            assertTrue(e is T)
+            exceptionChecker(e as T)
+            finish(3)
+            return
+        }
+
+        fail()
+    }
+
+    private suspend inline fun <reified T : Exception> CoroutineScope.runThrowingContinuation(
+        coroutineContext: CoroutineContext,
+        thrownException: T, exceptionChecker: (T) -> Unit
+    ) {
+
+        expect(1)
+        try {
+            withContext(wrapperDispatcher(coroutineContext), CoroutineStart.ATOMIC) {
+                require(isActive)
+                expect(2)
+                throw thrownException
+            }
+        } catch (e: Exception) {
+            assertTrue(e is T)
+            exceptionChecker(e as T)
+            finish(3)
+            return
+        }
+
+        fail()
+    }
+
+    private suspend inline fun CoroutineScope.runOnlyCancellation(
+        coroutineContext: CoroutineContext,
+        cancellationCause: Exception?, exceptionChecker: (CancellationException) -> Unit
+    ) {
+
+        expect(1)
+        val job = Job()
+        job.cancel(cancellationCause)
+        try {
+            withContext(wrapperDispatcher(coroutineContext) + job, CoroutineStart.ATOMIC) {
+                require(isActive)
+                expect(2)
+            }
+        } catch (e: Exception) {
+            assertTrue(e is CancellationException)
+            exceptionChecker(e as CancellationException)
+            finish(3)
+            return
+        }
+
+        fail()
+    }
+}
+
+// Workaround to run tests on JDK 8, this test is excluded from jdk16Test task
+fun Throwable.suppressed(): Array<Throwable> {
+    val method = this::class.java.getMethod("getSuppressed") ?: error("This test can only be run using JDK 1.8")
+    return method.invoke(this) as Array<Throwable>
+}
diff --git a/gradle/compile-jvm.gradle b/gradle/compile-jvm.gradle
index 74030e5..c289b50 100644
--- a/gradle/compile-jvm.gradle
+++ b/gradle/compile-jvm.gradle
@@ -11,6 +11,8 @@
 dependencies {
     compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
     testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
+    // Workaround to make addSuppressed work in tests
+    testCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
     testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
     testCompile "junit:junit:$junit_version"
 }
diff --git a/integration/kotlinx-coroutines-guava/src/test/kotlin/kotlinx/coroutines/experimental/guava/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/src/test/kotlin/kotlinx/coroutines/experimental/guava/ListenableFutureTest.kt
index 7db81ca..fe958ac 100644
--- a/integration/kotlinx-coroutines-guava/src/test/kotlin/kotlinx/coroutines/experimental/guava/ListenableFutureTest.kt
+++ b/integration/kotlinx-coroutines-guava/src/test/kotlin/kotlinx/coroutines/experimental/guava/ListenableFutureTest.kt
@@ -22,6 +22,7 @@
 import org.hamcrest.core.*
 import org.junit.*
 import org.junit.Assert.*
+import java.io.*
 import java.util.concurrent.*
 import kotlin.coroutines.experimental.*
 
@@ -43,6 +44,32 @@
     }
 
     @Test
+    fun testAwaitWithContext() = runTest {
+        val future = SettableFuture.create<Int>()
+        val deferred = async(coroutineContext) {
+            withContext(CommonPool) {
+                future.await()
+            }
+        }
+
+        future.set(1)
+        assertEquals(1, deferred.await())
+    }
+
+    @Test
+    fun testAwaitWithContextCancellation() = runTest(expected = {it is JobCancellationException}) {
+        val future = SettableFuture.create<Int>()
+        val deferred = async(coroutineContext) {
+            withContext(CommonPool) {
+                future.await()
+            }
+        }
+
+        deferred.cancel(IOException())
+        deferred.await()
+    }
+
+    @Test
     fun testCompletedFuture() {
         val toAwait = SettableFuture.create<String>()
         toAwait.set("O")